// iota_storage/object_store/util.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::BTreeMap, num::NonZeroUsize, ops::Range, path::PathBuf, sync::Arc, time::Duration,
};

use anyhow::{Context, Result, anyhow};
use backoff::future::retry;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use indicatif::ProgressBar;
use itertools::Itertools;
use object_store::{DynObjectStore, Error, ObjectStore, path::Path};
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tracing::{error, warn};
use url::Url;

use crate::object_store::{
    ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
};
24
/// Name of the root manifest object that lists all epochs available in the
/// store (written by `run_manifest_update_loop`, also reused as the
/// per-epoch manifest file name by `write_snapshot_manifest`).
pub const MANIFEST_FILENAME: &str = "MANIFEST";
26
27#[derive(Serialize, Deserialize)]
28
29pub struct Manifest {
30    pub available_epochs: Vec<u64>,
31}
32
33impl Manifest {
34    pub fn new(available_epochs: Vec<u64>) -> Self {
35        Manifest { available_epochs }
36    }
37
38    pub fn epoch_exists(&self, epoch: u64) -> bool {
39        self.available_epochs.contains(&epoch)
40    }
41}
42
/// Per-epoch manifest: a flat, newline-delimited list of relative file
/// paths inside one epoch directory.
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps a prepared list of manifest entries.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Renders the manifest as one entry per line (no trailing newline).
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses a newline-delimited manifest body back into its entries.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        let lines = s.lines().map(str::to_owned).collect();
        PerEpochManifest { lines }
    }

    /// Returns a new manifest containing only the entries that start with
    /// `prefix`.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        let mut lines = self.lines.clone();
        lines.retain(|line| line.starts_with(prefix));
        PerEpochManifest { lines }
    }
}
77
/// Fetches the object at `src`, retrying failures with exponential backoff.
///
/// NOTE(review): every error — including "not found" — is mapped to
/// `backoff::Error::transient`, so a probe of a missing path keeps retrying
/// until the default backoff policy gives up; confirm that is intended for
/// callers that use this to test for optional objects.
pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
    let bytes = retry(backoff::ExponentialBackoff::default(), || async {
        store.get_bytes(src).await.map_err(|e| {
            error!("Failed to read file from object store with error: {:?}", &e);
            backoff::Error::transient(e)
        })
    })
    .await?;
    Ok(bytes)
}
88
/// Returns `true` if an object exists at `src`.
///
/// Single attempt, no retry — any error (including transient ones) reads as
/// "absent". Also note this downloads the full object body just to test
/// existence; a head/metadata call would be cheaper if the store extension
/// ever grows one.
pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
    store.get_bytes(src).await.is_ok()
}
92
93/// Writes bytes in the store with specified path.
94pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
95    retry(backoff::ExponentialBackoff::default(), || async {
96        if !bytes.is_empty() {
97            store.put_bytes(src, bytes.clone()).await.map_err(|e| {
98                error!("Failed to write file to object store with error: {:?}", &e);
99                backoff::Error::transient(e)
100            })
101        } else {
102            warn!("Not copying empty file: {:?}", src);
103            Ok(())
104        }
105    })
106    .await?;
107    Ok(())
108}
109
110pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
111    src: &Path,
112    dest: &Path,
113    src_store: &S,
114    dest_store: &D,
115) -> Result<()> {
116    let bytes = get(src_store, src).await?;
117    if !bytes.is_empty() {
118        put(dest_store, dest, bytes).await
119    } else {
120        warn!("Not copying empty file: {:?}", src);
121        Ok(())
122    }
123}
124
125pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
126    src: &[Path],
127    dest: &[Path],
128    src_store: &S,
129    dest_store: &D,
130    concurrency: NonZeroUsize,
131    progress_bar: Option<ProgressBar>,
132) -> Result<Vec<()>> {
133    let mut instant = Instant::now();
134    let progress_bar_clone = progress_bar.clone();
135    // Copies files from dest to src in parallel, and updates the progress bar if
136    // it's provided
137    let results = futures::stream::iter(src.iter().zip(dest.iter()))
138        .map(|(path_in, path_out)| async move {
139            let ret = copy_file(path_in, path_out, src_store, dest_store).await;
140            Ok((path_out.clone(), ret))
141        })
142        .boxed()
143        .buffer_unordered(concurrency.get())
144        .try_for_each(|(path, ret)| {
145            if let Some(progress_bar_clone) = &progress_bar_clone {
146                progress_bar_clone.inc(1);
147                progress_bar_clone.set_message(format!("file: {}", path));
148                instant = Instant::now();
149            }
150            futures::future::ready(ret)
151        })
152        .await;
153    Ok(results.into_iter().collect())
154}
155
156/// Copies all files in the directory from the source store to the destination
157/// store.
158pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
159    dir: &Path,
160    src_store: &S,
161    dest_store: &D,
162    concurrency: NonZeroUsize,
163) -> Result<Vec<()>> {
164    let mut input_paths = vec![];
165    let mut output_paths = vec![];
166    let mut paths = src_store.list_objects(Some(dir)).await;
167    while let Some(res) = paths.next().await {
168        if let Ok(object_metadata) = res {
169            input_paths.push(object_metadata.location.clone());
170            output_paths.push(object_metadata.location);
171        } else {
172            return Err(res.err().unwrap().into());
173        }
174    }
175    copy_files(
176        &input_paths,
177        &output_paths,
178        src_store,
179        dest_store,
180        concurrency,
181        None,
182    )
183    .await
184}
185
186pub async fn delete_files<S: ObjectStoreDeleteExt>(
187    files: &[Path],
188    store: &S,
189    concurrency: NonZeroUsize,
190) -> Result<Vec<()>> {
191    let results: Vec<Result<()>> = futures::stream::iter(files)
192        .map(|f| {
193            retry(backoff::ExponentialBackoff::default(), || async {
194                store.delete_object(f).await.map_err(|e| {
195                    error!("Failed to delete file on object store with error: {:?}", &e);
196                    backoff::Error::transient(e)
197                })
198            })
199        })
200        .boxed()
201        .buffer_unordered(concurrency.get())
202        .collect()
203        .await;
204    results.into_iter().collect()
205}
206
207pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
208    path: &Path,
209    store: &S,
210    concurrency: NonZeroUsize,
211) -> Result<Vec<()>> {
212    let mut paths_to_delete = vec![];
213    let mut paths = store.list_objects(Some(path)).await;
214    while let Some(res) = paths.next().await {
215        if let Ok(object_metadata) = res {
216            paths_to_delete.push(object_metadata.location);
217        } else {
218            return Err(res.err().unwrap().into());
219        }
220    }
221    delete_files(&paths_to_delete, store, concurrency).await
222}
223
/// Converts an `object_store::path::Path` rooted at `local_dir_path` into an
/// absolute `std::path::PathBuf` on the local filesystem.
///
/// `local_dir_path` must already exist (it is canonicalized); the resulting
/// path for `location` need not exist.
pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
    // Convert an `object_store::path::Path` to `std::path::PathBuf`
    // Canonicalize so the file:// URL built below is absolute.
    let path = std::fs::canonicalize(local_dir_path)?;
    let mut url = Url::from_file_path(&path)
        .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
    // Append each component of `location` to the URL; `pop_if_empty` drops a
    // trailing empty segment so we don't produce a doubled slash.
    url.path_segments_mut()
        .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
        .pop_if_empty()
        .extend(location.parts());
    // Round-trip back from URL to a native filesystem path.
    let new_path = url
        .to_file_path()
        .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
    Ok(new_path)
}
238
239/// This function will find all child directories in the input store which are
240/// of the form "epoch_num" and return a map of epoch number to the directory
241/// path
242pub async fn find_all_dirs_with_epoch_prefix(
243    store: &Arc<DynObjectStore>,
244    prefix: Option<&Path>,
245) -> anyhow::Result<BTreeMap<u64, Path>> {
246    let mut dirs = BTreeMap::new();
247    let entries = store.list_with_delimiter(prefix).await?;
248    for entry in entries.common_prefixes {
249        if let Some(filename) = entry.filename() {
250            if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
251                continue;
252            }
253            let epoch = filename
254                .split_once('_')
255                .context("Failed to split dir name")
256                .map(|(_, epoch)| epoch.parse::<u64>())??;
257            dirs.insert(epoch, entry);
258        }
259    }
260    Ok(dirs)
261}
262
/// Finds all epochs in the store and returns them as a sorted list.
///
/// Only epochs whose directory contains a `_SUCCESS` marker are included.
/// NOTE(review): the error branch logs only while no marker has been seen
/// yet; once one epoch has a marker, later epochs with a missing or
/// unreadable marker are dropped silently — confirm this asymmetry is
/// intended.
pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
    let mut out = vec![];
    let mut success_marker_found = false;
    // BTreeMap iteration is already ordered; `sorted()` keeps that explicit.
    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
        let success_marker = path.child("_SUCCESS");
        let get_result = object_store.get(&success_marker).await;
        match get_result {
            Err(_) => {
                if !success_marker_found {
                    error!("No success marker found for epoch: {epoch}");
                }
            }
            Ok(_) => {
                // Marker present: the epoch is complete and listable.
                out.push(*epoch);
                success_marker_found = true;
            }
        }
    }
    Ok(out)
}
285
/// Writes the epochs existed in the store to the root MANIFEST (contains only a
/// list of epochs in the store) every 300 seconds, until a message arrives on
/// `recv`.
// TODO: Is 300 seconds too frequent? Or should this be triggered by other
// events?
pub async fn run_manifest_update_loop(
    store: Arc<DynObjectStore>,
    mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
    loop {
        tokio::select! {
            // Periodic tick: re-list the epochs and rewrite the manifest.
            _now = update_interval.tick() => {
                // Listing failures are ignored (we simply try again on the
                // next tick); serialization/write failures abort the loop
                // via `?`.
                if let Ok(epochs) = list_all_epochs(store.clone()).await {
                    let manifest_path = Path::from(MANIFEST_FILENAME);
                    let manifest = Manifest::new(epochs);
                    let bytes = serde_json::to_string(&manifest)?;
                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
                }
            },
            // Shutdown signal: exit the loop cleanly.
             _ = recv.recv() => break,
        }
    }
    Ok(())
}
310
311/// This function will find all child directories in the input store which are
312/// of the form "epoch_num" and return a map of epoch number to the directory
313/// path
314pub async fn find_all_files_with_epoch_prefix(
315    store: &Arc<DynObjectStore>,
316    prefix: Option<&Path>,
317) -> anyhow::Result<Vec<Range<u64>>> {
318    let mut ranges = Vec::new();
319    let entries = store.list_with_delimiter(prefix).await?;
320    for entry in entries.objects {
321        let checkpoint_seq_range = entry
322            .location
323            .filename()
324            .ok_or(anyhow!("Illegal file name"))?
325            .split_once('.')
326            .context("Failed to split dir name")?
327            .0
328            .split_once('_')
329            .context("Failed to split dir name")
330            .map(|(start, end)| Range {
331                start: start.parse::<u64>().unwrap(),
332                end: end.parse::<u64>().unwrap(),
333            })?;
334
335        ranges.push(checkpoint_seq_range);
336    }
337    Ok(ranges)
338}
339
340/// This function will find missing epoch directories in the input store and
341/// return a list of such epoch numbers. If the highest epoch directory in the
342/// store is `epoch_N` then it is expected that the store will have all epoch
343/// directories from `epoch_0` to `epoch_N`. Additionally, any epoch directory
344/// should have the passed in marker file present or else that epoch number is
345/// already considered as missing.
346/// The returned list will contain epoch_N+1.
347pub async fn find_missing_epochs_dirs(
348    store: &Arc<DynObjectStore>,
349    success_marker: &str,
350) -> anyhow::Result<Vec<u64>> {
351    let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
352    let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
353    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
354    let mut candidate_epoch: u64 = 0;
355    let mut missing_epochs = Vec::new();
356    for (epoch_num, path) in dirs {
357        while candidate_epoch < *epoch_num {
358            // The whole epoch directory is missing
359            missing_epochs.push(candidate_epoch);
360            candidate_epoch += 1;
361            continue;
362        }
363        let success_marker = path.child(success_marker);
364        let get_result = store.get(&success_marker).await;
365        match get_result {
366            Err(Error::NotFound { .. }) => {
367                error!("No success marker found in db checkpoint for epoch: {epoch_num}");
368                missing_epochs.push(*epoch_num);
369            }
370            Err(_) => {
371                // Probably a transient error
372                warn!(
373                    "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
374                );
375            }
376            Ok(_) => {
377                // Nothing to do
378            }
379        }
380        candidate_epoch += 1
381    }
382    missing_epochs.push(candidate_epoch);
383    Ok(missing_epochs)
384}
385
386pub fn get_path(prefix: &str) -> Path {
387    Path::from(prefix)
388}
389
390// Snapshot MANIFEST file is very simple. Just a newline delimited list of all
391// paths in the snapshot directory this simplicity enables easy parsing for
392// scripts to download snapshots
393pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
394    dir: &Path,
395    store: &S,
396    epoch_prefix: String,
397) -> Result<()> {
398    let mut file_names = vec![];
399    let mut paths = store.list_objects(Some(dir)).await;
400    while let Some(res) = paths.next().await {
401        if let Ok(object_metadata) = res {
402            // trim the "epoch_XX/" dir prefix here
403            let mut path_str = object_metadata.location.to_string();
404            if path_str.starts_with(&epoch_prefix) {
405                path_str = String::from(&path_str[epoch_prefix.len()..]);
406                file_names.push(path_str);
407            } else {
408                warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
409            }
410        } else {
411            return Err(res.err().unwrap().into());
412        }
413    }
414
415    let epoch_manifest = PerEpochManifest::new(file_names);
416    let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
417    put(
418        store,
419        &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
420        bytes,
421    )
422    .await?;
423
424    Ok(())
425}
426
#[cfg(test)]
mod tests {
    use std::{fs, num::NonZeroUsize};

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use object_store::path::Path;
    use tempfile::TempDir;

    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };

    /// Copies a nested directory tree between two file-backed stores and
    /// verifies both the structure and file contents arrive intact.
    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        // Source layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // The whole tree must be mirrored under the output root.
        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        // Contents, not just existence, must survive the copy.
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    /// Writes a per-epoch MANIFEST and verifies it lists every file path
    /// relative to the epoch directory (including nested ones).
    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        // Epoch layout: epoch_0/{file1, file2, grand_child/file2.tar.gz}.
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        // The manifest must exist and hold epoch-relative paths.
        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    /// Deletes a directory tree through the store API and verifies nothing
    /// under it remains on disk.
    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        // Layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // Both the direct child file and the nested file must be gone.
        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
}