use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};

use anyhow::Result;
use bytes::Bytes;
use futures::future::try_join_all;
use iota_config::{
    node::AuthorityStorePruningConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_storage::{
    mutex_table::RwLockTable,
    object_store::util::{
        copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
        path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
    },
};
use object_store::{DynObjectStore, path::Path};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tracing::{debug, error, info};
use typed_store::rocks::MetricConf;

use crate::{
    authority::{
        authority_store_pruner::{
            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
        },
        authority_store_tables::AuthorityPerpetualTables,
    },
    checkpoints::CheckpointStore,
    rest_index::RestIndexStore,
};

pub const SUCCESS_MARKER: &str = "_SUCCESS";
pub const TEST_MARKER: &str = "_TEST";
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";
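
/// Prometheus metrics tracking the state of local and remote db checkpoints.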
pub struct DBCheckpointMetrics {
    pub first_missing_db_checkpoint_epoch: IntGauge,
    pub num_local_db_checkpoints: IntGauge,
}

impl DBCheckpointMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
                "first_missing_db_checkpoint_epoch",
                "First epoch for which we have no db checkpoint in remote store",
                registry
            )
            .unwrap(),
            num_local_db_checkpoints: register_int_gauge_with_registry!(
                "num_local_db_checkpoints",
                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}
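
/// Watches a local directory for per-epoch RocksDB checkpoints, optionally
/// prunes and compacts them, uploads them to a remote object store when one is
/// configured, and garbage collects local copies once all required completion
/// markers are present.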
pub struct DBCheckpointHandler {
    /// Object store over the local directory where db checkpoints are written.
    input_object_store: Arc<DynObjectStore>,
    /// Local filesystem root of the input object store.
    input_root_path: PathBuf,
    /// Remote object store to upload db checkpoints to, if configured.
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval between attempts to upload db checkpoints.
    interval: Duration,
    /// Markers that must all be present before a local checkpoint directory
    /// is garbage collected.
    gc_markers: Vec<String>,
    /// If true, prune and compact a db checkpoint before uploading it.
    prune_and_compact_before_upload: bool,
    /// Indirect objects threshold passed to the pruner.
    indirect_objects_threshold: usize,
    /// If true, uploads wait for the state snapshot completed marker.
    state_snapshot_enabled: bool,
    /// Pruning configuration applied when pruning before upload.
    pruning_config: AuthorityStorePruningConfig,
    metrics: Arc<DBCheckpointMetrics>,
}

impl DBCheckpointHandler {
    pub fn new(
        input_path: &std::path::Path,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        indirect_objects_threshold: usize,
        pruning_config: AuthorityStorePruningConfig,
        registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        };
        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
        if state_snapshot_enabled {
            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
        }
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_store_config.make()?,
            input_root_path: input_path.to_path_buf(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers,
            prune_and_compact_before_upload,
            indirect_objects_threshold,
            state_snapshot_enabled,
            pruning_config,
            metrics: DBCheckpointMetrics::new(registry),
        }))
    }

    pub fn new_for_test(
        input_object_store_config: &ObjectStoreConfig,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_object_store_config.make()?,
            input_root_path: input_object_store_config
                .directory
                .as_ref()
                .unwrap()
                .clone(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
            prune_and_compact_before_upload,
            indirect_objects_threshold: 0,
            state_snapshot_enabled,
            pruning_config: AuthorityStorePruningConfig::default(),
            metrics: DBCheckpointMetrics::new(&Registry::default()),
        }))
    }
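
    /// Spawns the handler's background tasks: the upload and manifest-update
    /// loops when a remote object store is configured (otherwise a local
    /// cleanup loop), plus the garbage collection loop. Returns a broadcast
    /// sender used to signal the tasks to shut down.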
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        if self.output_object_store.is_some() {
            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
            tokio::task::spawn(run_manifest_update_loop(
                self.output_object_store.as_ref().unwrap().clone(),
                kill_sender.subscribe(),
            ));
        } else {
            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
        }
        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
            self,
            kill_sender.subscribe(),
        ));
        kill_sender
    }
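
    /// Periodically finds epochs missing from the remote store and uploads the
    /// corresponding local db checkpoints.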
    async fn run_db_checkpoint_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                        Ok(epochs) => {
                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }
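
    /// When no remote store is configured, periodically marks local db
    /// checkpoints with the upload-completed marker so that they become
    /// eligible for garbage collection.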
    async fn run_db_checkpoint_cleanup_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
                    for (_, db_path) in dirs {
                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
                        if upload_completed_path.exists() {
                            continue;
                        }
                        let bytes = Bytes::from_static(b"success");
                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
                        put(
                            &self.input_object_store,
                            &upload_completed_marker,
                            bytes.clone(),
                        )
                        .await?;
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }
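
    /// Periodically deletes local db checkpoint directories that contain all
    /// of the configured gc markers.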
    async fn run_db_checkpoint_gc_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
        info!("DB checkpoint garbage collection loop started");
        loop {
            tokio::select! {
                _now = gc_interval.tick() => {
                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
                        if !deleted.is_empty() {
                            info!("Garbage collected local db checkpoints: {:?}", deleted);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }
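
    /// Opens the checkpointed database at `db_path`, prunes objects for
    /// eligible epochs according to the pruning config, and compacts the
    /// perpetual tables.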
    async fn prune_and_compact(
        &self,
        db_path: PathBuf,
        epoch: u64,
        epoch_duration_ms: u64,
    ) -> Result<()> {
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
        let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
            db_path.join("checkpoints"),
            MetricConf::new("db_checkpoint"),
            None,
            None,
        ));
        let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
        let lock_table = Arc::new(RwLockTable::new(1));
        info!(
            "Pruning db checkpoint in {:?} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::prune_objects_for_eligible_epochs(
            &perpetual_db,
            &checkpoint_store,
            Some(&rest_index),
            &lock_table,
            self.pruning_config.clone(),
            metrics,
            self.indirect_objects_threshold,
            epoch_duration_ms,
        )
        .await?;
        info!(
            "Compacting db checkpoint in {:?} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::compact(&perpetual_db)?;
        Ok(())
    }
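
    /// Uploads local db checkpoints for any missing epochs to the remote
    /// object store. A success marker is written to the remote store for each
    /// uploaded epoch, and an upload-completed marker is written to the local
    /// checkpoint directory so it can later be garbage collected.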
    async fn upload_db_checkpoints_to_object_store(
        &self,
        missing_epochs: Vec<u64>,
    ) -> Result<(), anyhow::Error> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
        let object_store = self
            .output_object_store
            .as_ref()
            .expect("Expected object store to exist")
            .clone();
        for (epoch, db_path) in dirs {
            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                if self.state_snapshot_enabled {
                    let snapshot_completed_marker =
                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
                    if !snapshot_completed_marker.exists() {
                        info!(
                            "DB checkpoint upload for epoch {} will wait until the state snapshot is uploaded",
                            *epoch
                        );
                        continue;
                    }
                }

                if self.prune_and_compact_before_upload {
                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
                        .await?;
                }

                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
                copy_recursively(
                    db_path,
                    &self.input_object_store,
                    &object_store,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;

                write_snapshot_manifest(db_path, &object_store, format!("epoch_{epoch}/")).await?;
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&object_store, &success_marker, bytes.clone()).await?;
            }
            let bytes = Bytes::from_static(b"success");
            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
            put(
                &self.input_object_store,
                &upload_completed_marker,
                bytes.clone(),
            )
            .await?;
        }
        Ok(())
    }
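
    /// Deletes local db checkpoint directories in which all configured gc
    /// markers are present, returning the epochs that were removed.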
    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut deleted = Vec::new();
        for (epoch, path) in local_checkpoints_by_epoch.iter() {
            let marker_paths: Vec<Path> = self
                .gc_markers
                .iter()
                .map(|marker| path.child(marker.clone()))
                .collect();
            let all_markers_present = try_join_all(
                marker_paths
                    .iter()
                    .map(|path| self.input_object_store.get(path)),
            )
            .await;
            match all_markers_present {
                Ok(_) => {
                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
                    deleted.push(*epoch);
                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
                    fs::remove_dir_all(&local_fs_path)?;
                }
                Err(_) => {
                    debug!("Not ready for deletion yet: {path}");
                }
            }
        }
        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use std::fs;

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use iota_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use itertools::Itertools;
    use tempfile::TempDir;

    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };
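
    // Uploads a single local checkpoint (epoch_0) to the remote store and then
    // garbage collects the local copy once all gc markers are present.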
    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // new_for_test also requires the TEST_MARKER gc marker, so write it to
        // make the local checkpoint eligible for garbage collection.
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }
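
    // Exercises resumption: deleting the remote success marker for epoch_0
    // makes it look missing again, so the next upload pass re-uploads it along
    // with the new epoch_1 checkpoint.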
    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Create a db checkpoint for the next epoch.
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Simulate an interrupted upload of epoch_0 by removing its remote
        // success marker.
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }
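
    // With local checkpoints for epochs 0, 1 and 3, epoch 2 should be the first
    // missing remote epoch after upload; deleting epoch_0's success marker
    // makes 0 the first missing epoch again.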
    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }
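
    // With only epochs 100 and 200 present locally, the remote store should
    // report every epoch up to 201 as missing except the two that were
    // uploaded.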
    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}