iota_data_ingestion_core/progress_store/file.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{io::SeekFrom, path::PathBuf};
6
7use async_trait::async_trait;
8use iota_types::messages_checkpoint::CheckpointSequenceNumber;
9use serde_json::{Number, Value};
10use tokio::{
11 fs::{File, OpenOptions},
12 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
13};
14
15use crate::{IngestionError, IngestionResult, progress_store::ProgressStore};
16
17/// Manages persistent progress information stored in a JSON file.
18///
19/// This struct encapsulates file operations for reading, writing, and
20/// synchronizing progress data to disk. It uses asynchronous I/O provided by
21/// [`tokio::fs`](tokio::fs) for efficient operation within a Tokio runtime.
22///
23/// # Example
24/// ```
25/// use iota_data_ingestion_core::{FileProgressStore, ProgressStore};
26///
27/// #[tokio::main]
28/// async fn main() {
29/// let mut store = FileProgressStore::new("progress.json").await.unwrap();
30/// store.save("task1".into(), 42).await.unwrap();
31/// # tokio::time::sleep(std::time::Duration::from_secs(1)).await;
32/// let checkpoint = store.load("task1".into()).await.unwrap();
33/// # tokio::fs::remove_file("progress.json").await.unwrap();
34/// assert_eq!(checkpoint, 42);
35/// }
36/// ```
37pub struct FileProgressStore {
38 /// The [`File`] handle used to interact with the progress file.
39 file: File,
40}
41
42impl FileProgressStore {
43 /// Creates a new `FileProgressStore` by opening or creating the file at the
44 /// specified path.
45 pub async fn new(path: impl Into<PathBuf>) -> IngestionResult<Self> {
46 Self::open_or_create_file(path.into())
47 .await
48 .map(|file| Self { file })
49 }
50
51 /// Open or create the file at the specified path.
52 async fn open_or_create_file(path: PathBuf) -> IngestionResult<File> {
53 Ok(OpenOptions::new()
54 .read(true)
55 .write(true)
56 .create(true)
57 .truncate(false)
58 .open(path)
59 .await?)
60 }
61
62 /// Returns an empty JSON object.
63 fn empty_json_object() -> Value {
64 Value::Object(serde_json::Map::new())
65 }
66
67 /// Checks if the file is empty.
68 async fn is_file_empty(&self) -> IngestionResult<bool> {
69 Ok(self.file.metadata().await.map(|m| m.len() == 0)?)
70 }
71
72 /// Reads the file content and parses it as a JSON [`Value`].
73 ///
74 /// - If the file is empty, this function *avoids reading the file* and
75 /// immediately returns an empty JSON object.
76 /// - If the file is not empty, it reads the entire file content and parses
77 /// into a JSON [`Value`].
78 /// - If JSON parsing fails (indicating a corrupted or invalid JSON file),
79 /// it also returns an empty JSON object. This ensures that the progress
80 /// store starts with a clean state in case of file corruption. Later, the
81 /// `ProgressStore::load` method will interpret this empty JSON object as
82 /// a default checkpoint sequence number of 0.
83 async fn read_file_to_json_value(&mut self) -> IngestionResult<Value> {
84 if self.is_file_empty().await? {
85 return Ok(Self::empty_json_object());
86 }
87 // before reading seek to the start of the file
88 self.file.seek(SeekFrom::Start(0)).await?;
89 let mut buf = Vec::new();
90 self.file.read_to_end(&mut buf).await?;
91 Ok(serde_json::from_slice::<Value>(buf.as_slice())
92 .inspect_err(|err| tracing::warn!("corrupted or invalid JSON file: {err}"))
93 .unwrap_or_else(|_| Self::empty_json_object()))
94 }
95
96 /// Writes the given data to the file, overwriting any existing content.
97 async fn write_to_file(&mut self, data: impl AsRef<[u8]>) -> IngestionResult<()> {
98 // before writing seek to the start of the file
99 self.file.seek(SeekFrom::Start(0)).await?;
100 // clear the file content
101 self.file.set_len(0).await?;
102 Ok(self.file.write_all(data.as_ref()).await?)
103 }
104}
105
106#[async_trait]
107impl ProgressStore for FileProgressStore {
108 type Error = IngestionError;
109
110 async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
111 let content = self.read_file_to_json_value().await?;
112 Ok(content
113 .get(&task_name)
114 .and_then(|v| v.as_u64())
115 .unwrap_or_default())
116 }
117 async fn save(
118 &mut self,
119 task_name: String,
120 checkpoint_number: CheckpointSequenceNumber,
121 ) -> Result<(), Self::Error> {
122 let mut content = self.read_file_to_json_value().await?;
123 content[task_name] = Value::Number(Number::from(checkpoint_number));
124 self.write_to_file(serde_json::to_string_pretty(&content)?)
125 .await?;
126 Ok(())
127 }
128}