iota_core/db_checkpoint_handler.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};

use anyhow::Result;
use bytes::Bytes;
use futures::future::try_join_all;
use iota_config::{
    node::AuthorityStorePruningConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_storage::{
    mutex_table::RwLockTable,
    object_store::util::{
        copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
        path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
    },
};
use object_store::{DynObjectStore, path::Path};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tracing::{debug, error, info};
use typed_store::rocks::MetricConf;

use crate::{
    authority::{
        authority_store_pruner::{
            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
        },
        authority_store_tables::AuthorityPerpetualTables,
    },
    checkpoints::CheckpointStore,
    rest_index::RestIndexStore,
};

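// Marker files used to coordinate upload, state snapshots, and local garbage
// collection of db checkpoints:
// - SUCCESS_MARKER is written to the remote epoch directory once the upload of
//   that epoch's checkpoint has finished.
// - UPLOAD_COMPLETED_MARKER is written to the local epoch directory after the
//   upload (or immediately, when no remote store is configured).
// - STATE_SNAPSHOT_COMPLETED_MARKER gates upload and garbage collection when
//   state snapshots are enabled.
// - TEST_MARKER is an additional garbage collection marker used only in tests.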
pub const SUCCESS_MARKER: &str = "_SUCCESS";
pub const TEST_MARKER: &str = "_TEST";
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";

pub struct DBCheckpointMetrics {
    pub first_missing_db_checkpoint_epoch: IntGauge,
    pub num_local_db_checkpoints: IntGauge,
}

impl DBCheckpointMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
                "first_missing_db_checkpoint_epoch",
                "First epoch for which we have no db checkpoint in remote store",
                registry
            )
            .unwrap(),
            num_local_db_checkpoints: register_int_gauge_with_registry!(
                "num_local_db_checkpoints",
                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

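/// Periodically uploads locally produced RocksDB checkpoints to a remote
/// object store (when one is configured) and garbage collects local epoch
/// directories once they carry all required markers.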
pub struct DBCheckpointHandler {
    /// Directory on local disk where db checkpoints are stored
    input_object_store: Arc<DynObjectStore>,
    /// DB checkpoint directory on local filesystem
    input_root_path: PathBuf,
    /// Bucket on cloud object store where db checkpoints will be copied
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval to check for presence of a new db checkpoint (60s)
    interval: Duration,
    /// File markers which signal that a local db checkpoint can be garbage
    /// collected
    gc_markers: Vec<String>,
    /// Boolean flag to enable/disable object pruning and manual compaction
    /// before upload
    prune_and_compact_before_upload: bool,
    /// Indirect object config for pruner
    indirect_objects_threshold: usize,
    /// If true, upload will block on state snapshot upload completed marker
    state_snapshot_enabled: bool,
    /// Pruning config used when pruning objects before upload
    pruning_config: AuthorityStorePruningConfig,
    metrics: Arc<DBCheckpointMetrics>,
}

impl DBCheckpointHandler {
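    /// Creates a handler that reads db checkpoints from `input_path` through a
    /// file-backed object store and, if `output_object_store_config` is given,
    /// uploads them to the configured remote store.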
    pub fn new(
        input_path: &std::path::Path,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        indirect_objects_threshold: usize,
        pruning_config: AuthorityStorePruningConfig,
        registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        };
        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
        if state_snapshot_enabled {
            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
        }
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_store_config.make()?,
            input_root_path: input_path.to_path_buf(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers,
            prune_and_compact_before_upload,
            indirect_objects_threshold,
            state_snapshot_enabled,
            pruning_config,
            metrics: DBCheckpointMetrics::new(registry),
        }))
    }

    pub fn new_for_test(
        input_object_store_config: &ObjectStoreConfig,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_object_store_config.make()?,
            input_root_path: input_object_store_config
                .directory
                .as_ref()
                .unwrap()
                .clone(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
            prune_and_compact_before_upload,
            indirect_objects_threshold: 0,
            state_snapshot_enabled,
            pruning_config: AuthorityStorePruningConfig::default(),
            metrics: DBCheckpointMetrics::new(&Registry::default()),
        }))
    }

    /// Starts the db checkpoint uploader and manifest update loops if a remote
    /// store is specified. If no remote store is specified, it starts a
    /// loop that adds an UPLOAD_COMPLETED_MARKER to the epoch directory.
    /// Additionally, a db checkpoint gc loop is started regardless of the
    /// remote store configuration.
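    ///
    /// A minimal usage sketch (illustrative only; the path, interval, and
    /// registry below are assumptions, not values used by the node):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// use iota_config::node::AuthorityStorePruningConfig;
    /// use prometheus::Registry;
    ///
    /// let handler = DBCheckpointHandler::new(
    ///     Path::new("/tmp/db_checkpoints"), // local checkpoint directory (hypothetical)
    ///     None,                             // no remote store: cleanup loop marks uploads done
    ///     60,                               // poll for new checkpoints every 60s
    ///     false,                            // skip pruning/compaction before upload
    ///     0,                                // indirect objects threshold
    ///     AuthorityStorePruningConfig::default(),
    ///     &Registry::default(),
    ///     false,                            // state snapshots disabled
    /// )?;
    /// let kill = handler.start();
    /// // ... later, stop all background loops:
    /// let _ = kill.send(());
    /// ```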
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        if self.output_object_store.is_some() {
            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
            tokio::task::spawn(run_manifest_update_loop(
                self.output_object_store.as_ref().unwrap().clone(),
                kill_sender.subscribe(),
            ));
        } else {
            // If no remote db checkpoint store is specified, a cleanup loop
            // runs instead and immediately marks db checkpoints as uploaded
            // so that they can be snapshotted and garbage collected.
            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
        }
        // Start the db checkpoint gc loop to remove epoch directories
        // that contain all required gc markers.
        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
            self,
            kill_sender.subscribe(),
        ));
        kill_sender
    }

    /// Main loop that checks for missing db checkpoints and uploads them to
    /// the remote store.
    async fn run_db_checkpoint_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
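                    // Epochs whose remote directory lacks a SUCCESS_MARKER are
                    // considered missing; the first such epoch is exported as a
                    // metric and all of them are (re)uploaded.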
                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                        Ok(epochs) => {
                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

    /// Main loop that adds UPLOAD_COMPLETED_MARKER to epoch directories in
    /// the local store.
    async fn run_db_checkpoint_cleanup_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
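                    // Walk the epoch directories in ascending order and drop an
                    // UPLOAD_COMPLETED_MARKER into any directory that does not
                    // have one yet.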
                    for (_, db_path) in dirs {
                        // Skip db checkpoints that are already marked as upload completed
                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
                        if upload_completed_path.exists() {
                            continue;
                        }
                        let bytes = Bytes::from_static(b"success");
                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
                        put(
                            &self.input_object_store,
                            &upload_completed_marker,
                            bytes.clone(),
                        )
                        .await?;
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

    /// Main loop that looks for epoch directories containing all required gc
    /// markers and deletes them every 30 seconds.
    async fn run_db_checkpoint_gc_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
        info!("DB checkpoint garbage collection loop started");
        loop {
            tokio::select! {
                _now = gc_interval.tick() => {
                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
                        if !deleted.is_empty() {
                            info!("Garbage collected local db checkpoints: {:?}", deleted);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

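    /// Opens the checkpointed databases under `db_path`, prunes objects that
    /// are eligible for pruning according to the configured pruning policy,
    /// and then runs a manual compaction on the perpetual tables.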
    async fn prune_and_compact(
        &self,
        db_path: PathBuf,
        epoch: u64,
        epoch_duration_ms: u64,
    ) -> Result<()> {
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
        let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
            db_path.join("checkpoints"),
            MetricConf::new("db_checkpoint"),
            None,
            None,
        ));
        let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
        let lock_table = Arc::new(RwLockTable::new(1));
        info!(
            "Pruning db checkpoint in {:?} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::prune_objects_for_eligible_epochs(
            &perpetual_db,
            &checkpoint_store,
            Some(&rest_index),
            &lock_table,
            self.pruning_config.clone(),
            metrics,
            self.indirect_objects_threshold,
            epoch_duration_ms,
        )
        .await?;
        info!(
            "Compacting db checkpoint in {:?} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::compact(&perpetual_db)?;
        Ok(())
    }

    /// Checks for missing db checkpoints in the remote store and uploads them
    /// from the local store, then writes a MANIFEST file listing all files
    /// under each uploaded db checkpoint directory.
    async fn upload_db_checkpoints_to_object_store(
        &self,
        missing_epochs: Vec<u64>,
    ) -> Result<(), anyhow::Error> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
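        // Process epochs in ascending order so that an interrupted upload
        // resumes from the oldest epoch still missing in the remote store.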
        let object_store = self
            .output_object_store
            .as_ref()
            .expect("Expected object store to exist")
            .clone();
        for (epoch, db_path) in dirs {
            // Convert `db_path` to the local filesystem path where the db
            // checkpoint is stored
            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                if self.state_snapshot_enabled {
                    let snapshot_completed_marker =
                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
                    if !snapshot_completed_marker.exists() {
                        info!(
                            "DB checkpoint upload for epoch {} will wait until the state snapshot upload completes",
                            *epoch
                        );
                        continue;
                    }
                }

                if self.prune_and_compact_before_upload {
                    // Invoke pruning and compaction on the db checkpoint
                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
                        .await?;
                }

                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
                copy_recursively(
                    db_path,
                    &self.input_object_store,
                    &object_store,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;

                // This writes a single "MANIFEST" file which contains a list of all files that
                // make up a db snapshot
                write_snapshot_manifest(db_path, &object_store, format!("epoch_{epoch}/")).await?;
                // Drop a marker in the output directory signalling that the upload completed
                // successfully
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&object_store, &success_marker, bytes.clone()).await?;
            }
            let bytes = Bytes::from_static(b"success");
            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
            put(
                &self.input_object_store,
                &upload_completed_marker,
                bytes.clone(),
            )
            .await?;
        }
        Ok(())
    }

    /// Deletes old db checkpoints in the local store by checking for the
    /// presence of all required gc markers in the epoch directory.
    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut deleted = Vec::new();
        for (epoch, path) in local_checkpoints_by_epoch.iter() {
            let marker_paths: Vec<Path> = self
                .gc_markers
                .iter()
                .map(|marker| path.child(marker.clone()))
                .collect();
            // Check if all markers are present in the epoch directory
            let all_markers_present = try_join_all(
                marker_paths
                    .iter()
                    .map(|path| self.input_object_store.get(path)),
            )
            .await;
            match all_markers_present {
                // When state snapshots are enabled, gc also has to wait for the
                // state snapshot upload completed marker
                Ok(_) => {
                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
                    deleted.push(*epoch);
                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
                    fs::remove_dir_all(&local_fs_path)?;
                }
                Err(_) => {
                    debug!("Not ready for deletion yet: {path}");
                }
            }
        }
        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use std::fs;

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use iota_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use itertools::Itertools;
    use tempfile::TempDir;

    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };

    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop the extra test-only gc marker so that gc can trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Add a new db checkpoint to the local checkpoint directory
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Now delete the success marker from the remote checkpoint directory.
        // This is the scenario where an upload stops midway because the system stopped.
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        // Checkpoint handler should copy the checkpoint for epoch_0 first before
        // copying epoch_1
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop the extra test-only gc markers so that gc can trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        // Missing epoch 2
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
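        // With only epoch_100 and epoch_200 present locally (and now uploaded),
        // every other epoch up to 201 is reported as missing.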
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}