1use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};
6
7use anyhow::Result;
8use bytes::Bytes;
9use futures::future::try_join_all;
10use iota_config::{
11 node::AuthorityStorePruningConfig,
12 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
13};
14use iota_storage::object_store::util::{
15 copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
16 path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
17};
18use object_store::{DynObjectStore, ObjectStoreExt, path::Path};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tracing::{debug, error, info};
21
22use crate::{
23 authority::{
24 authority_store_pruner::{
25 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
26 },
27 authority_store_tables::AuthorityPerpetualTables,
28 },
29 checkpoint_progress_tracker::CheckpointProgressTracker,
30 checkpoints::CheckpointStore,
31 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
32};
33
/// Name of the marker object written under an epoch's directory in the remote
/// store after its db checkpoint and snapshot manifest have been written; also
/// the marker passed to `find_missing_epochs_dirs`.
34pub const SUCCESS_MARKER: &str = "_SUCCESS";
/// Extra garbage-collection marker required by handlers created through
/// `DBCheckpointHandler::new_for_test`.
35pub const TEST_MARKER: &str = "_TEST";
/// Name of the marker written into a local db checkpoint directory once the
/// handler is done with it (after upload, or directly by the cleanup loop when
/// upload is disabled). Always part of the garbage-collection markers.
36pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
/// Name of the marker that must exist in a local db checkpoint directory before
/// it is uploaded or garbage collected when state snapshots are enabled.
/// NOTE(review): the writer of this marker is not in this file — presumably the
/// state snapshot uploader; confirm.
37pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";
38
39pub struct DBCheckpointMetrics {
40 pub first_missing_db_checkpoint_epoch: IntGauge,
41 pub num_local_db_checkpoints: IntGauge,
42}
43
44impl DBCheckpointMetrics {
45 pub fn new(registry: &Registry) -> Arc<Self> {
46 let this = Self {
47 first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
48 "first_missing_db_checkpoint_epoch",
49 "First epoch for which we have no db checkpoint in remote store",
50 registry
51 )
52 .unwrap(),
53 num_local_db_checkpoints: register_int_gauge_with_registry!(
54 "num_local_db_checkpoints",
55 "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
56 registry
57 )
58 .unwrap(),
59 };
60 Arc::new(this)
61 }
62}
63
/// Uploads local db checkpoints to an optional remote object store and garbage
/// collects the local copies once every required marker file is present.
64pub struct DBCheckpointHandler {
 /// Object store view of the local directory that holds the db checkpoints.
65 input_object_store: Arc<DynObjectStore>,
 /// Filesystem path of that same local directory; used to turn object store
 /// paths back into filesystem paths.
67 input_root_path: PathBuf,
 /// Remote store checkpoints are copied to. When `None`, `start` runs the
 /// cleanup loop instead of the upload loop.
69 output_object_store: Option<Arc<DynObjectStore>>,
 /// Tick period of the upload and cleanup loops.
71 interval: Duration,
 /// Marker file names that must all exist inside a local checkpoint directory
 /// before garbage collection deletes it.
73 gc_markers: Vec<String>,
 /// Whether a checkpoint is pruned and compacted before it is copied to the
 /// remote store.
76 prune_and_compact_before_upload: bool,
 /// When set, a checkpoint is only uploaded after
 /// `STATE_SNAPSHOT_COMPLETED_MARKER` exists in its local directory.
79 state_snapshot_enabled: bool,
 /// Pruning configuration handed to the pruner in `prune_and_compact`.
81 pruning_config: AuthorityStorePruningConfig,
 /// Gauges updated by the upload and cleanup loops.
83 metrics: Arc<DBCheckpointMetrics>,
 /// Optional tracker forwarded to the pruner in `prune_and_compact`.
84 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
85}
86
87impl DBCheckpointHandler {
88 pub fn new(
89 input_path: &std::path::Path,
90 output_object_store_config: Option<&ObjectStoreConfig>,
91 interval_s: u64,
92 prune_and_compact_before_upload: bool,
93 pruning_config: AuthorityStorePruningConfig,
94 registry: &Registry,
95 state_snapshot_enabled: bool,
96 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
97 ) -> Result<Arc<Self>> {
98 let input_store_config = ObjectStoreConfig {
99 object_store: Some(ObjectStoreType::File),
100 directory: Some(input_path.to_path_buf()),
101 ..Default::default()
102 };
103 let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
104 if state_snapshot_enabled {
105 gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
106 }
107 Ok(Arc::new(DBCheckpointHandler {
108 input_object_store: input_store_config.make()?,
109 input_root_path: input_path.to_path_buf(),
110 output_object_store: output_object_store_config
111 .map(|config| config.make().expect("Failed to make object store")),
112 interval: Duration::from_secs(interval_s),
113 gc_markers,
114 prune_and_compact_before_upload,
115 state_snapshot_enabled,
116 pruning_config,
117 metrics: DBCheckpointMetrics::new(registry),
118 checkpoint_progress_tracker,
119 }))
120 }
121 pub fn new_for_test(
122 input_object_store_config: &ObjectStoreConfig,
123 output_object_store_config: Option<&ObjectStoreConfig>,
124 interval_s: u64,
125 prune_and_compact_before_upload: bool,
126 state_snapshot_enabled: bool,
127 ) -> Result<Arc<Self>> {
128 Ok(Arc::new(DBCheckpointHandler {
129 input_object_store: input_object_store_config.make()?,
130 input_root_path: input_object_store_config
131 .directory
132 .as_ref()
133 .unwrap()
134 .clone(),
135 output_object_store: output_object_store_config
136 .map(|config| config.make().expect("Failed to make object store")),
137 interval: Duration::from_secs(interval_s),
138 gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
139 prune_and_compact_before_upload,
140 state_snapshot_enabled,
141 pruning_config: AuthorityStorePruningConfig::default(),
142 metrics: DBCheckpointMetrics::new(&Registry::default()),
143 checkpoint_progress_tracker: None,
144 }))
145 }
146
147 pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
153 let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
154 if let Some(output_object_store) = &self.output_object_store {
155 tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
156 self.clone(),
157 kill_sender.subscribe(),
158 ));
159 tokio::task::spawn(run_manifest_update_loop(
160 output_object_store.clone(),
161 kill_sender.subscribe(),
162 ));
163 } else {
164 tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
168 self.clone(),
169 kill_sender.subscribe(),
170 ));
171 }
172 tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
175 self,
176 kill_sender.subscribe(),
177 ));
178 kill_sender
179 }
180
181 async fn run_db_checkpoint_upload_loop(
184 self: Arc<Self>,
185 mut recv: tokio::sync::broadcast::Receiver<()>,
186 ) -> Result<()> {
187 let mut interval = tokio::time::interval(self.interval);
188 info!("DB checkpoint upload loop started");
189 loop {
190 tokio::select! {
191 _now = interval.tick() => {
192 let local_checkpoints_by_epoch =
193 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
194 self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
195 match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
196 Ok(epochs) => {
197 self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
198 if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
199 error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
200 }
201 }
202 Err(err) => {
203 error!("Failed to find missing db checkpoints in remote store: {:?}", err);
204 }
205 }
206 },
207 _ = recv.recv() => break,
208 }
209 }
210 Ok(())
211 }
212
213 async fn run_db_checkpoint_cleanup_loop(
216 self: Arc<Self>,
217 mut recv: tokio::sync::broadcast::Receiver<()>,
218 ) -> Result<()> {
219 let mut interval = tokio::time::interval(self.interval);
220 info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
221 loop {
222 tokio::select! {
223 _now = interval.tick() => {
224 let local_checkpoints_by_epoch =
225 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
226 self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
227 let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
228 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
229 for (_, db_path) in dirs {
230 let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
232 let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
233 if upload_completed_path.exists() {
234 continue;
235 }
236 let bytes = Bytes::from_static(b"success");
237 let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
238 put(&self.input_object_store,
239 &upload_completed_marker,
240 bytes.clone(),
241 )
242 .await?;
243 }
244 },
245 _ = recv.recv() => break,
246 }
247 }
248 Ok(())
249 }
250
251 async fn run_db_checkpoint_gc_loop(
254 self: Arc<Self>,
255 mut recv: tokio::sync::broadcast::Receiver<()>,
256 ) -> Result<()> {
257 let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
258 info!("DB checkpoint garbage collection loop started");
259 loop {
260 tokio::select! {
261 _now = gc_interval.tick() => {
262 if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
263 if !deleted.is_empty() {
264 info!("Garbage collected local db checkpoints: {:?}", deleted);
265 }
266 }
267 },
268 _ = recv.recv() => break,
269 }
270 }
271 Ok(())
272 }
273
274 async fn prune_and_compact(
275 &self,
276 db_path: PathBuf,
277 epoch: u64,
278 epoch_duration_ms: u64,
279 ) -> Result<()> {
280 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
281 let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
282 &db_path.join("checkpoints"),
283 ));
284 let grpc_indexes_store = GrpcIndexesStore::new_without_init(db_path.join(GRPC_INDEXES_DIR));
285 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
286 info!(
287 "Pruning db checkpoint in {:?} for epoch: {epoch}",
288 db_path.display()
289 );
290 AuthorityStorePruner::prune_objects_for_eligible_epochs(
291 &perpetual_db,
292 &checkpoint_store,
293 Some(&grpc_indexes_store),
294 None,
295 self.pruning_config.clone(),
296 metrics,
297 epoch_duration_ms,
298 self.checkpoint_progress_tracker.as_ref(),
299 )
300 .await?;
301 info!(
302 "Compacting db checkpoint in {:?} for epoch: {epoch}",
303 db_path.display()
304 );
305 AuthorityStorePruner::compact(&perpetual_db)?;
306 Ok(())
307 }
308
309 async fn upload_db_checkpoints_to_object_store(
313 &self,
314 missing_epochs: Vec<u64>,
315 ) -> Result<(), anyhow::Error> {
316 let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
317 let local_checkpoints_by_epoch =
318 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
319 let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
320 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
321 let object_store = self
322 .output_object_store
323 .as_ref()
324 .expect("Expected object store to exist")
325 .clone();
326 for (epoch, db_path) in dirs {
327 let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
330 if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
331 if self.state_snapshot_enabled {
332 let snapshot_completed_marker =
333 local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
334 if !snapshot_completed_marker.exists() {
335 info!(
336 "DB checkpoint upload for epoch {} to wait until state snasphot uploaded",
337 *epoch
338 );
339 continue;
340 }
341 }
342
343 if self.prune_and_compact_before_upload {
344 self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
346 .await?;
347 }
348
349 info!("Copying db checkpoint for epoch: {epoch} to remote storage");
350 copy_recursively(
351 db_path,
352 &self.input_object_store,
353 &object_store,
354 NonZeroUsize::new(20).unwrap(),
355 )
356 .await?;
357
358 write_snapshot_manifest(db_path, &object_store, format!("epoch_{epoch}/")).await?;
361 let bytes = Bytes::from_static(b"success");
363 let success_marker = db_path.child(SUCCESS_MARKER);
364 put(&object_store, &success_marker, bytes.clone()).await?;
365 }
366 let bytes = Bytes::from_static(b"success");
367 let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
368 put(
369 &self.input_object_store,
370 &upload_completed_marker,
371 bytes.clone(),
372 )
373 .await?;
374 }
375 Ok(())
376 }
377
378 async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
381 let local_checkpoints_by_epoch =
382 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
383 let mut deleted = Vec::new();
384 for (epoch, path) in local_checkpoints_by_epoch.iter() {
385 let marker_paths: Vec<Path> = self
386 .gc_markers
387 .iter()
388 .map(|marker| path.child(marker.clone()))
389 .collect();
390 let all_markers_present = try_join_all(
392 marker_paths
393 .iter()
394 .map(|path| self.input_object_store.get(path)),
395 )
396 .await;
397 match all_markers_present {
398 Ok(_) => {
401 info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
402 deleted.push(*epoch);
403 let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
404 fs::remove_dir_all(&local_fs_path)?;
405 }
406 Err(_) => {
407 debug!("Not ready for deletion yet: {path}");
408 }
409 }
410 }
411 Ok(deleted)
412 }
413}
414
415#[cfg(test)]
416mod tests {
417 use std::fs;
418
419 use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
420 use iota_storage::object_store::util::{
421 find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
422 };
423 use itertools::Itertools;
424 use tempfile::TempDir;
425
426 use crate::db_checkpoint_handler::{
427 DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
428 };
429
430 #[tokio::test]
431 async fn test_basic() -> anyhow::Result<()> {
432 let checkpoint_dir = TempDir::new()?;
433 let checkpoint_dir_path = checkpoint_dir.path();
434 let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
435 fs::create_dir(&local_epoch0_checkpoint)?;
436 let file1 = local_epoch0_checkpoint.join("file1");
437 fs::write(file1, b"Lorem ipsum")?;
438 let file2 = local_epoch0_checkpoint.join("file2");
439 fs::write(file2, b"Lorem ipsum")?;
440 let nested_dir = local_epoch0_checkpoint.join("data");
441 fs::create_dir(&nested_dir)?;
442 let file3 = nested_dir.join("file3");
443 fs::write(file3, b"Lorem ipsum")?;
444
445 let remote_checkpoint_dir = TempDir::new()?;
446 let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
447 let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
448
449 let input_store_config = ObjectStoreConfig {
450 object_store: Some(ObjectStoreType::File),
451 directory: Some(checkpoint_dir_path.to_path_buf()),
452 ..Default::default()
453 };
454 let output_store_config = ObjectStoreConfig {
455 object_store: Some(ObjectStoreType::File),
456 directory: Some(remote_checkpoint_dir_path.to_path_buf()),
457 ..Default::default()
458 };
459 let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
460 &input_store_config,
461 Some(&output_store_config),
462 10,
463 false,
464 false,
465 )?;
466 let local_checkpoints_by_epoch =
467 find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
468 .await?;
469 assert!(!local_checkpoints_by_epoch.is_empty());
470 assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
471 assert_eq!(
472 path_to_filesystem(
473 db_checkpoint_handler.input_root_path.clone(),
474 local_checkpoints_by_epoch.first_key_value().unwrap().1
475 )
476 .unwrap(),
477 std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
478 );
479 let missing_epochs = find_missing_epochs_dirs(
480 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
481 SUCCESS_MARKER,
482 )
483 .await?;
484 db_checkpoint_handler
485 .upload_db_checkpoints_to_object_store(missing_epochs)
486 .await?;
487
488 assert!(remote_epoch0_checkpoint.join("file1").exists());
489 assert!(remote_epoch0_checkpoint.join("file2").exists());
490 assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
491 assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
492 assert!(
493 local_epoch0_checkpoint
494 .join(UPLOAD_COMPLETED_MARKER)
495 .exists()
496 );
497
498 let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
500 fs::write(test_marker, b"Lorem ipsum")?;
501 db_checkpoint_handler
502 .garbage_collect_old_db_checkpoints()
503 .await?;
504
505 assert!(!local_epoch0_checkpoint.join("file1").exists());
506 assert!(!local_epoch0_checkpoint.join("file1").exists());
507 assert!(!local_epoch0_checkpoint.join("file2").exists());
508 assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
509 Ok(())
510 }
511
512 #[tokio::test]
513 async fn test_upload_resumes() -> anyhow::Result<()> {
514 let checkpoint_dir = TempDir::new()?;
515 let checkpoint_dir_path = checkpoint_dir.path();
516 let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
517
518 let remote_checkpoint_dir = TempDir::new()?;
519 let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
520 let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
521
522 let input_store_config = ObjectStoreConfig {
523 object_store: Some(ObjectStoreType::File),
524 directory: Some(checkpoint_dir_path.to_path_buf()),
525 ..Default::default()
526 };
527 let output_store_config = ObjectStoreConfig {
528 object_store: Some(ObjectStoreType::File),
529 directory: Some(remote_checkpoint_dir_path.to_path_buf()),
530 ..Default::default()
531 };
532 let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
533 &input_store_config,
534 Some(&output_store_config),
535 10,
536 false,
537 false,
538 )?;
539
540 fs::create_dir(&local_epoch0_checkpoint)?;
541 let file1 = local_epoch0_checkpoint.join("file1");
542 fs::write(file1, b"Lorem ipsum")?;
543 let file2 = local_epoch0_checkpoint.join("file2");
544 fs::write(file2, b"Lorem ipsum")?;
545 let nested_dir = local_epoch0_checkpoint.join("data");
546 fs::create_dir(&nested_dir)?;
547 let file3 = nested_dir.join("file3");
548 fs::write(file3, b"Lorem ipsum")?;
549
550 let missing_epochs = find_missing_epochs_dirs(
551 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
552 SUCCESS_MARKER,
553 )
554 .await?;
555 db_checkpoint_handler
556 .upload_db_checkpoints_to_object_store(missing_epochs)
557 .await?;
558 assert!(remote_epoch0_checkpoint.join("file1").exists());
559 assert!(remote_epoch0_checkpoint.join("file2").exists());
560 assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
561 assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
562 assert!(
563 local_epoch0_checkpoint
564 .join(UPLOAD_COMPLETED_MARKER)
565 .exists()
566 );
567
568 let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
570 fs::create_dir(&local_epoch1_checkpoint)?;
571 let file1 = local_epoch1_checkpoint.join("file1");
572 fs::write(file1, b"Lorem ipsum")?;
573 let file2 = local_epoch1_checkpoint.join("file2");
574 fs::write(file2, b"Lorem ipsum")?;
575 let nested_dir = local_epoch1_checkpoint.join("data");
576 fs::create_dir(&nested_dir)?;
577 let file3 = nested_dir.join("file3");
578 fs::write(file3, b"Lorem ipsum")?;
579
580 fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;
583
584 let missing_epochs = find_missing_epochs_dirs(
587 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
588 SUCCESS_MARKER,
589 )
590 .await?;
591 db_checkpoint_handler
592 .upload_db_checkpoints_to_object_store(missing_epochs)
593 .await?;
594 assert!(remote_epoch0_checkpoint.join("file1").exists());
595 assert!(remote_epoch0_checkpoint.join("file2").exists());
596 assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
597 assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
598 assert!(
599 local_epoch0_checkpoint
600 .join(UPLOAD_COMPLETED_MARKER)
601 .exists()
602 );
603
604 let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
605 assert!(remote_epoch1_checkpoint.join("file1").exists());
606 assert!(remote_epoch1_checkpoint.join("file2").exists());
607 assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
608 assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
609 assert!(
610 local_epoch1_checkpoint
611 .join(UPLOAD_COMPLETED_MARKER)
612 .exists()
613 );
614
615 let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
617 fs::write(test_marker, b"Lorem ipsum")?;
618 let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
619 fs::write(test_marker, b"Lorem ipsum")?;
620
621 db_checkpoint_handler
622 .garbage_collect_old_db_checkpoints()
623 .await?;
624 assert!(!local_epoch0_checkpoint.join("file1").exists());
625 assert!(!local_epoch0_checkpoint.join("file1").exists());
626 assert!(!local_epoch0_checkpoint.join("file2").exists());
627 assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
628 assert!(!local_epoch1_checkpoint.join("file1").exists());
629 assert!(!local_epoch1_checkpoint.join("file1").exists());
630 assert!(!local_epoch1_checkpoint.join("file2").exists());
631 assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
632 Ok(())
633 }
634
635 #[tokio::test]
636 async fn test_missing_epochs() -> anyhow::Result<()> {
637 let checkpoint_dir = TempDir::new()?;
638 let checkpoint_dir_path = checkpoint_dir.path();
639 let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
640 fs::create_dir(&local_epoch0_checkpoint)?;
641 let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
642 fs::create_dir(&local_epoch1_checkpoint)?;
643 let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
645 fs::create_dir(&local_epoch3_checkpoint)?;
646 let remote_checkpoint_dir = TempDir::new()?;
647 let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
648
649 let input_store_config = ObjectStoreConfig {
650 object_store: Some(ObjectStoreType::File),
651 directory: Some(checkpoint_dir_path.to_path_buf()),
652 ..Default::default()
653 };
654
655 let output_store_config = ObjectStoreConfig {
656 object_store: Some(ObjectStoreType::File),
657 directory: Some(remote_checkpoint_dir_path.to_path_buf()),
658 ..Default::default()
659 };
660 let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
661 &input_store_config,
662 Some(&output_store_config),
663 10,
664 false,
665 false,
666 )?;
667
668 let missing_epochs = find_missing_epochs_dirs(
669 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
670 SUCCESS_MARKER,
671 )
672 .await?;
673 db_checkpoint_handler
674 .upload_db_checkpoints_to_object_store(missing_epochs)
675 .await?;
676
677 let first_missing_epoch = find_missing_epochs_dirs(
678 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
679 SUCCESS_MARKER,
680 )
681 .await?
682 .first()
683 .cloned()
684 .unwrap();
685 assert_eq!(first_missing_epoch, 2);
686
687 let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
688 fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;
689
690 let first_missing_epoch = find_missing_epochs_dirs(
691 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
692 SUCCESS_MARKER,
693 )
694 .await?
695 .first()
696 .cloned()
697 .unwrap();
698 assert_eq!(first_missing_epoch, 0);
699
700 Ok(())
701 }
702
703 #[tokio::test]
704 async fn test_range_missing_epochs() -> anyhow::Result<()> {
705 let checkpoint_dir = TempDir::new()?;
706 let checkpoint_dir_path = checkpoint_dir.path();
707 let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
708 fs::create_dir(&local_epoch100_checkpoint)?;
709 let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
710 fs::create_dir(&local_epoch200_checkpoint)?;
711 let remote_checkpoint_dir = TempDir::new()?;
712 let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
713
714 let input_store_config = ObjectStoreConfig {
715 object_store: Some(ObjectStoreType::File),
716 directory: Some(checkpoint_dir_path.to_path_buf()),
717 ..Default::default()
718 };
719
720 let output_store_config = ObjectStoreConfig {
721 object_store: Some(ObjectStoreType::File),
722 directory: Some(remote_checkpoint_dir_path.to_path_buf()),
723 ..Default::default()
724 };
725 let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
726 &input_store_config,
727 Some(&output_store_config),
728 10,
729 false,
730 false,
731 )?;
732
733 let missing_epochs = find_missing_epochs_dirs(
734 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
735 SUCCESS_MARKER,
736 )
737 .await?;
738 assert_eq!(missing_epochs, vec![0]);
739 db_checkpoint_handler
740 .upload_db_checkpoints_to_object_store(missing_epochs)
741 .await?;
742
743 let missing_epochs = find_missing_epochs_dirs(
744 db_checkpoint_handler.output_object_store.as_ref().unwrap(),
745 SUCCESS_MARKER,
746 )
747 .await?;
748 let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
749 expected_missing_epochs.extend((101..200).collect_vec().iter());
750 expected_missing_epochs.push(201);
751 assert_eq!(missing_epochs, expected_missing_epochs);
752 Ok(())
753 }
754}