iota_storage/object_store/
util.rs1use std::{
6 collections::BTreeMap, num::NonZeroUsize, ops::Range, path::PathBuf, sync::Arc, time::Duration,
7};
8
9use anyhow::{Context, Result, anyhow};
10use backoff::future::retry;
11use bytes::Bytes;
12use futures::{StreamExt, TryStreamExt};
13use indicatif::ProgressBar;
14use itertools::Itertools;
15use object_store::{DynObjectStore, Error, ObjectStore, path::Path};
16use serde::{Deserialize, Serialize};
17use tokio::time::Instant;
18use tracing::{error, warn};
19use url::Url;
20
21use crate::object_store::{
22 ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
23};
24
25pub const MANIFEST_FILENAME: &str = "MANIFEST";
26
27#[derive(Serialize, Deserialize)]
28
29pub struct Manifest {
30 pub available_epochs: Vec<u64>,
31}
32
33impl Manifest {
34 pub fn new(available_epochs: Vec<u64>) -> Self {
35 Manifest { available_epochs }
36 }
37
38 pub fn epoch_exists(&self, epoch: u64) -> bool {
39 self.available_epochs.contains(&epoch)
40 }
41}
42
/// Newline-delimited manifest of the file paths that belong to a single
/// epoch's snapshot dir.
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps the given list of relative file paths.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Renders the manifest as one path per line (no trailing newline).
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses a newline-delimited manifest back into its list of paths.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        Self {
            lines: s.lines().map(str::to_owned).collect(),
        }
    }

    /// Returns a new manifest keeping only the paths that start with `prefix`.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        Self {
            lines: self
                .lines
                .iter()
                .filter(|line| line.starts_with(prefix))
                .cloned()
                .collect(),
        }
    }
}
77
78pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
79 let bytes = retry(backoff::ExponentialBackoff::default(), || async {
80 store.get_bytes(src).await.map_err(|e| {
81 error!("Failed to read file from object store with error: {:?}", &e);
82 backoff::Error::transient(e)
83 })
84 })
85 .await?;
86 Ok(bytes)
87}
88
89pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
90 store.get_bytes(src).await.is_ok()
91}
92
93pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
95 retry(backoff::ExponentialBackoff::default(), || async {
96 if !bytes.is_empty() {
97 store.put_bytes(src, bytes.clone()).await.map_err(|e| {
98 error!("Failed to write file to object store with error: {:?}", &e);
99 backoff::Error::transient(e)
100 })
101 } else {
102 warn!("Not copying empty file: {:?}", src);
103 Ok(())
104 }
105 })
106 .await?;
107 Ok(())
108}
109
110pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
111 src: &Path,
112 dest: &Path,
113 src_store: &S,
114 dest_store: &D,
115) -> Result<()> {
116 let bytes = get(src_store, src).await?;
117 if !bytes.is_empty() {
118 put(dest_store, dest, bytes).await
119 } else {
120 warn!("Not copying empty file: {:?}", src);
121 Ok(())
122 }
123}
124
125pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
126 src: &[Path],
127 dest: &[Path],
128 src_store: &S,
129 dest_store: &D,
130 concurrency: NonZeroUsize,
131 progress_bar: Option<ProgressBar>,
132) -> Result<Vec<()>> {
133 let mut instant = Instant::now();
134 let progress_bar_clone = progress_bar.clone();
135 let results = futures::stream::iter(src.iter().zip(dest.iter()))
138 .map(|(path_in, path_out)| async move {
139 let ret = copy_file(path_in, path_out, src_store, dest_store).await;
140 Ok((path_out.clone(), ret))
141 })
142 .boxed()
143 .buffer_unordered(concurrency.get())
144 .try_for_each(|(path, ret)| {
145 if let Some(progress_bar_clone) = &progress_bar_clone {
146 progress_bar_clone.inc(1);
147 progress_bar_clone.set_message(format!("file: {}", path));
148 instant = Instant::now();
149 }
150 futures::future::ready(ret)
151 })
152 .await;
153 Ok(results.into_iter().collect())
154}
155
156pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
159 dir: &Path,
160 src_store: &S,
161 dest_store: &D,
162 concurrency: NonZeroUsize,
163) -> Result<Vec<()>> {
164 let mut input_paths = vec![];
165 let mut output_paths = vec![];
166 let mut paths = src_store.list_objects(Some(dir)).await;
167 while let Some(res) = paths.next().await {
168 if let Ok(object_metadata) = res {
169 input_paths.push(object_metadata.location.clone());
170 output_paths.push(object_metadata.location);
171 } else {
172 return Err(res.err().unwrap().into());
173 }
174 }
175 copy_files(
176 &input_paths,
177 &output_paths,
178 src_store,
179 dest_store,
180 concurrency,
181 None,
182 )
183 .await
184}
185
186pub async fn delete_files<S: ObjectStoreDeleteExt>(
187 files: &[Path],
188 store: &S,
189 concurrency: NonZeroUsize,
190) -> Result<Vec<()>> {
191 let results: Vec<Result<()>> = futures::stream::iter(files)
192 .map(|f| {
193 retry(backoff::ExponentialBackoff::default(), || async {
194 store.delete_object(f).await.map_err(|e| {
195 error!("Failed to delete file on object store with error: {:?}", &e);
196 backoff::Error::transient(e)
197 })
198 })
199 })
200 .boxed()
201 .buffer_unordered(concurrency.get())
202 .collect()
203 .await;
204 results.into_iter().collect()
205}
206
207pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
208 path: &Path,
209 store: &S,
210 concurrency: NonZeroUsize,
211) -> Result<Vec<()>> {
212 let mut paths_to_delete = vec![];
213 let mut paths = store.list_objects(Some(path)).await;
214 while let Some(res) = paths.next().await {
215 if let Ok(object_metadata) = res {
216 paths_to_delete.push(object_metadata.location);
217 } else {
218 return Err(res.err().unwrap().into());
219 }
220 }
221 delete_files(&paths_to_delete, store, concurrency).await
222}
223
/// Maps an object-store `location` onto the local filesystem rooted at
/// `local_dir_path`.
///
/// The local directory is canonicalized first (so it must already exist);
/// the location's segments are then appended via a `file://` URL so that
/// platform-specific separator handling is delegated to the `url` crate.
///
/// Returns an error if the directory cannot be canonicalized, cannot be
/// represented as a file URL, or the resulting URL cannot be converted back
/// into a filesystem path.
pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
    // Resolves symlinks/relative components; fails if the dir doesn't exist.
    let path = std::fs::canonicalize(local_dir_path)?;
    let mut url = Url::from_file_path(&path)
        .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
    url.path_segments_mut()
        .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
        // Drop a trailing empty segment so we don't produce a doubled '/'.
        .pop_if_empty()
        .extend(location.parts());
    let new_path = url
        .to_file_path()
        .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
    Ok(new_path)
}
238
239pub async fn find_all_dirs_with_epoch_prefix(
243 store: &Arc<DynObjectStore>,
244 prefix: Option<&Path>,
245) -> anyhow::Result<BTreeMap<u64, Path>> {
246 let mut dirs = BTreeMap::new();
247 let entries = store.list_with_delimiter(prefix).await?;
248 for entry in entries.common_prefixes {
249 if let Some(filename) = entry.filename() {
250 if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
251 continue;
252 }
253 let epoch = filename
254 .split_once('_')
255 .context("Failed to split dir name")
256 .map(|(_, epoch)| epoch.parse::<u64>())??;
257 dirs.insert(epoch, entry);
258 }
259 }
260 Ok(dirs)
261}
262
263pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
265 let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
266 let mut out = vec![];
267 let mut success_marker_found = false;
268 for (epoch, path) in remote_epoch_dirs.iter().sorted() {
269 let success_marker = path.child("_SUCCESS");
270 let get_result = object_store.get(&success_marker).await;
271 match get_result {
272 Err(_) => {
273 if !success_marker_found {
274 error!("No success marker found for epoch: {epoch}");
275 }
276 }
277 Ok(_) => {
278 out.push(*epoch);
279 success_marker_found = true;
280 }
281 }
282 }
283 Ok(out)
284}
285
/// Periodically (every 5 minutes) rewrites the root `MANIFEST` object with
/// the current list of success-marked epochs.
///
/// Runs until anything is received on `recv` (including a closed/lagged
/// channel result). A failure to list epochs is skipped until the next tick;
/// a serialization or upload failure aborts the loop with an error.
pub async fn run_manifest_update_loop(
    store: Arc<DynObjectStore>,
    mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
    loop {
        tokio::select! {
            _now = update_interval.tick() => {
                // Best effort: if listing fails, retry on the next tick.
                if let Ok(epochs) = list_all_epochs(store.clone()).await {
                    let manifest_path = Path::from(MANIFEST_FILENAME);
                    let manifest = Manifest::new(epochs);
                    let bytes = serde_json::to_string(&manifest)?;
                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
                }
            },
            // Shutdown signal; fires on any recv() outcome, Ok or Err.
            _ = recv.recv() => break,
        }
    }
    Ok(())
}
310
311pub async fn find_all_files_with_epoch_prefix(
315 store: &Arc<DynObjectStore>,
316 prefix: Option<&Path>,
317) -> anyhow::Result<Vec<Range<u64>>> {
318 let mut ranges = Vec::new();
319 let entries = store.list_with_delimiter(prefix).await?;
320 for entry in entries.objects {
321 let checkpoint_seq_range = entry
322 .location
323 .filename()
324 .ok_or(anyhow!("Illegal file name"))?
325 .split_once('.')
326 .context("Failed to split dir name")?
327 .0
328 .split_once('_')
329 .context("Failed to split dir name")
330 .map(|(start, end)| Range {
331 start: start.parse::<u64>().unwrap(),
332 end: end.parse::<u64>().unwrap(),
333 })?;
334
335 ranges.push(checkpoint_seq_range);
336 }
337 Ok(ranges)
338}
339
340pub async fn find_missing_epochs_dirs(
348 store: &Arc<DynObjectStore>,
349 success_marker: &str,
350) -> anyhow::Result<Vec<u64>> {
351 let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
352 let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
353 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
354 let mut candidate_epoch: u64 = 0;
355 let mut missing_epochs = Vec::new();
356 for (epoch_num, path) in dirs {
357 while candidate_epoch < *epoch_num {
358 missing_epochs.push(candidate_epoch);
360 candidate_epoch += 1;
361 continue;
362 }
363 let success_marker = path.child(success_marker);
364 let get_result = store.get(&success_marker).await;
365 match get_result {
366 Err(Error::NotFound { .. }) => {
367 error!("No success marker found in db checkpoint for epoch: {epoch_num}");
368 missing_epochs.push(*epoch_num);
369 }
370 Err(_) => {
371 warn!(
373 "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
374 );
375 }
376 Ok(_) => {
377 }
379 }
380 candidate_epoch += 1
381 }
382 missing_epochs.push(candidate_epoch);
383 Ok(missing_epochs)
384}
385
386pub fn get_path(prefix: &str) -> Path {
387 Path::from(prefix)
388}
389
390pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
394 dir: &Path,
395 store: &S,
396 epoch_prefix: String,
397) -> Result<()> {
398 let mut file_names = vec![];
399 let mut paths = store.list_objects(Some(dir)).await;
400 while let Some(res) = paths.next().await {
401 if let Ok(object_metadata) = res {
402 let mut path_str = object_metadata.location.to_string();
404 if path_str.starts_with(&epoch_prefix) {
405 path_str = String::from(&path_str[epoch_prefix.len()..]);
406 file_names.push(path_str);
407 } else {
408 warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
409 }
410 } else {
411 return Err(res.err().unwrap().into());
412 }
413 }
414
415 let epoch_manifest = PerEpochManifest::new(file_names);
416 let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
417 put(
418 store,
419 &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
420 bytes,
421 )
422 .await?;
423
424 Ok(())
425}
426
#[cfg(test)]
mod tests {
    use std::{fs, num::NonZeroUsize};

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use object_store::path::Path;
    use tempfile::TempDir;

    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };

    /// Copies a nested dir tree between two file-backed stores and checks
    /// that both the structure and the file contents arrive intact.
    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        // Source layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        // File-backed object stores rooted at the temp dirs.
        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // The full tree must exist at the destination...
        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        // ...with contents copied byte-for-byte.
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    /// Writes a per-epoch MANIFEST and checks it lists every file under the
    /// epoch dir with the epoch prefix stripped.
    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        // Layout: epoch_0/{file1, file2, grand_child/file2.tar.gz}.
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        // The manifest lands inside the epoch dir and lists relative paths.
        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    /// Deletes a dir tree through the object-store API and checks nothing
    /// under it remains on disk.
    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        // Layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // Both nested files must be gone.
        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
}