iota_storage/object_store/
util.rs1use std::{
6 collections::BTreeMap, num::NonZeroUsize, ops::Range, path::PathBuf, sync::Arc, time::Duration,
7};
8
9use anyhow::{Context, Result, anyhow};
10use backoff::future::retry;
11use bytes::Bytes;
12use futures::{StreamExt, TryStreamExt};
13use indicatif::ProgressBar;
14use itertools::Itertools;
15use object_store::{DynObjectStore, Error, ObjectStore, ObjectStoreExt, path::Path};
16use serde::{Deserialize, Serialize};
17use tokio::time::Instant;
18use tracing::{error, warn};
19use url::Url;
20
21use crate::object_store::{
22 ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
23};
24
/// Object name of the top-level manifest listing all available epochs.
pub const MANIFEST_FILENAME: &str = "MANIFEST";
/// Object name of the per-epoch metadata file (JSON-encoded `EpochMetadata`).
pub const EPOCH_METADATA_FILENAME: &str = "_epoch_metadata.json";
27
28#[derive(Serialize, Deserialize)]
29pub struct Manifest {
30 pub available_epochs: Vec<(u64, Option<u64>)>,
32}
33
34impl Manifest {
35 pub fn new(available_epochs: Vec<(u64, Option<u64>)>) -> Self {
36 Manifest { available_epochs }
37 }
38
39 pub fn epoch_exists(&self, epoch: u64) -> bool {
40 self.available_epochs.iter().any(|(e, _)| *e == epoch)
41 }
42}
43
44#[derive(Serialize, Deserialize)]
45pub struct EpochMetadata {
46 pub epoch_start_timestamp_ms: u64,
47}
48
49impl EpochMetadata {
50 pub fn to_bytes(&self) -> Result<Bytes> {
51 Ok(Bytes::from(serde_json::to_vec(self)?))
52 }
53}
54
/// Newline-delimited listing of the file paths that make up one epoch's data.
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    /// One relative file path per manifest line.
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps the given file list in a manifest.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Renders the manifest as one path per line (no trailing newline).
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses a newline-delimited manifest back into its line list.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        let lines = s.lines().map(str::to_owned).collect();
        PerEpochManifest { lines }
    }

    /// Returns a new manifest keeping only the entries under `prefix`.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        let lines = self
            .lines
            .iter()
            .filter(|line| line.starts_with(prefix))
            .cloned()
            .collect();
        PerEpochManifest { lines }
    }
}
89
90pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
91 let bytes = retry(backoff::ExponentialBackoff::default(), || async {
92 store.get_bytes(src).await.map_err(|e| {
93 error!("Failed to read file from object store with error: {:?}", &e);
94 backoff::Error::transient(e)
95 })
96 })
97 .await?;
98 Ok(bytes)
99}
100
101pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
102 store.get_bytes(src).await.is_ok()
103}
104
105pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
107 retry(backoff::ExponentialBackoff::default(), || async {
108 if !bytes.is_empty() {
109 store.put_bytes(src, bytes.clone()).await.map_err(|e| {
110 error!("Failed to write file to object store with error: {:?}", &e);
111 backoff::Error::transient(e)
112 })
113 } else {
114 warn!("Not copying empty file: {:?}", src);
115 Ok(())
116 }
117 })
118 .await?;
119 Ok(())
120}
121
122pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
123 src: &Path,
124 dest: &Path,
125 src_store: &S,
126 dest_store: &D,
127) -> Result<()> {
128 let bytes = get(src_store, src).await?;
129 if !bytes.is_empty() {
130 put(dest_store, dest, bytes).await
131 } else {
132 warn!("Not copying empty file: {:?}", src);
133 Ok(())
134 }
135}
136
137pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
138 src: &[Path],
139 dest: &[Path],
140 src_store: &S,
141 dest_store: &D,
142 concurrency: NonZeroUsize,
143 progress_bar: Option<ProgressBar>,
144) -> Result<Vec<()>> {
145 let mut instant = Instant::now();
146 let progress_bar_clone = progress_bar.clone();
147 let results = futures::stream::iter(src.iter().zip(dest.iter()))
150 .map(|(path_in, path_out)| async move {
151 let ret = copy_file(path_in, path_out, src_store, dest_store).await;
152 Ok((path_out.clone(), ret))
153 })
154 .boxed()
155 .buffer_unordered(concurrency.get())
156 .try_for_each(|(path, ret)| {
157 if let Some(progress_bar_clone) = &progress_bar_clone {
158 progress_bar_clone.inc(1);
159 progress_bar_clone.set_message(format!("file: {path}"));
160 instant = Instant::now();
161 }
162 futures::future::ready(ret)
163 })
164 .await;
165 Ok(results.into_iter().collect())
166}
167
168pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
171 dir: &Path,
172 src_store: &S,
173 dest_store: &D,
174 concurrency: NonZeroUsize,
175) -> Result<Vec<()>> {
176 let mut input_paths = vec![];
177 let mut output_paths = vec![];
178 let mut paths = src_store.list_objects(Some(dir)).await;
179 while let Some(res) = paths.next().await {
180 if let Ok(object_metadata) = res {
181 input_paths.push(object_metadata.location.clone());
182 output_paths.push(object_metadata.location);
183 } else {
184 return Err(res.err().unwrap().into());
185 }
186 }
187 copy_files(
188 &input_paths,
189 &output_paths,
190 src_store,
191 dest_store,
192 concurrency,
193 None,
194 )
195 .await
196}
197
198pub async fn delete_files<S: ObjectStoreDeleteExt>(
199 files: &[Path],
200 store: &S,
201 concurrency: NonZeroUsize,
202) -> Result<Vec<()>> {
203 let results: Vec<Result<()>> = futures::stream::iter(files)
204 .map(|f| {
205 retry(backoff::ExponentialBackoff::default(), || async {
206 store.delete_object(f).await.map_err(|e| {
207 error!("Failed to delete file on object store with error: {:?}", &e);
208 backoff::Error::transient(e)
209 })
210 })
211 })
212 .boxed()
213 .buffer_unordered(concurrency.get())
214 .collect()
215 .await;
216 results.into_iter().collect()
217}
218
219pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
220 path: &Path,
221 store: &S,
222 concurrency: NonZeroUsize,
223) -> Result<Vec<()>> {
224 let mut paths_to_delete = vec![];
225 let mut paths = store.list_objects(Some(path)).await;
226 while let Some(res) = paths.next().await {
227 if let Ok(object_metadata) = res {
228 paths_to_delete.push(object_metadata.location);
229 } else {
230 return Err(res.err().unwrap().into());
231 }
232 }
233 delete_files(&paths_to_delete, store, concurrency).await
234}
235
236pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
237 let path = std::fs::canonicalize(local_dir_path)?;
239 let mut url = Url::from_file_path(&path)
240 .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
241 url.path_segments_mut()
242 .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
243 .pop_if_empty()
244 .extend(location.parts());
245 let new_path = url
246 .to_file_path()
247 .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
248 Ok(new_path)
249}
250
251pub async fn find_all_dirs_with_epoch_prefix(
255 store: &Arc<DynObjectStore>,
256 prefix: Option<&Path>,
257) -> anyhow::Result<BTreeMap<u64, Path>> {
258 let mut dirs = BTreeMap::new();
259 let entries = store.list_with_delimiter(prefix).await?;
260 for entry in entries.common_prefixes {
261 if let Some(filename) = entry.filename() {
262 if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
263 continue;
264 }
265 let epoch = filename
266 .split_once('_')
267 .context("Failed to split dir name")
268 .map(|(_, epoch)| epoch.parse::<u64>())??;
269 dirs.insert(epoch, entry);
270 }
271 }
272 Ok(dirs)
273}
274
/// Scans the store for `epoch_<N>` directories and returns, in ascending
/// epoch order, each epoch that has a `_SUCCESS` marker together with its
/// start timestamp (when the epoch metadata file is present and parseable).
pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<(u64, Option<u64>)>> {
    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
    let mut out = vec![];
    // Tracks whether any success marker has been seen yet; only the epochs
    // before the first marked one get an error log below.
    let mut success_marker_found = false;
    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
        let success_marker = path.child("_SUCCESS");
        let get_result = object_store.get(&success_marker).await;
        match get_result {
            Err(_) => {
                // Epoch without a success marker: excluded from the output.
                // NOTE(review): any get() error is treated as "missing",
                // not just NotFound.
                if !success_marker_found {
                    error!("No success marker found for epoch: {epoch}");
                }
            }
            Ok(_) => {
                // Marker present: try to enrich with the epoch start time;
                // missing or malformed metadata degrades to None.
                let metadata_path = path.child(EPOCH_METADATA_FILENAME);
                let epoch_start_timestamp_ms = match object_store.get_bytes(&metadata_path).await {
                    Ok(bytes) => match serde_json::from_slice::<EpochMetadata>(&bytes) {
                        Ok(metadata) => Some(metadata.epoch_start_timestamp_ms),
                        Err(err) => {
                            warn!("Failed to parse epoch metadata for epoch {epoch}: {err}");
                            None
                        }
                    },
                    Err(_) => None,
                };
                out.push((*epoch, epoch_start_timestamp_ms));
                success_marker_found = true;
            }
        }
    }
    Ok(out)
}
309
/// Periodically (every 5 minutes) re-lists all available epochs and rewrites
/// the top-level MANIFEST object, until a message arrives on `recv`.
///
/// Listing failures are silently skipped for that tick; serialization or
/// upload failures abort the loop with an error.
pub async fn run_manifest_update_loop(
    store: Arc<DynObjectStore>,
    mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
    loop {
        tokio::select! {
            // Note: the first tick of a tokio interval fires immediately.
            _now = update_interval.tick() => {
                if let Ok(available_epochs) = list_all_epochs(store.clone()).await {
                    let manifest_path = Path::from(MANIFEST_FILENAME);
                    let manifest = Manifest { available_epochs };
                    let bytes = serde_json::to_string(&manifest)?;
                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
                }
            },
            // Any broadcast message (or a closed channel error is NOT handled
            // here — recv() Err also breaks? No: only Ok(()) matches) requests
            // shutdown of the loop.
            _ = recv.recv() => break,
        }
    }
    Ok(())
}
334
335pub async fn find_all_files_with_epoch_prefix(
339 store: &Arc<DynObjectStore>,
340 prefix: Option<&Path>,
341) -> anyhow::Result<Vec<Range<u64>>> {
342 let mut ranges = Vec::new();
343 let entries = store.list_with_delimiter(prefix).await?;
344 for entry in entries.objects {
345 let checkpoint_seq_range = entry
346 .location
347 .filename()
348 .ok_or(anyhow!("Illegal file name"))?
349 .split_once('.')
350 .context("Failed to split dir name")?
351 .0
352 .split_once('_')
353 .context("Failed to split dir name")
354 .map(|(start, end)| Range {
355 start: start.parse::<u64>().unwrap(),
356 end: end.parse::<u64>().unwrap(),
357 })?;
358
359 ranges.push(checkpoint_seq_range);
360 }
361 Ok(ranges)
362}
363
364pub async fn find_missing_epochs_dirs(
372 store: &Arc<DynObjectStore>,
373 success_marker: &str,
374) -> anyhow::Result<Vec<u64>> {
375 let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
376 let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
377 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
378 let mut candidate_epoch: u64 = 0;
379 let mut missing_epochs = Vec::new();
380 for (epoch_num, path) in dirs {
381 while candidate_epoch < *epoch_num {
382 missing_epochs.push(candidate_epoch);
384 candidate_epoch += 1;
385 continue;
386 }
387 let success_marker = path.child(success_marker);
388 let get_result = store.get(&success_marker).await;
389 match get_result {
390 Err(Error::NotFound { .. }) => {
391 error!("No success marker found in db checkpoint for epoch: {epoch_num}");
392 missing_epochs.push(*epoch_num);
393 }
394 Err(_) => {
395 warn!(
397 "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
398 );
399 }
400 Ok(_) => {
401 }
403 }
404 candidate_epoch += 1
405 }
406 missing_epochs.push(candidate_epoch);
407 Ok(missing_epochs)
408}
409
410pub fn get_path(prefix: &str) -> Path {
411 Path::from(prefix)
412}
413
414pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
418 dir: &Path,
419 store: &S,
420 epoch_prefix: String,
421) -> Result<()> {
422 let mut file_names = vec![];
423 let mut paths = store.list_objects(Some(dir)).await;
424 while let Some(res) = paths.next().await {
425 if let Ok(object_metadata) = res {
426 let mut path_str = object_metadata.location.to_string();
428 if path_str.starts_with(&epoch_prefix) {
429 path_str = String::from(&path_str[epoch_prefix.len()..]);
430 file_names.push(path_str);
431 } else {
432 warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
433 }
434 } else {
435 return Err(res.err().unwrap().into());
436 }
437 }
438
439 let epoch_manifest = PerEpochManifest::new(file_names);
440 let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
441 put(
442 store,
443 &Path::from(format!("{dir}/{MANIFEST_FILENAME}")),
444 bytes,
445 )
446 .await?;
447
448 Ok(())
449}
450
#[cfg(test)]
mod tests {
    use std::{fs, num::NonZeroUsize};

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use object_store::path::Path;
    use tempfile::TempDir;

    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };

    // Copies a nested tree (child/file1, child/grand_child/file2) between two
    // file-backed object stores and verifies structure and contents survive.
    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        // Source store rooted at the populated temp dir.
        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        // Destination store rooted at an empty temp dir.
        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // Both files must exist at the mirrored locations with identical
        // contents.
        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    // Writes a MANIFEST for an epoch_0 directory and verifies it lists all
    // files relative to the epoch prefix, including nested ones.
    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        // The manifest is written inside the epoch dir and names each file
        // with the epoch prefix stripped.
        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    // Deletes a nested tree through the object-store API and verifies both
    // the direct child file and the nested grandchild file are gone.
    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
}