iota_storage/object_store/util.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::BTreeMap, num::NonZeroUsize, ops::Range, path::PathBuf, sync::Arc, time::Duration,
7};
8
9use anyhow::{Context, Result, anyhow};
10use backoff::future::retry;
11use bytes::Bytes;
12use futures::{StreamExt, TryStreamExt};
13use indicatif::ProgressBar;
14use itertools::Itertools;
15use object_store::{DynObjectStore, Error, ObjectStore, ObjectStoreExt, path::Path};
16use serde::{Deserialize, Serialize};
17use tokio::time::Instant;
18use tracing::{error, warn};
19use url::Url;
20
21use crate::object_store::{
22    ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
23};
24
/// Name of the root manifest object that lists the epochs available in the
/// store (see [`Manifest`] / `run_manifest_update_loop`), and also of the
/// per-epoch newline-delimited manifest (see `write_snapshot_manifest`).
pub const MANIFEST_FILENAME: &str = "MANIFEST";
/// Per-epoch JSON file holding [`EpochMetadata`] (epoch start timestamp).
pub const EPOCH_METADATA_FILENAME: &str = "_epoch_metadata.json";
27
28#[derive(Serialize, Deserialize)]
29pub struct Manifest {
30    /// Epoch number paired with its start timestamp in ms (when known).
31    pub available_epochs: Vec<(u64, Option<u64>)>,
32}
33
34impl Manifest {
35    pub fn new(available_epochs: Vec<(u64, Option<u64>)>) -> Self {
36        Manifest { available_epochs }
37    }
38
39    pub fn epoch_exists(&self, epoch: u64) -> bool {
40        self.available_epochs.iter().any(|(e, _)| *e == epoch)
41    }
42}
43
44#[derive(Serialize, Deserialize)]
45pub struct EpochMetadata {
46    pub epoch_start_timestamp_ms: u64,
47}
48
49impl EpochMetadata {
50    pub fn to_bytes(&self) -> Result<Bytes> {
51        Ok(Bytes::from(serde_json::to_vec(self)?))
52    }
53}
54
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    /// One entry per object path in the epoch's snapshot directory.
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps a list of path lines into a manifest.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Renders the manifest as a newline-delimited string (one path per
    /// line), the on-disk MANIFEST format.
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses a newline-delimited manifest back into its path lines.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        let lines = s.lines().map(str::to_owned).collect();
        PerEpochManifest { lines }
    }

    /// Returns a new manifest containing only the lines that start with
    /// `prefix`.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        PerEpochManifest {
            lines: self
                .lines
                .iter()
                .filter(|line| line.starts_with(prefix))
                .cloned()
                .collect(),
        }
    }
}
89
90pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
91    let bytes = retry(backoff::ExponentialBackoff::default(), || async {
92        store.get_bytes(src).await.map_err(|e| {
93            error!("Failed to read file from object store with error: {:?}", &e);
94            backoff::Error::transient(e)
95        })
96    })
97    .await?;
98    Ok(bytes)
99}
100
/// Returns true when the object at `src` can currently be fetched.
///
/// NOTE(review): this issues a full GET and discards the payload just to
/// answer an existence question; a head/metadata call would be cheaper if
/// the trait offered one — confirm before changing.
pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
    store.get_bytes(src).await.is_ok()
}
104
105/// Writes bytes in the store with specified path.
106pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
107    retry(backoff::ExponentialBackoff::default(), || async {
108        if !bytes.is_empty() {
109            store.put_bytes(src, bytes.clone()).await.map_err(|e| {
110                error!("Failed to write file to object store with error: {:?}", &e);
111                backoff::Error::transient(e)
112            })
113        } else {
114            warn!("Not copying empty file: {:?}", src);
115            Ok(())
116        }
117    })
118    .await?;
119    Ok(())
120}
121
122pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
123    src: &Path,
124    dest: &Path,
125    src_store: &S,
126    dest_store: &D,
127) -> Result<()> {
128    let bytes = get(src_store, src).await?;
129    if !bytes.is_empty() {
130        put(dest_store, dest, bytes).await
131    } else {
132        warn!("Not copying empty file: {:?}", src);
133        Ok(())
134    }
135}
136
137pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
138    src: &[Path],
139    dest: &[Path],
140    src_store: &S,
141    dest_store: &D,
142    concurrency: NonZeroUsize,
143    progress_bar: Option<ProgressBar>,
144) -> Result<Vec<()>> {
145    let mut instant = Instant::now();
146    let progress_bar_clone = progress_bar.clone();
147    // Copies files from dest to src in parallel, and updates the progress bar if
148    // it's provided
149    let results = futures::stream::iter(src.iter().zip(dest.iter()))
150        .map(|(path_in, path_out)| async move {
151            let ret = copy_file(path_in, path_out, src_store, dest_store).await;
152            Ok((path_out.clone(), ret))
153        })
154        .boxed()
155        .buffer_unordered(concurrency.get())
156        .try_for_each(|(path, ret)| {
157            if let Some(progress_bar_clone) = &progress_bar_clone {
158                progress_bar_clone.inc(1);
159                progress_bar_clone.set_message(format!("file: {path}"));
160                instant = Instant::now();
161            }
162            futures::future::ready(ret)
163        })
164        .await;
165    Ok(results.into_iter().collect())
166}
167
168/// Copies all files in the directory from the source store to the destination
169/// store.
170pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
171    dir: &Path,
172    src_store: &S,
173    dest_store: &D,
174    concurrency: NonZeroUsize,
175) -> Result<Vec<()>> {
176    let mut input_paths = vec![];
177    let mut output_paths = vec![];
178    let mut paths = src_store.list_objects(Some(dir)).await;
179    while let Some(res) = paths.next().await {
180        if let Ok(object_metadata) = res {
181            input_paths.push(object_metadata.location.clone());
182            output_paths.push(object_metadata.location);
183        } else {
184            return Err(res.err().unwrap().into());
185        }
186    }
187    copy_files(
188        &input_paths,
189        &output_paths,
190        src_store,
191        dest_store,
192        concurrency,
193        None,
194    )
195    .await
196}
197
198pub async fn delete_files<S: ObjectStoreDeleteExt>(
199    files: &[Path],
200    store: &S,
201    concurrency: NonZeroUsize,
202) -> Result<Vec<()>> {
203    let results: Vec<Result<()>> = futures::stream::iter(files)
204        .map(|f| {
205            retry(backoff::ExponentialBackoff::default(), || async {
206                store.delete_object(f).await.map_err(|e| {
207                    error!("Failed to delete file on object store with error: {:?}", &e);
208                    backoff::Error::transient(e)
209                })
210            })
211        })
212        .boxed()
213        .buffer_unordered(concurrency.get())
214        .collect()
215        .await;
216    results.into_iter().collect()
217}
218
219pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
220    path: &Path,
221    store: &S,
222    concurrency: NonZeroUsize,
223) -> Result<Vec<()>> {
224    let mut paths_to_delete = vec![];
225    let mut paths = store.list_objects(Some(path)).await;
226    while let Some(res) = paths.next().await {
227        if let Ok(object_metadata) = res {
228            paths_to_delete.push(object_metadata.location);
229        } else {
230            return Err(res.err().unwrap().into());
231        }
232    }
233    delete_files(&paths_to_delete, store, concurrency).await
234}
235
236pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
237    // Convert an `object_store::path::Path` to `std::path::PathBuf`
238    let path = std::fs::canonicalize(local_dir_path)?;
239    let mut url = Url::from_file_path(&path)
240        .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
241    url.path_segments_mut()
242        .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
243        .pop_if_empty()
244        .extend(location.parts());
245    let new_path = url
246        .to_file_path()
247        .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
248    Ok(new_path)
249}
250
251/// This function will find all child directories in the input store which are
252/// of the form "epoch_num" and return a map of epoch number to the directory
253/// path
254pub async fn find_all_dirs_with_epoch_prefix(
255    store: &Arc<DynObjectStore>,
256    prefix: Option<&Path>,
257) -> anyhow::Result<BTreeMap<u64, Path>> {
258    let mut dirs = BTreeMap::new();
259    let entries = store.list_with_delimiter(prefix).await?;
260    for entry in entries.common_prefixes {
261        if let Some(filename) = entry.filename() {
262            if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
263                continue;
264            }
265            let epoch = filename
266                .split_once('_')
267                .context("Failed to split dir name")
268                .map(|(_, epoch)| epoch.parse::<u64>())??;
269            dirs.insert(epoch, entry);
270        }
271    }
272    Ok(dirs)
273}
274
/// Finds all epochs in the store and returns them as a sorted list, paired
/// with each epoch's start timestamp in ms when its metadata file is present.
///
/// Only epochs whose directory contains a `_SUCCESS` marker are included.
pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<(u64, Option<u64>)>> {
    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
    let mut out = vec![];
    // Tracks whether a completed epoch has been seen yet: a missing marker
    // is logged as an error only before the first complete epoch —
    // presumably later gaps are expected; confirm before relying on this.
    let mut success_marker_found = false;
    // NOTE(review): BTreeMap iteration is already ascending by key, so the
    // `.sorted()` below is redundant (but harmless).
    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
        let success_marker = path.child("_SUCCESS");
        let get_result = object_store.get(&success_marker).await;
        match get_result {
            Err(_) => {
                if !success_marker_found {
                    error!("No success marker found for epoch: {epoch}");
                }
            }
            Ok(_) => {
                // Epoch is complete; read its optional metadata file for the
                // start timestamp. Absent or unparsable metadata degrades to
                // `None` instead of failing the whole listing.
                let metadata_path = path.child(EPOCH_METADATA_FILENAME);
                let epoch_start_timestamp_ms = match object_store.get_bytes(&metadata_path).await {
                    Ok(bytes) => match serde_json::from_slice::<EpochMetadata>(&bytes) {
                        Ok(metadata) => Some(metadata.epoch_start_timestamp_ms),
                        Err(err) => {
                            warn!("Failed to parse epoch metadata for epoch {epoch}: {err}");
                            None
                        }
                    },
                    Err(_) => None,
                };
                out.push((*epoch, epoch_start_timestamp_ms));
                success_marker_found = true;
            }
        }
    }
    Ok(out)
}
309
/// Writes the epochs existing in the store to the root MANIFEST (contains
/// only a list of epochs in the store) every 300 seconds, until a message
/// (or channel error) arrives on `recv`, which acts as a shutdown signal.
// TODO: Is 300 seconds too frequent? Or should this be triggered by other
// events?
pub async fn run_manifest_update_loop(
    store: Arc<DynObjectStore>,
    mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
    loop {
        tokio::select! {
            // Periodic tick: regenerate and upload the manifest. A failed
            // listing is silently skipped (best effort); serialization or
            // upload errors abort the loop.
            _now = update_interval.tick() => {
                if let Ok(available_epochs) = list_all_epochs(store.clone()).await {
                    let manifest_path = Path::from(MANIFEST_FILENAME);
                    let manifest = Manifest { available_epochs };
                    let bytes = serde_json::to_string(&manifest)?;
                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
                }
            },
             _ = recv.recv() => break,
        }
    }
    Ok(())
}
334
335/// This function will find all child directories in the input store which are
336/// of the form "epoch_num" and return a map of epoch number to the directory
337/// path
338pub async fn find_all_files_with_epoch_prefix(
339    store: &Arc<DynObjectStore>,
340    prefix: Option<&Path>,
341) -> anyhow::Result<Vec<Range<u64>>> {
342    let mut ranges = Vec::new();
343    let entries = store.list_with_delimiter(prefix).await?;
344    for entry in entries.objects {
345        let checkpoint_seq_range = entry
346            .location
347            .filename()
348            .ok_or(anyhow!("Illegal file name"))?
349            .split_once('.')
350            .context("Failed to split dir name")?
351            .0
352            .split_once('_')
353            .context("Failed to split dir name")
354            .map(|(start, end)| Range {
355                start: start.parse::<u64>().unwrap(),
356                end: end.parse::<u64>().unwrap(),
357            })?;
358
359        ranges.push(checkpoint_seq_range);
360    }
361    Ok(ranges)
362}
363
364/// This function will find missing epoch directories in the input store and
365/// return a list of such epoch numbers. If the highest epoch directory in the
366/// store is `epoch_N` then it is expected that the store will have all epoch
367/// directories from `epoch_0` to `epoch_N`. Additionally, any epoch directory
368/// should have the passed in marker file present or else that epoch number is
369/// already considered as missing.
370/// The returned list will contain epoch_N+1.
371pub async fn find_missing_epochs_dirs(
372    store: &Arc<DynObjectStore>,
373    success_marker: &str,
374) -> anyhow::Result<Vec<u64>> {
375    let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
376    let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
377    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
378    let mut candidate_epoch: u64 = 0;
379    let mut missing_epochs = Vec::new();
380    for (epoch_num, path) in dirs {
381        while candidate_epoch < *epoch_num {
382            // The whole epoch directory is missing
383            missing_epochs.push(candidate_epoch);
384            candidate_epoch += 1;
385            continue;
386        }
387        let success_marker = path.child(success_marker);
388        let get_result = store.get(&success_marker).await;
389        match get_result {
390            Err(Error::NotFound { .. }) => {
391                error!("No success marker found in db checkpoint for epoch: {epoch_num}");
392                missing_epochs.push(*epoch_num);
393            }
394            Err(_) => {
395                // Probably a transient error
396                warn!(
397                    "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
398                );
399            }
400            Ok(_) => {
401                // Nothing to do
402            }
403        }
404        candidate_epoch += 1
405    }
406    missing_epochs.push(candidate_epoch);
407    Ok(missing_epochs)
408}
409
/// Converts a string prefix into an `object_store` [`Path`].
pub fn get_path(prefix: &str) -> Path {
    Path::from(prefix)
}
413
414// Snapshot MANIFEST file is very simple. Just a newline delimited list of all
415// paths in the snapshot directory this simplicity enables easy parsing for
416// scripts to download snapshots
417pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
418    dir: &Path,
419    store: &S,
420    epoch_prefix: String,
421) -> Result<()> {
422    let mut file_names = vec![];
423    let mut paths = store.list_objects(Some(dir)).await;
424    while let Some(res) = paths.next().await {
425        if let Ok(object_metadata) = res {
426            // trim the "epoch_XX/" dir prefix here
427            let mut path_str = object_metadata.location.to_string();
428            if path_str.starts_with(&epoch_prefix) {
429                path_str = String::from(&path_str[epoch_prefix.len()..]);
430                file_names.push(path_str);
431            } else {
432                warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
433            }
434        } else {
435            return Err(res.err().unwrap().into());
436        }
437    }
438
439    let epoch_manifest = PerEpochManifest::new(file_names);
440    let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
441    put(
442        store,
443        &Path::from(format!("{dir}/{MANIFEST_FILENAME}")),
444        bytes,
445    )
446    .await?;
447
448    Ok(())
449}
450
#[cfg(test)]
mod tests {
    use std::{fs, num::NonZeroUsize};

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use object_store::path::Path;
    use tempfile::TempDir;

    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };

    /// Copies a nested directory tree between two file-backed stores and
    /// verifies that both the structure and the file contents survive.
    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        // Source layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // Every copied file must exist with identical contents.
        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    /// Writes a MANIFEST for an `epoch_0` directory and checks that nested
    /// paths are listed relative to the epoch prefix.
    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        // Source layout: epoch_0/{file1, file2, grand_child/file2.tar.gz}.
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        // The manifest is written inside the epoch directory and lists each
        // file path with the "epoch_0/" prefix stripped.
        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    /// Deletes a directory tree through the store API and verifies the
    /// files are gone from the local filesystem backing it.
    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        // Source layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
}