use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};

use anyhow::Result;
use bytes::Bytes;
use futures::future::try_join_all;
use iota_config::{
    node::AuthorityStorePruningConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_storage::{
    mutex_table::RwLockTable,
    object_store::util::{
        copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
        path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
    },
};
use object_store::{DynObjectStore, path::Path};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tracing::{debug, error, info};
use typed_store::rocks::MetricConf;

use crate::{
    authority::{
        authority_store_pruner::{
            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
        },
        authority_store_tables::AuthorityPerpetualTables,
    },
    checkpoints::CheckpointStore,
    rest_index::RestIndexStore,
};

pub const SUCCESS_MARKER: &str = "_SUCCESS";
pub const TEST_MARKER: &str = "_TEST";
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";

pub struct DBCheckpointMetrics {
    pub first_missing_db_checkpoint_epoch: IntGauge,
    pub num_local_db_checkpoints: IntGauge,
}

impl DBCheckpointMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
                "first_missing_db_checkpoint_epoch",
                "First epoch for which we have no db checkpoint in remote store",
                registry
            )
            .unwrap(),
            num_local_db_checkpoints: register_int_gauge_with_registry!(
                "num_local_db_checkpoints",
                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

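/// Background service that watches the local RocksDB checkpoint directory,
/// optionally uploads new per-epoch checkpoints to a remote object store, and
/// garbage collects local checkpoints once all configured marker files are
/// present.
///
/// Minimal usage sketch; the path, interval, and registry below are
/// illustrative assumptions, not values taken from any particular deployment:
/// ```ignore
/// let handler = DBCheckpointHandler::new(
///     std::path::Path::new("/tmp/db_checkpoints"), // hypothetical local checkpoint root
///     None,                 // no remote store: only the cleanup and GC loops run
///     60,                   // scan interval in seconds
///     false,                // prune_and_compact_before_upload
///     0,                    // indirect_objects_threshold
///     AuthorityStorePruningConfig::default(),
///     &Registry::default(),
///     false,                // state_snapshot_enabled
/// )?;
/// let kill_switch = handler.start();
/// // ... later, stop the background loops:
/// let _ = kill_switch.send(());
/// ```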
pub struct DBCheckpointHandler {
    /// Object store backed by the local db checkpoint directory
    input_object_store: Arc<DynObjectStore>,
    /// Local filesystem root under which the per-epoch checkpoint dirs live
    input_root_path: PathBuf,
    /// Remote object store to upload checkpoints to; `None` disables uploads
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval between scans for new db checkpoints
    interval: Duration,
    /// Marker files which must all be present before a local db checkpoint is
    /// garbage collected
    gc_markers: Vec<String>,
    /// Whether to prune and compact a checkpoint before uploading it
    prune_and_compact_before_upload: bool,
    /// Indirect objects threshold passed to the pruner
    indirect_objects_threshold: usize,
    /// Whether state snapshots are enabled; uploads then wait for the
    /// `_STATE_SNAPSHOT_COMPLETED` marker
    state_snapshot_enabled: bool,
    /// Pruning configuration used when pruning before upload
    pruning_config: AuthorityStorePruningConfig,
    metrics: Arc<DBCheckpointMetrics>,
}

impl DBCheckpointHandler {
    pub fn new(
        input_path: &std::path::Path,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        indirect_objects_threshold: usize,
        pruning_config: AuthorityStorePruningConfig,
        registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        };
        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
        if state_snapshot_enabled {
            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
        }
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_store_config.make()?,
            input_root_path: input_path.to_path_buf(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers,
            prune_and_compact_before_upload,
            indirect_objects_threshold,
            state_snapshot_enabled,
            pruning_config,
            metrics: DBCheckpointMetrics::new(registry),
        }))
    }

    pub fn new_for_test(
        input_object_store_config: &ObjectStoreConfig,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_object_store_config.make()?,
            input_root_path: input_object_store_config
                .directory
                .as_ref()
                .unwrap()
                .clone(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
            prune_and_compact_before_upload,
            indirect_objects_threshold: 0,
            state_snapshot_enabled,
            pruning_config: AuthorityStorePruningConfig::default(),
            metrics: DBCheckpointMetrics::new(&Registry::default()),
        }))
    }

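    /// Spawns the background loops and returns a broadcast sender; sending on
    /// it signals every spawned loop to shut down. With an output object store
    /// configured this starts the upload and manifest update loops, otherwise
    /// the local cleanup loop; the GC loop always runs.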
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        // Broadcast channel used purely as a kill switch for the spawned loops.
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        if self.output_object_store.is_some() {
            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
            tokio::task::spawn(run_manifest_update_loop(
                self.output_object_store.as_ref().unwrap().clone(),
                kill_sender.subscribe(),
            ));
        } else {
            // No remote store configured: only mark local checkpoints as
            // upload-completed so the GC loop can reclaim them.
            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
        }
        // Garbage collection of local checkpoints runs in both modes.
        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
            self,
            kill_sender.subscribe(),
        ));
        kill_sender
    }

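    /// Periodically scans the local checkpoint directory, queries the remote
    /// store for epochs missing a `_SUCCESS` marker, and uploads those
    /// checkpoints.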
    async fn run_db_checkpoint_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                        Ok(epochs) => {
                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

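    /// Runs when no output object store is configured: periodically writes the
    /// `_UPLOAD_COMPLETED` marker into every local checkpoint directory so the
    /// GC loop can reclaim it without an upload ever happening.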
    async fn run_db_checkpoint_cleanup_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
                    for (_, db_path) in dirs {
                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
                        if upload_completed_path.exists() {
                            continue;
                        }
                        let bytes = Bytes::from_static(b"success");
                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
                        put(
                            &self.input_object_store,
                            &upload_completed_marker,
                            bytes.clone(),
                        )
                        .await?;
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

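    /// Every 30 seconds deletes local checkpoint directories for which all
    /// configured GC marker files are present.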
    async fn run_db_checkpoint_gc_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
        info!("DB checkpoint garbage collection loop started");
        loop {
            tokio::select! {
                _now = gc_interval.tick() => {
                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
                        if !deleted.is_empty() {
                            info!("Garbage collected local db checkpoints: {:?}", deleted);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

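    /// Opens the checkpointed database at `db_path`, prunes objects for all
    /// eligible epochs using the configured pruning settings, and then runs a
    /// manual compaction before the checkpoint is uploaded.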
    async fn prune_and_compact(
        &self,
        db_path: PathBuf,
        epoch: u64,
        epoch_duration_ms: u64,
    ) -> Result<()> {
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
        let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
            db_path.join("checkpoints"),
            MetricConf::new("db_checkpoint"),
            None,
            None,
        ));
        let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
        let lock_table = Arc::new(RwLockTable::new(1));
        info!(
            "Pruning db checkpoint in {} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::prune_objects_for_eligible_epochs(
            &perpetual_db,
            &checkpoint_store,
            Some(&rest_index),
            &lock_table,
            self.pruning_config.clone(),
            metrics,
            self.indirect_objects_threshold,
            epoch_duration_ms,
        )
        .await?;
        info!(
            "Compacting db checkpoint in {} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::compact(&perpetual_db)?;
        Ok(())
    }

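    /// Uploads every local checkpoint whose epoch is in `missing_epochs` (or is
    /// newer than the last missing epoch). When state snapshots are enabled, an
    /// epoch is skipped until its `_STATE_SNAPSHOT_COMPLETED` marker appears
    /// locally. A `_SUCCESS` marker is written to the remote store after each
    /// upload, and `_UPLOAD_COMPLETED` is written locally so the checkpoint
    /// becomes eligible for garbage collection.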
    async fn upload_db_checkpoints_to_object_store(
        &self,
        missing_epochs: Vec<u64>,
    ) -> Result<(), anyhow::Error> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
        let object_store = self
            .output_object_store
            .as_ref()
            .expect("Expected object store to exist")
            .clone();
        for (epoch, db_path) in dirs {
            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                if self.state_snapshot_enabled {
                    let snapshot_completed_marker =
                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
                    if !snapshot_completed_marker.exists() {
                        info!(
                            "DB checkpoint upload for epoch {} is waiting until the state snapshot is uploaded",
                            *epoch
                        );
                        continue;
                    }
                }

                if self.prune_and_compact_before_upload {
                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
                        .await?;
                }

                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
                copy_recursively(
                    db_path,
                    &self.input_object_store,
                    &object_store,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;

                write_snapshot_manifest(db_path, &object_store, format!("epoch_{}/", epoch))
                    .await?;
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&object_store, &success_marker, bytes.clone()).await?;
            }
            let bytes = Bytes::from_static(b"success");
            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
            put(
                &self.input_object_store,
                &upload_completed_marker,
                bytes.clone(),
            )
            .await?;
        }
        Ok(())
    }

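    /// Deletes local checkpoint directories for which every marker in
    /// `gc_markers` is present, returning the epochs that were removed.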
    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut deleted = Vec::new();
        for (epoch, path) in local_checkpoints_by_epoch.iter() {
            let marker_paths: Vec<Path> = self
                .gc_markers
                .iter()
                .map(|marker| path.child(marker.clone()))
                .collect();
            let all_markers_present = try_join_all(
                marker_paths
                    .iter()
                    .map(|path| self.input_object_store.get(path)),
            )
            .await;
            match all_markers_present {
                Ok(_) => {
                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
                    deleted.push(*epoch);
                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
                    fs::remove_dir_all(&local_fs_path)?;
                }
                Err(_) => {
                    debug!("Not ready for deletion yet: {path}");
                }
            }
        }
        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use std::fs;

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use iota_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use itertools::Itertools;
    use tempfile::TempDir;

    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };

    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}