iota_core/
db_checkpoint_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};
6
7use anyhow::Result;
8use bytes::Bytes;
9use futures::future::try_join_all;
10use iota_config::{
11    node::AuthorityStorePruningConfig,
12    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
13};
14use iota_storage::object_store::util::{
15    copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
16    path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
17};
18use object_store::{DynObjectStore, ObjectStoreExt, path::Path};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tracing::{debug, error, info};
21
22use crate::{
23    authority::{
24        authority_store_pruner::{
25            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
26        },
27        authority_store_tables::AuthorityPerpetualTables,
28    },
29    checkpoint_progress_tracker::CheckpointProgressTracker,
30    checkpoints::CheckpointStore,
31    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
32};
33
/// Marker written into the remote epoch directory once its db checkpoint has
/// been fully copied and its manifest written.
pub const SUCCESS_MARKER: &str = "_SUCCESS";
/// Extra gc marker required by handlers built with `new_for_test`, so tests
/// control when garbage collection may happen.
pub const TEST_MARKER: &str = "_TEST";
/// Marker written into the local epoch directory once the db checkpoint is
/// considered uploaded (or upload is disabled); always a gc marker.
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
/// Marker the uploader waits for in the local epoch directory when state
/// snapshots are enabled; also a gc marker in that mode. Presumably written by
/// the state snapshot uploader — TODO confirm.
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";
38
39pub struct DBCheckpointMetrics {
40    pub first_missing_db_checkpoint_epoch: IntGauge,
41    pub num_local_db_checkpoints: IntGauge,
42}
43
44impl DBCheckpointMetrics {
45    pub fn new(registry: &Registry) -> Arc<Self> {
46        let this = Self {
47            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
48                "first_missing_db_checkpoint_epoch",
49                "First epoch for which we have no db checkpoint in remote store",
50                registry
51            )
52            .unwrap(),
53            num_local_db_checkpoints: register_int_gauge_with_registry!(
54                "num_local_db_checkpoints",
55                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
56                registry
57            )
58            .unwrap(),
59        };
60        Arc::new(this)
61    }
62}
63
/// Handles local RocksDB checkpoints that are stored as one directory per
/// epoch. It either uploads them to a remote object store or, when no remote
/// store is configured, only marks them as uploaded locally. It also garbage
/// collects a local checkpoint once all of its gc markers are present.
pub struct DBCheckpointHandler {
    /// Object store over the local directory that holds the per-epoch db
    /// checkpoint directories
    input_object_store: Arc<DynObjectStore>,
    /// Root of the db checkpoint directories on the local filesystem; used to
    /// translate object-store paths into filesystem paths
    input_root_path: PathBuf,
    /// Bucket on cloud object store where db checkpoints will be copied; when
    /// `None`, checkpoints are only marked as uploaded locally
    output_object_store: Option<Arc<DynObjectStore>>,
    /// How often the upload (or cleanup) loop checks for new db checkpoints;
    /// taken from the `interval_s` constructor argument
    interval: Duration,
    /// File markers which must all be present in a local db checkpoint
    /// directory before it can be garbage collected
    gc_markers: Vec<String>,
    /// Boolean flag to enable/disable object pruning and manual compaction
    /// before upload
    prune_and_compact_before_upload: bool,
    /// If true, a db checkpoint is uploaded only after its
    /// `STATE_SNAPSHOT_COMPLETED_MARKER` exists locally
    state_snapshot_enabled: bool,
    /// Pruning configuration applied when `prune_and_compact_before_upload`
    /// is set
    pruning_config: AuthorityStorePruningConfig,
    /// Gauges for the local checkpoint count and the first missing remote
    /// epoch
    metrics: Arc<DBCheckpointMetrics>,
    /// Optional progress tracker forwarded to the object pruner
    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
}
86
87impl DBCheckpointHandler {
88    pub fn new(
89        input_path: &std::path::Path,
90        output_object_store_config: Option<&ObjectStoreConfig>,
91        interval_s: u64,
92        prune_and_compact_before_upload: bool,
93        pruning_config: AuthorityStorePruningConfig,
94        registry: &Registry,
95        state_snapshot_enabled: bool,
96        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
97    ) -> Result<Arc<Self>> {
98        let input_store_config = ObjectStoreConfig {
99            object_store: Some(ObjectStoreType::File),
100            directory: Some(input_path.to_path_buf()),
101            ..Default::default()
102        };
103        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
104        if state_snapshot_enabled {
105            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
106        }
107        Ok(Arc::new(DBCheckpointHandler {
108            input_object_store: input_store_config.make()?,
109            input_root_path: input_path.to_path_buf(),
110            output_object_store: output_object_store_config
111                .map(|config| config.make().expect("Failed to make object store")),
112            interval: Duration::from_secs(interval_s),
113            gc_markers,
114            prune_and_compact_before_upload,
115            state_snapshot_enabled,
116            pruning_config,
117            metrics: DBCheckpointMetrics::new(registry),
118            checkpoint_progress_tracker,
119        }))
120    }
121    pub fn new_for_test(
122        input_object_store_config: &ObjectStoreConfig,
123        output_object_store_config: Option<&ObjectStoreConfig>,
124        interval_s: u64,
125        prune_and_compact_before_upload: bool,
126        state_snapshot_enabled: bool,
127    ) -> Result<Arc<Self>> {
128        Ok(Arc::new(DBCheckpointHandler {
129            input_object_store: input_object_store_config.make()?,
130            input_root_path: input_object_store_config
131                .directory
132                .as_ref()
133                .unwrap()
134                .clone(),
135            output_object_store: output_object_store_config
136                .map(|config| config.make().expect("Failed to make object store")),
137            interval: Duration::from_secs(interval_s),
138            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
139            prune_and_compact_before_upload,
140            state_snapshot_enabled,
141            pruning_config: AuthorityStorePruningConfig::default(),
142            metrics: DBCheckpointMetrics::new(&Registry::default()),
143            checkpoint_progress_tracker: None,
144        }))
145    }
146
147    /// Starts the db checkpoint uploader and manifest update loops if a remote
148    /// store is specified. If no remote store is specified, it starts a
149    /// loop that adds an UPLOAD_COMPLETED_MARKER to the epoch directory.
150    /// Additionally, a db checkpoint gc loop is started regardless of the
151    /// remote store configuration.
152    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
153        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
154        if let Some(output_object_store) = &self.output_object_store {
155            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
156                self.clone(),
157                kill_sender.subscribe(),
158            ));
159            tokio::task::spawn(run_manifest_update_loop(
160                output_object_store.clone(),
161                kill_sender.subscribe(),
162            ));
163        } else {
164            // if db checkpoint remote store is not specified, cleanup loop
165            // is run to immediately mark db checkpoint upload as successful
166            // so that they can be snapshotted and garbage collected
167            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
168                self.clone(),
169                kill_sender.subscribe(),
170            ));
171        }
172        // Starts db checkpoint gc loop to remove epoch directories
173        // that contains success markers
174        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
175            self,
176            kill_sender.subscribe(),
177        ));
178        kill_sender
179    }
180
181    /// Main loop that checks for missing db checkpoints and uploads them to
182    /// remote store.
183    async fn run_db_checkpoint_upload_loop(
184        self: Arc<Self>,
185        mut recv: tokio::sync::broadcast::Receiver<()>,
186    ) -> Result<()> {
187        let mut interval = tokio::time::interval(self.interval);
188        info!("DB checkpoint upload loop started");
189        loop {
190            tokio::select! {
191                _now = interval.tick() => {
192                    let local_checkpoints_by_epoch =
193                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
194                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
195                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
196                        Ok(epochs) => {
197                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
198                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
199                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
200                            }
201                        }
202                        Err(err) => {
203                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
204                        }
205                    }
206                },
207                 _ = recv.recv() => break,
208            }
209        }
210        Ok(())
211    }
212
213    /// Main loop that adds UPLOAD_COMPLETED_MARKER to epoch directories in
214    /// local store.
215    async fn run_db_checkpoint_cleanup_loop(
216        self: Arc<Self>,
217        mut recv: tokio::sync::broadcast::Receiver<()>,
218    ) -> Result<()> {
219        let mut interval = tokio::time::interval(self.interval);
220        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
221        loop {
222            tokio::select! {
223                _now = interval.tick() => {
224                    let local_checkpoints_by_epoch =
225                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
226                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
227                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
228                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
229                    for (_, db_path) in dirs {
230                        // If db checkpoint marked as completed, skip
231                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
232                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
233                        if upload_completed_path.exists() {
234                            continue;
235                        }
236                        let bytes = Bytes::from_static(b"success");
237                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
238                        put(&self.input_object_store,
239                            &upload_completed_marker,
240                            bytes.clone(),
241                        )
242                        .await?;
243                    }
244                },
245                 _ = recv.recv() => break,
246            }
247        }
248        Ok(())
249    }
250
251    /// Main loop that checks for epoch directory with UPLOAD_COMPLETED_MARKER
252    /// marker and deletes them every 30 seconds.
253    async fn run_db_checkpoint_gc_loop(
254        self: Arc<Self>,
255        mut recv: tokio::sync::broadcast::Receiver<()>,
256    ) -> Result<()> {
257        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
258        info!("DB checkpoint garbage collection loop started");
259        loop {
260            tokio::select! {
261                _now = gc_interval.tick() => {
262                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
263                        if !deleted.is_empty() {
264                            info!("Garbage collected local db checkpoints: {:?}", deleted);
265                        }
266                    }
267                },
268                 _ = recv.recv() => break,
269            }
270        }
271        Ok(())
272    }
273
274    async fn prune_and_compact(
275        &self,
276        db_path: PathBuf,
277        epoch: u64,
278        epoch_duration_ms: u64,
279    ) -> Result<()> {
280        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
281        let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
282            &db_path.join("checkpoints"),
283        ));
284        let grpc_indexes_store = GrpcIndexesStore::new_without_init(db_path.join(GRPC_INDEXES_DIR));
285        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
286        info!(
287            "Pruning db checkpoint in {:?} for epoch: {epoch}",
288            db_path.display()
289        );
290        AuthorityStorePruner::prune_objects_for_eligible_epochs(
291            &perpetual_db,
292            &checkpoint_store,
293            Some(&grpc_indexes_store),
294            None,
295            self.pruning_config.clone(),
296            metrics,
297            epoch_duration_ms,
298            self.checkpoint_progress_tracker.as_ref(),
299        )
300        .await?;
301        info!(
302            "Compacting db checkpoint in {:?} for epoch: {epoch}",
303            db_path.display()
304        );
305        AuthorityStorePruner::compact(&perpetual_db)?;
306        Ok(())
307    }
308
309    /// Checks for missing db checkpoints in remote store and uploads them from
310    /// the local store. And writes a MANIFEST file which contains a list of all
311    /// files that under the db checkpoint directory.
312    async fn upload_db_checkpoints_to_object_store(
313        &self,
314        missing_epochs: Vec<u64>,
315    ) -> Result<(), anyhow::Error> {
316        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
317        let local_checkpoints_by_epoch =
318            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
319        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
320        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
321        let object_store = self
322            .output_object_store
323            .as_ref()
324            .expect("Expected object store to exist")
325            .clone();
326        for (epoch, db_path) in dirs {
327            // Convert `db_path` to the local filesystem path to where db checkpoint is
328            // stored
329            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
330            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
331                if self.state_snapshot_enabled {
332                    let snapshot_completed_marker =
333                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
334                    if !snapshot_completed_marker.exists() {
335                        info!(
336                            "DB checkpoint upload for epoch {} to wait until state snasphot uploaded",
337                            *epoch
338                        );
339                        continue;
340                    }
341                }
342
343                if self.prune_and_compact_before_upload {
344                    // Invoke pruning and compaction on the db checkpoint
345                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
346                        .await?;
347                }
348
349                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
350                copy_recursively(
351                    db_path,
352                    &self.input_object_store,
353                    &object_store,
354                    NonZeroUsize::new(20).unwrap(),
355                )
356                .await?;
357
358                // This writes a single "MANIFEST" file which contains a list of all files that
359                // make up a db snapshot
360                write_snapshot_manifest(db_path, &object_store, format!("epoch_{epoch}/")).await?;
361                // Drop marker in the output directory that upload completed successfully
362                let bytes = Bytes::from_static(b"success");
363                let success_marker = db_path.child(SUCCESS_MARKER);
364                put(&object_store, &success_marker, bytes.clone()).await?;
365            }
366            let bytes = Bytes::from_static(b"success");
367            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
368            put(
369                &self.input_object_store,
370                &upload_completed_marker,
371                bytes.clone(),
372            )
373            .await?;
374        }
375        Ok(())
376    }
377
378    /// Deletes old db checkpoints in the local store by checking for the
379    /// presence of all success markers in the directory.
380    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
381        let local_checkpoints_by_epoch =
382            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
383        let mut deleted = Vec::new();
384        for (epoch, path) in local_checkpoints_by_epoch.iter() {
385            let marker_paths: Vec<Path> = self
386                .gc_markers
387                .iter()
388                .map(|marker| path.child(marker.clone()))
389                .collect();
390            // Check if all markers are present in the epoch directory
391            let all_markers_present = try_join_all(
392                marker_paths
393                    .iter()
394                    .map(|path| self.input_object_store.get(path)),
395            )
396            .await;
397            match all_markers_present {
398                // After state snapshots, gc will also need to wait for a state snapshot
399                // upload completed marker
400                Ok(_) => {
401                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
402                    deleted.push(*epoch);
403                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
404                    fs::remove_dir_all(&local_fs_path)?;
405                }
406                Err(_) => {
407                    debug!("Not ready for deletion yet: {path}");
408                }
409            }
410        }
411        Ok(deleted)
412    }
413}
414
#[cfg(test)]
mod tests {
    use std::{
        fs,
        path::{Path, PathBuf},
        sync::Arc,
    };

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use iota_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use tempfile::TempDir;

    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };

    /// Builds a file-backed object store config rooted at `dir`.
    fn file_store_config(dir: &Path) -> ObjectStoreConfig {
        ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(dir.to_path_buf()),
            ..Default::default()
        }
    }

    /// Creates a handler that reads db checkpoints from `local_dir` and
    /// uploads them to `remote_dir`, with pruning/compaction and state
    /// snapshots disabled.
    fn new_test_handler(
        local_dir: &Path,
        remote_dir: &Path,
    ) -> anyhow::Result<Arc<DBCheckpointHandler>> {
        DBCheckpointHandler::new_for_test(
            &file_store_config(local_dir),
            Some(&file_store_config(remote_dir)),
            10,
            false,
            false,
        )
    }

    /// Creates `root/dir_name` containing `file1`, `file2` and `data/file3`,
    /// and returns its path.
    fn create_local_db_checkpoint(root: &Path, dir_name: &str) -> anyhow::Result<PathBuf> {
        let dir = root.join(dir_name);
        fs::create_dir(&dir)?;
        fs::write(dir.join("file1"), b"Lorem ipsum")?;
        fs::write(dir.join("file2"), b"Lorem ipsum")?;
        let nested_dir = dir.join("data");
        fs::create_dir(&nested_dir)?;
        fs::write(nested_dir.join("file3"), b"Lorem ipsum")?;
        Ok(dir)
    }

    /// Returns the epochs the remote store reports as missing a
    /// `SUCCESS_MARKER`.
    async fn remote_missing_epochs(handler: &DBCheckpointHandler) -> anyhow::Result<Vec<u64>> {
        find_missing_epochs_dirs(
            handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await
    }

    /// Asserts that the checkpoint in `local_dir` was copied to `remote_dir`,
    /// completed remotely and marked as uploaded locally.
    fn assert_uploaded(local_dir: &Path, remote_dir: &Path) {
        assert!(remote_dir.join("file1").exists());
        assert!(remote_dir.join("file2").exists());
        assert!(remote_dir.join("data").join("file3").exists());
        assert!(remote_dir.join(SUCCESS_MARKER).exists());
        assert!(local_dir.join(UPLOAD_COMPLETED_MARKER).exists());
    }

    /// Asserts that gc removed the local checkpoint directory entirely.
    fn assert_garbage_collected(local_dir: &Path) {
        assert!(!local_dir.join("file1").exists());
        assert!(!local_dir.join("file2").exists());
        assert!(!local_dir.join("data").join("file3").exists());
        assert!(!local_dir.exists());
    }

    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir = TempDir::new()?;
        let local_epoch0_checkpoint =
            create_local_db_checkpoint(checkpoint_dir.path(), "epoch_0")?;
        let remote_epoch0_checkpoint = remote_checkpoint_dir.path().join("epoch_0");

        let db_checkpoint_handler =
            new_test_handler(checkpoint_dir.path(), remote_checkpoint_dir.path())?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        let (first_epoch, first_path) = local_checkpoints_by_epoch
            .first_key_value()
            .expect("epoch_0 should be listed");
        assert_eq!(*first_epoch, 0);
        assert_eq!(
            path_to_filesystem(db_checkpoint_handler.input_root_path.clone(), first_path)?,
            fs::canonicalize(&local_epoch0_checkpoint)?
        );

        let missing_epochs = remote_missing_epochs(&db_checkpoint_handler).await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert_uploaded(&local_epoch0_checkpoint, &remote_epoch0_checkpoint);

        // Drop an extra gc marker meant only for gc to trigger
        fs::write(local_epoch0_checkpoint.join(TEST_MARKER), b"Lorem ipsum")?;
        let deleted = db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert_eq!(deleted, vec![0]);
        assert_garbage_collected(&local_epoch0_checkpoint);
        Ok(())
    }

    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_epoch0_checkpoint = remote_checkpoint_dir.path().join("epoch_0");
        let remote_epoch1_checkpoint = remote_checkpoint_dir.path().join("epoch_1");

        let db_checkpoint_handler =
            new_test_handler(checkpoint_dir.path(), remote_checkpoint_dir.path())?;

        let local_epoch0_checkpoint =
            create_local_db_checkpoint(checkpoint_dir.path(), "epoch_0")?;
        let missing_epochs = remote_missing_epochs(&db_checkpoint_handler).await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert_uploaded(&local_epoch0_checkpoint, &remote_epoch0_checkpoint);

        // Add a new db checkpoint to the local checkpoint directory
        let local_epoch1_checkpoint =
            create_local_db_checkpoint(checkpoint_dir.path(), "epoch_1")?;

        // Now delete the success marker from remote checkpointed directory
        // This is the scenario where uploads stops mid way because system stopped
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        // Checkpoint handler should copy checkpoint for epoch_0 first before copying
        // epoch_1
        let missing_epochs = remote_missing_epochs(&db_checkpoint_handler).await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert_uploaded(&local_epoch0_checkpoint, &remote_epoch0_checkpoint);
        assert_uploaded(&local_epoch1_checkpoint, &remote_epoch1_checkpoint);

        // Drop an extra gc marker meant only for gc to trigger
        fs::write(local_epoch0_checkpoint.join(TEST_MARKER), b"Lorem ipsum")?;
        fs::write(local_epoch1_checkpoint.join(TEST_MARKER), b"Lorem ipsum")?;

        let deleted = db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert_eq!(deleted, vec![0, 1]);
        assert_garbage_collected(&local_epoch0_checkpoint);
        assert_garbage_collected(&local_epoch1_checkpoint);
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        fs::create_dir(checkpoint_dir_path.join("epoch_0"))?;
        fs::create_dir(checkpoint_dir_path.join("epoch_1"))?;
        // Missing epoch 2
        fs::create_dir(checkpoint_dir_path.join("epoch_3"))?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let db_checkpoint_handler =
            new_test_handler(checkpoint_dir_path, remote_checkpoint_dir_path)?;

        let missing_epochs = remote_missing_epochs(&db_checkpoint_handler).await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = remote_missing_epochs(&db_checkpoint_handler)
            .await?
            .first()
            .cloned()
            .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = remote_missing_epochs(&db_checkpoint_handler)
            .await?
            .first()
            .cloned()
            .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        fs::create_dir(checkpoint_dir_path.join("epoch_100"))?;
        fs::create_dir(checkpoint_dir_path.join("epoch_200"))?;
        let remote_checkpoint_dir = TempDir::new()?;

        let db_checkpoint_handler =
            new_test_handler(checkpoint_dir_path, remote_checkpoint_dir.path())?;

        let missing_epochs = remote_missing_epochs(&db_checkpoint_handler).await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = remote_missing_epochs(&db_checkpoint_handler).await?;
        let expected_missing_epochs: Vec<u64> = (0..100).chain(101..200).chain([201]).collect();
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}