use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};

use anyhow::Result;
use bytes::Bytes;
use futures::future::try_join_all;
use iota_config::{
    node::AuthorityStorePruningConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_storage::object_store::util::{
    copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
    path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
};
use object_store::{DynObjectStore, path::Path};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tracing::{debug, error, info};

use crate::{
    authority::{
        authority_store_pruner::{
            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
        },
        authority_store_tables::AuthorityPerpetualTables,
    },
    checkpoints::CheckpointStore,
    rest_index::RestIndexStore,
};

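// Marker files used to coordinate db checkpoint upload and garbage collection:
// - `_SUCCESS` is written to the remote store once an epoch's checkpoint has been fully copied.
// - `_TEST` is an extra gc marker used only by tests.
// - `_UPLOAD_COMPLETED` is written next to the local checkpoint once it has been uploaded (or,
//   when uploads are disabled, once the cleanup loop has visited it).
// - `_STATE_SNAPSHOT_COMPLETED` must be present locally before upload when state snapshots are
//   enabled.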
pub const SUCCESS_MARKER: &str = "_SUCCESS";
pub const TEST_MARKER: &str = "_TEST";
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";

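/// Prometheus metrics reporting db checkpoint upload progress and the number of
/// checkpoints still held on local disk.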
pub struct DBCheckpointMetrics {
    pub first_missing_db_checkpoint_epoch: IntGauge,
    pub num_local_db_checkpoints: IntGauge,
}

impl DBCheckpointMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
                "first_missing_db_checkpoint_epoch",
                "First epoch for which we have no db checkpoint in remote store",
                registry
            )
            .unwrap(),
            num_local_db_checkpoints: register_int_gauge_with_registry!(
                "num_local_db_checkpoints",
                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

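/// Periodically scans the local db checkpoint directory, uploads new checkpoints to a remote
/// object store (optionally pruning and compacting them first), and garbage collects local
/// checkpoints once all configured markers are present.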
pub struct DBCheckpointHandler {
    /// Object store wrapping the local directory where db checkpoints are written
    input_object_store: Arc<DynObjectStore>,
    /// Local filesystem root of the db checkpoint directory
    input_root_path: PathBuf,
    /// Remote object store to upload db checkpoints to; `None` disables upload
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval between scans for new local db checkpoints
    interval: Duration,
    /// Marker files which must all be present before a local db checkpoint is garbage collected
    gc_markers: Vec<String>,
    /// Whether to prune objects and compact the checkpointed db before uploading it
    prune_and_compact_before_upload: bool,
    /// Whether state snapshots are enabled; if so, upload waits for the state snapshot marker
    state_snapshot_enabled: bool,
    pruning_config: AuthorityStorePruningConfig,
    metrics: Arc<DBCheckpointMetrics>,
}

impl DBCheckpointHandler {
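    /// Builds a handler that watches `input_path` for new db checkpoints; the remote store is
    /// optional, and the upload loop only runs when it is configured.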
    pub fn new(
        input_path: &std::path::Path,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        pruning_config: AuthorityStorePruningConfig,
        registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        };
        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
        if state_snapshot_enabled {
            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
        }
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_store_config.make()?,
            input_root_path: input_path.to_path_buf(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers,
            prune_and_compact_before_upload,
            state_snapshot_enabled,
            pruning_config,
            metrics: DBCheckpointMetrics::new(registry),
        }))
    }
    pub fn new_for_test(
        input_object_store_config: &ObjectStoreConfig,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_object_store_config.make()?,
            input_root_path: input_object_store_config
                .directory
                .as_ref()
                .unwrap()
                .clone(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
            prune_and_compact_before_upload,
            state_snapshot_enabled,
            pruning_config: AuthorityStorePruningConfig::default(),
            metrics: DBCheckpointMetrics::new(&Registry::default()),
        }))
    }

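    /// Spawns the background loops: when a remote store is configured, the upload and manifest
    /// update loops; otherwise a local cleanup loop; plus the garbage collection loop in either
    /// case. The returned broadcast sender acts as a kill switch for all spawned loops.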
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        if self.output_object_store.is_some() {
            // Upload new db checkpoints to the remote store and keep its manifest up to date.
            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
            tokio::task::spawn(run_manifest_update_loop(
                self.output_object_store.as_ref().unwrap().clone(),
                kill_sender.subscribe(),
            ));
        } else {
            // With no remote store configured, only mark local checkpoints for garbage collection.
            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
        }
        // Delete local db checkpoints once all gc markers are present.
        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
            self,
            kill_sender.subscribe(),
        ));
        kill_sender
    }

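    /// On every tick, records the number of local db checkpoints, asks the remote store which
    /// epochs are still missing, and uploads them.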
    async fn run_db_checkpoint_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                        Ok(epochs) => {
                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

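    /// When no remote store is configured, marks every local db checkpoint with
    /// `_UPLOAD_COMPLETED` so the gc loop can reclaim the disk space.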
    async fn run_db_checkpoint_cleanup_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
                    for (_, db_path) in dirs {
                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
                        if upload_completed_path.exists() {
                            continue;
                        }
                        let bytes = Bytes::from_static(b"success");
                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
                        put(
                            &self.input_object_store,
                            &upload_completed_marker,
                            bytes.clone(),
                        )
                        .await?;
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

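    /// Every 30 seconds, deletes local db checkpoint directories whose gc markers are all
    /// present.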
    async fn run_db_checkpoint_gc_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
        info!("DB checkpoint garbage collection loop started");
        loop {
            tokio::select! {
                _now = gc_interval.tick() => {
                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
                        if !deleted.is_empty() {
                            info!("Garbage collected local db checkpoints: {:?}", deleted);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

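    /// Opens the checkpointed db at `db_path`, prunes objects for all eligible epochs, and runs
    /// a manual compaction before the checkpoint is uploaded.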
    async fn prune_and_compact(
        &self,
        db_path: PathBuf,
        epoch: u64,
        epoch_duration_ms: u64,
    ) -> Result<()> {
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
        let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
            &db_path.join("checkpoints"),
        ));
        let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
        info!(
            "Pruning db checkpoint in {} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::prune_objects_for_eligible_epochs(
            &perpetual_db,
            &checkpoint_store,
            Some(&rest_index),
            None,
            self.pruning_config.clone(),
            metrics,
            epoch_duration_ms,
        )
        .await?;
        info!(
            "Compacting db checkpoint in {} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::compact(&perpetual_db)?;
        Ok(())
    }

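    /// Uploads every local db checkpoint that the remote store is still missing, writing a
    /// `_SUCCESS` marker remotely and an `_UPLOAD_COMPLETED` marker locally for each epoch.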
    async fn upload_db_checkpoints_to_object_store(
        &self,
        missing_epochs: Vec<u64>,
    ) -> Result<(), anyhow::Error> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
        let object_store = self
            .output_object_store
            .as_ref()
            .expect("Expected object store to exist")
            .clone();
        for (epoch, db_path) in dirs {
            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                if self.state_snapshot_enabled {
                    let snapshot_completed_marker =
                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
                    if !snapshot_completed_marker.exists() {
                        info!(
                            "DB checkpoint upload for epoch {} will wait until the state snapshot has been uploaded",
                            *epoch
                        );
                        continue;
                    }
                }

                if self.prune_and_compact_before_upload {
                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
                        .await?;
                }

                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
                copy_recursively(
                    db_path,
                    &self.input_object_store,
                    &object_store,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;

                write_snapshot_manifest(db_path, &object_store, format!("epoch_{epoch}/")).await?;
                // Drop a success marker in the remote store so this epoch is not re-uploaded.
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&object_store, &success_marker, bytes.clone()).await?;
            }
            // Mark the local checkpoint as uploaded so it becomes eligible for garbage collection.
            let bytes = Bytes::from_static(b"success");
            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
            put(
                &self.input_object_store,
                &upload_completed_marker,
                bytes.clone(),
            )
            .await?;
        }
        Ok(())
    }

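    /// Deletes local db checkpoint directories for which every configured gc marker exists and
    /// returns the epochs that were removed.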
    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut deleted = Vec::new();
        for (epoch, path) in local_checkpoints_by_epoch.iter() {
            let marker_paths: Vec<Path> = self
                .gc_markers
                .iter()
                .map(|marker| path.child(marker.clone()))
                .collect();
            let all_markers_present = try_join_all(
                marker_paths
                    .iter()
                    .map(|path| self.input_object_store.get(path)),
            )
            .await;
            match all_markers_present {
                Ok(_) => {
                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
                    deleted.push(*epoch);
                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
                    fs::remove_dir_all(&local_fs_path)?;
                }
                Err(_) => {
                    debug!("Not ready for deletion yet: {path}");
                }
            }
        }
        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use std::fs;

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use iota_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use itertools::Itertools;
    use tempfile::TempDir;

    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };

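    // Uploads a single local epoch_0 checkpoint, verifies the remote copy and markers, then
    // checks that garbage collection removes the local files once all gc markers exist.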
    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Write the extra `_TEST` gc marker so the local checkpoint becomes eligible for gc.
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

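    // Simulates an interrupted upload by deleting the remote `_SUCCESS` marker for epoch 0 and
    // verifies that the next upload pass re-uploads epoch 0 alongside the new epoch 1.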
    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Create a new local checkpoint for epoch 1.
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Remove the remote success marker to simulate an interrupted upload of epoch 0.
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Write the extra `_TEST` gc markers so both local checkpoints become eligible for gc.
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

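    // With local checkpoints for epochs 0, 1, and 3, the first missing remote epoch after an
    // upload should be 2; deleting epoch 0's remote `_SUCCESS` marker makes it 0 again.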
    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        // Note: epoch_2 is intentionally missing locally.
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

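    // With local checkpoints only for epochs 100 and 200, every epoch below, between, and one
    // past the highest local epoch should be reported as missing remotely after upload.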
    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}