iota_core/
db_checkpoint_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};
6
7use anyhow::Result;
8use bytes::Bytes;
9use futures::future::try_join_all;
10use iota_config::{
11    node::AuthorityStorePruningConfig,
12    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
13};
14use iota_storage::object_store::util::{
15    copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
16    path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
17};
18use object_store::{DynObjectStore, path::Path};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tracing::{debug, error, info};
21
22use crate::{
23    authority::{
24        authority_store_pruner::{
25            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
26        },
27        authority_store_tables::AuthorityPerpetualTables,
28    },
29    checkpoints::CheckpointStore,
30    rest_index::RestIndexStore,
31};
32
33pub const SUCCESS_MARKER: &str = "_SUCCESS";
34pub const TEST_MARKER: &str = "_TEST";
35pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
36pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";
37
/// Prometheus metrics reporting db checkpoint upload progress and local
/// disk usage.
pub struct DBCheckpointMetrics {
    /// First epoch for which no db checkpoint exists in the remote store
    pub first_missing_db_checkpoint_epoch: IntGauge,
    /// Number of RocksDB checkpoints currently residing on local disk
    /// (i.e. not yet garbage collected)
    pub num_local_db_checkpoints: IntGauge,
}
42
43impl DBCheckpointMetrics {
44    pub fn new(registry: &Registry) -> Arc<Self> {
45        let this = Self {
46            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
47                "first_missing_db_checkpoint_epoch",
48                "First epoch for which we have no db checkpoint in remote store",
49                registry
50            )
51            .unwrap(),
52            num_local_db_checkpoints: register_int_gauge_with_registry!(
53                "num_local_db_checkpoints",
54                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
55                registry
56            )
57            .unwrap(),
58        };
59        Arc::new(this)
60    }
61}
62
/// Periodically uploads local RocksDB checkpoints to a remote object store
/// (when configured) and garbage collects local checkpoint directories once
/// all required completion markers are present.
pub struct DBCheckpointHandler {
    /// Directory on local disk where db checkpoints are stored
    input_object_store: Arc<DynObjectStore>,
    /// DB checkpoint directory on local filesystem
    input_root_path: PathBuf,
    /// Bucket on cloud object store where db checkpoints will be copied;
    /// `None` disables uploads (checkpoints are only marked and gc-ed)
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval to check for presence of new db checkpoint (60s)
    interval: Duration,
    /// File markers which signal that local db checkpoint can be garbage
    /// collected
    gc_markers: Vec<String>,
    /// Boolean flag to enable/disable object pruning and manual compaction
    /// before upload
    prune_and_compact_before_upload: bool,
    /// If true, upload will block on state snapshot upload completed marker
    state_snapshot_enabled: bool,
    /// Pruning configuration applied when pruning before upload
    pruning_config: AuthorityStorePruningConfig,
    metrics: Arc<DBCheckpointMetrics>,
}
84
85impl DBCheckpointHandler {
86    pub fn new(
87        input_path: &std::path::Path,
88        output_object_store_config: Option<&ObjectStoreConfig>,
89        interval_s: u64,
90        prune_and_compact_before_upload: bool,
91        pruning_config: AuthorityStorePruningConfig,
92        registry: &Registry,
93        state_snapshot_enabled: bool,
94    ) -> Result<Arc<Self>> {
95        let input_store_config = ObjectStoreConfig {
96            object_store: Some(ObjectStoreType::File),
97            directory: Some(input_path.to_path_buf()),
98            ..Default::default()
99        };
100        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
101        if state_snapshot_enabled {
102            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
103        }
104        Ok(Arc::new(DBCheckpointHandler {
105            input_object_store: input_store_config.make()?,
106            input_root_path: input_path.to_path_buf(),
107            output_object_store: output_object_store_config
108                .map(|config| config.make().expect("Failed to make object store")),
109            interval: Duration::from_secs(interval_s),
110            gc_markers,
111            prune_and_compact_before_upload,
112            state_snapshot_enabled,
113            pruning_config,
114            metrics: DBCheckpointMetrics::new(registry),
115        }))
116    }
117    pub fn new_for_test(
118        input_object_store_config: &ObjectStoreConfig,
119        output_object_store_config: Option<&ObjectStoreConfig>,
120        interval_s: u64,
121        prune_and_compact_before_upload: bool,
122        state_snapshot_enabled: bool,
123    ) -> Result<Arc<Self>> {
124        Ok(Arc::new(DBCheckpointHandler {
125            input_object_store: input_object_store_config.make()?,
126            input_root_path: input_object_store_config
127                .directory
128                .as_ref()
129                .unwrap()
130                .clone(),
131            output_object_store: output_object_store_config
132                .map(|config| config.make().expect("Failed to make object store")),
133            interval: Duration::from_secs(interval_s),
134            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
135            prune_and_compact_before_upload,
136            state_snapshot_enabled,
137            pruning_config: AuthorityStorePruningConfig::default(),
138            metrics: DBCheckpointMetrics::new(&Registry::default()),
139        }))
140    }
141
142    /// Starts the db checkpoint uploader and manifest update loops if a remote
143    /// store is specified. If no remote store is specified, it starts a
144    /// loop that adds an UPLOAD_COMPLETED_MARKER to the epoch directory.
145    /// Additionally, a db checkpoint gc loop is started regardless of the
146    /// remote store configuration.
147    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
148        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
149        if self.output_object_store.is_some() {
150            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
151                self.clone(),
152                kill_sender.subscribe(),
153            ));
154            tokio::task::spawn(run_manifest_update_loop(
155                self.output_object_store.as_ref().unwrap().clone(),
156                kill_sender.subscribe(),
157            ));
158        } else {
159            // if db checkpoint remote store is not specified, cleanup loop
160            // is run to immediately mark db checkpoint upload as successful
161            // so that they can be snapshotted and garbage collected
162            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
163                self.clone(),
164                kill_sender.subscribe(),
165            ));
166        }
167        // Starts db checkpoint gc loop to remove epoch directories
168        // that contains success markers
169        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
170            self,
171            kill_sender.subscribe(),
172        ));
173        kill_sender
174    }
175
    /// Main loop that checks for missing db checkpoints and uploads them to
    /// remote store.
    ///
    /// Ticks every `self.interval` until a message arrives on `recv`.
    /// NOTE(review): the `?` on `find_all_dirs_with_epoch_prefix` terminates
    /// the whole loop on a local listing error — confirm this is intended
    /// rather than log-and-retry like the remote-store error branch below.
    async fn run_db_checkpoint_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    // Report how many checkpoints currently sit on local disk
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    // Find epochs lacking a SUCCESS_MARKER remotely and upload them
                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                        Ok(epochs) => {
                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
                        }
                    }
                },
                 _ = recv.recv() => break,
            }
        }
        Ok(())
    }
207
208    /// Main loop that adds UPLOAD_COMPLETED_MARKER to epoch directories in
209    /// local store.
210    async fn run_db_checkpoint_cleanup_loop(
211        self: Arc<Self>,
212        mut recv: tokio::sync::broadcast::Receiver<()>,
213    ) -> Result<()> {
214        let mut interval = tokio::time::interval(self.interval);
215        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
216        loop {
217            tokio::select! {
218                _now = interval.tick() => {
219                    let local_checkpoints_by_epoch =
220                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
221                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
222                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
223                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
224                    for (_, db_path) in dirs {
225                        // If db checkpoint marked as completed, skip
226                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
227                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
228                        if upload_completed_path.exists() {
229                            continue;
230                        }
231                        let bytes = Bytes::from_static(b"success");
232                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
233                        put(&self.input_object_store,
234                            &upload_completed_marker,
235                            bytes.clone(),
236                        )
237                        .await?;
238                    }
239                },
240                 _ = recv.recv() => break,
241            }
242        }
243        Ok(())
244    }
245
246    /// Main loop that checks for epoch directory with UPLOAD_COMPLETED_MARKER
247    /// marker and deletes them every 30 seconds.
248    async fn run_db_checkpoint_gc_loop(
249        self: Arc<Self>,
250        mut recv: tokio::sync::broadcast::Receiver<()>,
251    ) -> Result<()> {
252        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
253        info!("DB checkpoint garbage collection loop started");
254        loop {
255            tokio::select! {
256                _now = gc_interval.tick() => {
257                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await {
258                        if !deleted.is_empty() {
259                            info!("Garbage collected local db checkpoints: {:?}", deleted);
260                        }
261                    }
262                },
263                 _ = recv.recv() => break,
264            }
265        }
266        Ok(())
267    }
268
269    async fn prune_and_compact(
270        &self,
271        db_path: PathBuf,
272        epoch: u64,
273        epoch_duration_ms: u64,
274    ) -> Result<()> {
275        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
276        let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
277            &db_path.join("checkpoints"),
278        ));
279        let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
280        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
281        info!(
282            "Pruning db checkpoint in {:?} for epoch: {epoch}",
283            db_path.display()
284        );
285        AuthorityStorePruner::prune_objects_for_eligible_epochs(
286            &perpetual_db,
287            &checkpoint_store,
288            Some(&rest_index),
289            None,
290            self.pruning_config.clone(),
291            metrics,
292            epoch_duration_ms,
293        )
294        .await?;
295        info!(
296            "Compacting db checkpoint in {:?} for epoch: {epoch}",
297            db_path.display()
298        );
299        AuthorityStorePruner::compact(&perpetual_db)?;
300        Ok(())
301    }
302
303    /// Checks for missing db checkpoints in remote store and uploads them from
304    /// the local store. And writes a MANIFEST file which contains a list of all
305    /// files that under the db checkpoint directory.
306    async fn upload_db_checkpoints_to_object_store(
307        &self,
308        missing_epochs: Vec<u64>,
309    ) -> Result<(), anyhow::Error> {
310        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
311        let local_checkpoints_by_epoch =
312            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
313        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
314        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
315        let object_store = self
316            .output_object_store
317            .as_ref()
318            .expect("Expected object store to exist")
319            .clone();
320        for (epoch, db_path) in dirs {
321            // Convert `db_path` to the local filesystem path to where db checkpoint is
322            // stored
323            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
324            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
325                if self.state_snapshot_enabled {
326                    let snapshot_completed_marker =
327                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
328                    if !snapshot_completed_marker.exists() {
329                        info!(
330                            "DB checkpoint upload for epoch {} to wait until state snasphot uploaded",
331                            *epoch
332                        );
333                        continue;
334                    }
335                }
336
337                if self.prune_and_compact_before_upload {
338                    // Invoke pruning and compaction on the db checkpoint
339                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
340                        .await?;
341                }
342
343                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
344                copy_recursively(
345                    db_path,
346                    &self.input_object_store,
347                    &object_store,
348                    NonZeroUsize::new(20).unwrap(),
349                )
350                .await?;
351
352                // This writes a single "MANIFEST" file which contains a list of all files that
353                // make up a db snapshot
354                write_snapshot_manifest(db_path, &object_store, format!("epoch_{epoch}/")).await?;
355                // Drop marker in the output directory that upload completed successfully
356                let bytes = Bytes::from_static(b"success");
357                let success_marker = db_path.child(SUCCESS_MARKER);
358                put(&object_store, &success_marker, bytes.clone()).await?;
359            }
360            let bytes = Bytes::from_static(b"success");
361            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
362            put(
363                &self.input_object_store,
364                &upload_completed_marker,
365                bytes.clone(),
366            )
367            .await?;
368        }
369        Ok(())
370    }
371
372    /// Deletes old db checkpoints in the local store by checking for the
373    /// presence of all success markers in the directory.
374    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
375        let local_checkpoints_by_epoch =
376            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
377        let mut deleted = Vec::new();
378        for (epoch, path) in local_checkpoints_by_epoch.iter() {
379            let marker_paths: Vec<Path> = self
380                .gc_markers
381                .iter()
382                .map(|marker| path.child(marker.clone()))
383                .collect();
384            // Check if all markers are present in the epoch directory
385            let all_markers_present = try_join_all(
386                marker_paths
387                    .iter()
388                    .map(|path| self.input_object_store.get(path)),
389            )
390            .await;
391            match all_markers_present {
392                // After state snapshots, gc will also need to wait for a state snapshot
393                // upload completed marker
394                Ok(_) => {
395                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
396                    deleted.push(*epoch);
397                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
398                    fs::remove_dir_all(&local_fs_path)?;
399                }
400                Err(_) => {
401                    debug!("Not ready for deletion yet: {path}");
402                }
403            }
404        }
405        Ok(deleted)
406    }
407}
408
409#[cfg(test)]
410mod tests {
411    use std::fs;
412
413    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
414    use iota_storage::object_store::util::{
415        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
416    };
417    use itertools::Itertools;
418    use tempfile::TempDir;
419
420    use crate::db_checkpoint_handler::{
421        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
422    };
423
424    #[tokio::test]
425    async fn test_basic() -> anyhow::Result<()> {
426        let checkpoint_dir = TempDir::new()?;
427        let checkpoint_dir_path = checkpoint_dir.path();
428        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
429        fs::create_dir(&local_epoch0_checkpoint)?;
430        let file1 = local_epoch0_checkpoint.join("file1");
431        fs::write(file1, b"Lorem ipsum")?;
432        let file2 = local_epoch0_checkpoint.join("file2");
433        fs::write(file2, b"Lorem ipsum")?;
434        let nested_dir = local_epoch0_checkpoint.join("data");
435        fs::create_dir(&nested_dir)?;
436        let file3 = nested_dir.join("file3");
437        fs::write(file3, b"Lorem ipsum")?;
438
439        let remote_checkpoint_dir = TempDir::new()?;
440        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
441        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
442
443        let input_store_config = ObjectStoreConfig {
444            object_store: Some(ObjectStoreType::File),
445            directory: Some(checkpoint_dir_path.to_path_buf()),
446            ..Default::default()
447        };
448        let output_store_config = ObjectStoreConfig {
449            object_store: Some(ObjectStoreType::File),
450            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
451            ..Default::default()
452        };
453        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
454            &input_store_config,
455            Some(&output_store_config),
456            10,
457            false,
458            false,
459        )?;
460        let local_checkpoints_by_epoch =
461            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
462                .await?;
463        assert!(!local_checkpoints_by_epoch.is_empty());
464        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
465        assert_eq!(
466            path_to_filesystem(
467                db_checkpoint_handler.input_root_path.clone(),
468                local_checkpoints_by_epoch.first_key_value().unwrap().1
469            )
470            .unwrap(),
471            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
472        );
473        let missing_epochs = find_missing_epochs_dirs(
474            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
475            SUCCESS_MARKER,
476        )
477        .await?;
478        db_checkpoint_handler
479            .upload_db_checkpoints_to_object_store(missing_epochs)
480            .await?;
481
482        assert!(remote_epoch0_checkpoint.join("file1").exists());
483        assert!(remote_epoch0_checkpoint.join("file2").exists());
484        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
485        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
486        assert!(
487            local_epoch0_checkpoint
488                .join(UPLOAD_COMPLETED_MARKER)
489                .exists()
490        );
491
492        // Drop an extra gc marker meant only for gc to trigger
493        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
494        fs::write(test_marker, b"Lorem ipsum")?;
495        db_checkpoint_handler
496            .garbage_collect_old_db_checkpoints()
497            .await?;
498
499        assert!(!local_epoch0_checkpoint.join("file1").exists());
500        assert!(!local_epoch0_checkpoint.join("file1").exists());
501        assert!(!local_epoch0_checkpoint.join("file2").exists());
502        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
503        Ok(())
504    }
505
506    #[tokio::test]
507    async fn test_upload_resumes() -> anyhow::Result<()> {
508        let checkpoint_dir = TempDir::new()?;
509        let checkpoint_dir_path = checkpoint_dir.path();
510        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
511
512        let remote_checkpoint_dir = TempDir::new()?;
513        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
514        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
515
516        let input_store_config = ObjectStoreConfig {
517            object_store: Some(ObjectStoreType::File),
518            directory: Some(checkpoint_dir_path.to_path_buf()),
519            ..Default::default()
520        };
521        let output_store_config = ObjectStoreConfig {
522            object_store: Some(ObjectStoreType::File),
523            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
524            ..Default::default()
525        };
526        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
527            &input_store_config,
528            Some(&output_store_config),
529            10,
530            false,
531            false,
532        )?;
533
534        fs::create_dir(&local_epoch0_checkpoint)?;
535        let file1 = local_epoch0_checkpoint.join("file1");
536        fs::write(file1, b"Lorem ipsum")?;
537        let file2 = local_epoch0_checkpoint.join("file2");
538        fs::write(file2, b"Lorem ipsum")?;
539        let nested_dir = local_epoch0_checkpoint.join("data");
540        fs::create_dir(&nested_dir)?;
541        let file3 = nested_dir.join("file3");
542        fs::write(file3, b"Lorem ipsum")?;
543
544        let missing_epochs = find_missing_epochs_dirs(
545            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
546            SUCCESS_MARKER,
547        )
548        .await?;
549        db_checkpoint_handler
550            .upload_db_checkpoints_to_object_store(missing_epochs)
551            .await?;
552        assert!(remote_epoch0_checkpoint.join("file1").exists());
553        assert!(remote_epoch0_checkpoint.join("file2").exists());
554        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
555        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
556        assert!(
557            local_epoch0_checkpoint
558                .join(UPLOAD_COMPLETED_MARKER)
559                .exists()
560        );
561
562        // Add a new db checkpoint to the local checkpoint directory
563        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
564        fs::create_dir(&local_epoch1_checkpoint)?;
565        let file1 = local_epoch1_checkpoint.join("file1");
566        fs::write(file1, b"Lorem ipsum")?;
567        let file2 = local_epoch1_checkpoint.join("file2");
568        fs::write(file2, b"Lorem ipsum")?;
569        let nested_dir = local_epoch1_checkpoint.join("data");
570        fs::create_dir(&nested_dir)?;
571        let file3 = nested_dir.join("file3");
572        fs::write(file3, b"Lorem ipsum")?;
573
574        // Now delete the success marker from remote checkpointed directory
575        // This is the scenario where uploads stops mid way because system stopped
576        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;
577
578        // Checkpoint handler should copy checkpoint for epoch_0 first before copying
579        // epoch_1
580        let missing_epochs = find_missing_epochs_dirs(
581            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
582            SUCCESS_MARKER,
583        )
584        .await?;
585        db_checkpoint_handler
586            .upload_db_checkpoints_to_object_store(missing_epochs)
587            .await?;
588        assert!(remote_epoch0_checkpoint.join("file1").exists());
589        assert!(remote_epoch0_checkpoint.join("file2").exists());
590        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
591        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
592        assert!(
593            local_epoch0_checkpoint
594                .join(UPLOAD_COMPLETED_MARKER)
595                .exists()
596        );
597
598        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
599        assert!(remote_epoch1_checkpoint.join("file1").exists());
600        assert!(remote_epoch1_checkpoint.join("file2").exists());
601        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
602        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
603        assert!(
604            local_epoch1_checkpoint
605                .join(UPLOAD_COMPLETED_MARKER)
606                .exists()
607        );
608
609        // Drop an extra gc marker meant only for gc to trigger
610        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
611        fs::write(test_marker, b"Lorem ipsum")?;
612        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
613        fs::write(test_marker, b"Lorem ipsum")?;
614
615        db_checkpoint_handler
616            .garbage_collect_old_db_checkpoints()
617            .await?;
618        assert!(!local_epoch0_checkpoint.join("file1").exists());
619        assert!(!local_epoch0_checkpoint.join("file1").exists());
620        assert!(!local_epoch0_checkpoint.join("file2").exists());
621        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
622        assert!(!local_epoch1_checkpoint.join("file1").exists());
623        assert!(!local_epoch1_checkpoint.join("file1").exists());
624        assert!(!local_epoch1_checkpoint.join("file2").exists());
625        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
626        Ok(())
627    }
628
629    #[tokio::test]
630    async fn test_missing_epochs() -> anyhow::Result<()> {
631        let checkpoint_dir = TempDir::new()?;
632        let checkpoint_dir_path = checkpoint_dir.path();
633        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
634        fs::create_dir(&local_epoch0_checkpoint)?;
635        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
636        fs::create_dir(&local_epoch1_checkpoint)?;
637        // Missing epoch 2
638        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
639        fs::create_dir(&local_epoch3_checkpoint)?;
640        let remote_checkpoint_dir = TempDir::new()?;
641        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
642
643        let input_store_config = ObjectStoreConfig {
644            object_store: Some(ObjectStoreType::File),
645            directory: Some(checkpoint_dir_path.to_path_buf()),
646            ..Default::default()
647        };
648
649        let output_store_config = ObjectStoreConfig {
650            object_store: Some(ObjectStoreType::File),
651            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
652            ..Default::default()
653        };
654        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
655            &input_store_config,
656            Some(&output_store_config),
657            10,
658            false,
659            false,
660        )?;
661
662        let missing_epochs = find_missing_epochs_dirs(
663            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
664            SUCCESS_MARKER,
665        )
666        .await?;
667        db_checkpoint_handler
668            .upload_db_checkpoints_to_object_store(missing_epochs)
669            .await?;
670
671        let first_missing_epoch = find_missing_epochs_dirs(
672            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
673            SUCCESS_MARKER,
674        )
675        .await?
676        .first()
677        .cloned()
678        .unwrap();
679        assert_eq!(first_missing_epoch, 2);
680
681        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
682        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;
683
684        let first_missing_epoch = find_missing_epochs_dirs(
685            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
686            SUCCESS_MARKER,
687        )
688        .await?
689        .first()
690        .cloned()
691        .unwrap();
692        assert_eq!(first_missing_epoch, 0);
693
694        Ok(())
695    }
696
697    #[tokio::test]
698    async fn test_range_missing_epochs() -> anyhow::Result<()> {
699        let checkpoint_dir = TempDir::new()?;
700        let checkpoint_dir_path = checkpoint_dir.path();
701        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
702        fs::create_dir(&local_epoch100_checkpoint)?;
703        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
704        fs::create_dir(&local_epoch200_checkpoint)?;
705        let remote_checkpoint_dir = TempDir::new()?;
706        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
707
708        let input_store_config = ObjectStoreConfig {
709            object_store: Some(ObjectStoreType::File),
710            directory: Some(checkpoint_dir_path.to_path_buf()),
711            ..Default::default()
712        };
713
714        let output_store_config = ObjectStoreConfig {
715            object_store: Some(ObjectStoreType::File),
716            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
717            ..Default::default()
718        };
719        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
720            &input_store_config,
721            Some(&output_store_config),
722            10,
723            false,
724            false,
725        )?;
726
727        let missing_epochs = find_missing_epochs_dirs(
728            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
729            SUCCESS_MARKER,
730        )
731        .await?;
732        assert_eq!(missing_epochs, vec![0]);
733        db_checkpoint_handler
734            .upload_db_checkpoints_to_object_store(missing_epochs)
735            .await?;
736
737        let missing_epochs = find_missing_epochs_dirs(
738            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
739            SUCCESS_MARKER,
740        )
741        .await?;
742        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
743        expected_missing_epochs.extend((101..200).collect_vec().iter());
744        expected_missing_epochs.push(201);
745        assert_eq!(missing_epochs, expected_missing_epochs);
746        Ok(())
747    }
748}