iota_core/db_checkpoint_handler.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};

use anyhow::Result;
use bytes::Bytes;
use futures::future::try_join_all;
use iota_config::{
    node::AuthorityStorePruningConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_storage::{
    mutex_table::RwLockTable,
    object_store::util::{
        copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
        path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
    },
};
use object_store::{DynObjectStore, path::Path};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tracing::{debug, error, info};
use typed_store::rocks::MetricConf;

use crate::{
    authority::{
        authority_store_pruner::{
            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
        },
        authority_store_tables::AuthorityPerpetualTables,
    },
    checkpoints::CheckpointStore,
    rest_index::RestIndexStore,
};

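// Marker files dropped into checkpoint directories to track progress:
// `_SUCCESS` is written to the remote epoch directory once an upload finishes,
// `_UPLOAD_COMPLETED` marks a local checkpoint as safe to garbage collect,
// `_STATE_SNAPSHOT_COMPLETED` signals that the state snapshot for the epoch
// has been uploaded, and `_TEST` is used only by tests to trigger gc.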
pub const SUCCESS_MARKER: &str = "_SUCCESS";
pub const TEST_MARKER: &str = "_TEST";
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";

pub struct DBCheckpointMetrics {
    pub first_missing_db_checkpoint_epoch: IntGauge,
    pub num_local_db_checkpoints: IntGauge,
}

impl DBCheckpointMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
                "first_missing_db_checkpoint_epoch",
                "First epoch for which we have no db checkpoint in remote store",
                registry
            )
            .unwrap(),
            num_local_db_checkpoints: register_int_gauge_with_registry!(
                "num_local_db_checkpoints",
                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

pub struct DBCheckpointHandler {
    /// Object store wrapping the local directory where db checkpoints are
    /// written
    input_object_store: Arc<DynObjectStore>,
    /// Root path of the db checkpoint directory on the local filesystem
    input_root_path: PathBuf,
    /// Bucket on cloud object store where db checkpoints will be copied
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval to check for presence of new db checkpoints (typically
    /// 60s)
    interval: Duration,
    /// File markers which signal that a local db checkpoint can be garbage
    /// collected
    gc_markers: Vec<String>,
    /// Boolean flag to enable/disable object pruning and manual compaction
    /// before upload
    prune_and_compact_before_upload: bool,
    /// Indirect object threshold passed to the pruner
    indirect_objects_threshold: usize,
    /// If true, upload will block on the state snapshot upload completed
    /// marker
    state_snapshot_enabled: bool,
    /// Pruning configuration for the authority store pruner
    pruning_config: AuthorityStorePruningConfig,
    metrics: Arc<DBCheckpointMetrics>,
}

impl DBCheckpointHandler {
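    /// Creates a handler that watches `input_path` for locally produced db
    /// checkpoints and, when an output store config is given, uploads them to
    /// the remote store.
    ///
    /// A minimal construction sketch (illustrative only; the path, interval
    /// and other values below are assumptions, not recommended defaults):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// use iota_config::node::AuthorityStorePruningConfig;
    /// use prometheus::Registry;
    ///
    /// let handler = DBCheckpointHandler::new(
    ///     Path::new("/opt/iota/db_checkpoints"), // local checkpoint root (assumed path)
    ///     None,                                  // no remote store: cleanup loop only
    ///     60,                                    // poll interval in seconds
    ///     false,                                 // skip pruning/compaction before upload
    ///     0,                                     // indirect objects threshold
    ///     AuthorityStorePruningConfig::default(),
    ///     &Registry::default(),
    ///     false,                                 // state snapshots disabled
    /// )
    /// .expect("failed to construct DBCheckpointHandler");
    /// ```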
    pub fn new(
        input_path: &std::path::Path,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        indirect_objects_threshold: usize,
        pruning_config: AuthorityStorePruningConfig,
        registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        };
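        // Garbage collection of a local checkpoint requires the upload-completed
        // marker and, when state snapshots are enabled, the state snapshot
        // completed marker as well.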
        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
        if state_snapshot_enabled {
            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
        }
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_store_config.make()?,
            input_root_path: input_path.to_path_buf(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers,
            prune_and_compact_before_upload,
            indirect_objects_threshold,
            state_snapshot_enabled,
            pruning_config,
            metrics: DBCheckpointMetrics::new(registry),
        }))
    }

    pub fn new_for_test(
        input_object_store_config: &ObjectStoreConfig,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_object_store_config.make()?,
            input_root_path: input_object_store_config
                .directory
                .as_ref()
                .unwrap()
                .clone(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
            prune_and_compact_before_upload,
            indirect_objects_threshold: 0,
            state_snapshot_enabled,
            pruning_config: AuthorityStorePruningConfig::default(),
            metrics: DBCheckpointMetrics::new(&Registry::default()),
        }))
    }

    /// Starts the db checkpoint uploader and manifest update loops if a remote
    /// store is specified. If no remote store is specified, it starts a
    /// loop that adds an UPLOAD_COMPLETED_MARKER to each epoch directory.
    /// Additionally, a db checkpoint gc loop is started regardless of the
    /// remote store configuration.
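    ///
    /// Returns a broadcast sender; sending on it shuts down all spawned loops.
    /// A usage sketch (illustrative only; `handler` is assumed to come from
    /// [`DBCheckpointHandler::new`]):
    ///
    /// ```ignore
    /// let kill_sender = handler.clone().start();
    /// // ... later, stop the upload/cleanup and gc loops:
    /// let _ = kill_sender.send(());
    /// ```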
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        if self.output_object_store.is_some() {
            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
            tokio::task::spawn(run_manifest_update_loop(
                self.output_object_store.as_ref().unwrap().clone(),
                kill_sender.subscribe(),
            ));
        } else {
            // If no remote store for db checkpoints is specified, a cleanup loop
            // is run instead to immediately mark db checkpoint uploads as successful
            // so that they can be snapshotted and garbage collected.
            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
        }
        // Start the db checkpoint gc loop to remove epoch directories
        // that contain all required gc markers.
        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
            self,
            kill_sender.subscribe(),
        ));
        kill_sender
    }

    /// Main loop that checks for missing db checkpoints and uploads them to
    /// remote store.
    async fn run_db_checkpoint_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                        Ok(epochs) => {
                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

    /// Main loop that adds UPLOAD_COMPLETED_MARKER to epoch directories in
    /// local store.
    async fn run_db_checkpoint_cleanup_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
                    for (_, db_path) in dirs {
                        // If db checkpoint marked as completed, skip
                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
                        if upload_completed_path.exists() {
                            continue;
                        }
                        let bytes = Bytes::from_static(b"success");
                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
                        put(
                            &self.input_object_store,
                            &upload_completed_marker,
                            bytes.clone(),
                        )
                        .await?;
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

    /// Main loop that checks for epoch directories containing all required gc
    /// markers (including UPLOAD_COMPLETED_MARKER) and deletes them every 30
    /// seconds.
    async fn run_db_checkpoint_gc_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
        info!("DB checkpoint garbage collection loop started");
        loop {
            tokio::select! {
                _now = gc_interval.tick() => {
                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
                        if !deleted.is_empty() {
                            info!("Garbage collected local db checkpoints: {:?}", deleted);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

    async fn prune_and_compact(
        &self,
        db_path: PathBuf,
        epoch: u64,
        epoch_duration_ms: u64,
    ) -> Result<()> {
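        // Note: this operates on the checkpointed copies of the stores under
        // `db_path` (the per-epoch checkpoint directory), not on the live
        // authority databases.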
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
        let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
            db_path.join("checkpoints"),
            MetricConf::new("db_checkpoint"),
            None,
            None,
        ));
        let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
        let lock_table = Arc::new(RwLockTable::new(1));
        info!(
            "Pruning db checkpoint in {} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::prune_objects_for_eligible_epochs(
            &perpetual_db,
            &checkpoint_store,
            Some(&rest_index),
            &lock_table,
            self.pruning_config.clone(),
            metrics,
            self.indirect_objects_threshold,
            epoch_duration_ms,
        )
        .await?;
        info!(
            "Compacting db checkpoint in {} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::compact(&perpetual_db)?;
        Ok(())
    }

    /// Checks for missing db checkpoints in the remote store and uploads them
    /// from the local store. Also writes a MANIFEST file listing all files
    /// under each uploaded db checkpoint directory.
    async fn upload_db_checkpoints_to_object_store(
        &self,
        missing_epochs: Vec<u64>,
    ) -> Result<(), anyhow::Error> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
        let object_store = self
            .output_object_store
            .as_ref()
            .expect("Expected object store to exist")
            .clone();
        for (epoch, db_path) in dirs {
            // Convert `db_path` to the local filesystem path where the db
            // checkpoint is stored
            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
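            // Upload the checkpoint if the remote store is missing this epoch, or
            // if the epoch is at or beyond the last missing epoch (i.e. newer than
            // anything known to be in the remote store).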
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                if self.state_snapshot_enabled {
                    let snapshot_completed_marker =
                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
                    if !snapshot_completed_marker.exists() {
                        info!(
                            "DB checkpoint upload for epoch {} waits until state snapshot is uploaded",
                            *epoch
                        );
                        continue;
                    }
                }

                if self.prune_and_compact_before_upload {
                    // Invoke pruning and compaction on the db checkpoint
                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
                        .await?;
                }

                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
                copy_recursively(
                    db_path,
                    &self.input_object_store,
                    &object_store,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;

                // This writes a single "MANIFEST" file which contains a list of all files that
                // make up a db snapshot
                write_snapshot_manifest(db_path, &object_store, format!("epoch_{}/", epoch))
                    .await?;
                // Drop a marker in the output directory indicating the upload completed
                // successfully
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&object_store, &success_marker, bytes.clone()).await?;
            }
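            // Mark the local checkpoint as uploaded regardless of whether it was
            // copied in this pass, so the gc loop can reclaim the directory.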
            let bytes = Bytes::from_static(b"success");
            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
            put(
                &self.input_object_store,
                &upload_completed_marker,
                bytes.clone(),
            )
            .await?;
        }
        Ok(())
    }

    /// Deletes old db checkpoints in the local store once all gc markers are
    /// present in the epoch directory.
    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut deleted = Vec::new();
        for (epoch, path) in local_checkpoints_by_epoch.iter() {
            let marker_paths: Vec<Path> = self
                .gc_markers
                .iter()
                .map(|marker| path.child(marker.clone()))
                .collect();
            // Check if all markers are present in the epoch directory. When state
            // snapshots are enabled, gc also waits for the state snapshot upload
            // completed marker, which is part of `gc_markers`.
            let all_markers_present = try_join_all(
                marker_paths
                    .iter()
                    .map(|path| self.input_object_store.get(path)),
            )
            .await;
            match all_markers_present {
                Ok(_) => {
                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
                    deleted.push(*epoch);
                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
                    fs::remove_dir_all(&local_fs_path)?;
                }
                Err(_) => {
                    debug!("Not ready for deletion yet: {path}");
                }
            }
        }
        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use std::fs;

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use iota_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use itertools::Itertools;
    use tempfile::TempDir;

    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };

    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop the extra test-only gc marker needed for gc to trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Add a new db checkpoint to the local checkpoint directory
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Now delete the success marker from the remote checkpoint directory.
        // This simulates an upload that stopped midway because the system stopped.
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        // The checkpoint handler should copy the checkpoint for epoch_0 before
        // copying epoch_1
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop the extra test-only gc markers needed for gc to trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        // Missing epoch 2
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}