iota_data_ingestion_core/progress_store/file.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{io::SeekFrom, path::PathBuf};
6
7use async_trait::async_trait;
8use iota_types::messages_checkpoint::CheckpointSequenceNumber;
9use serde_json::{Number, Value};
10use tokio::{
11 fs::File,
12 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
13};
14
15use crate::{IngestionError, IngestionResult, progress_store::ProgressStore};
16
17/// Manages persistent progress information stored in a JSON file.
18///
19/// This struct encapsulates file operations for reading, writing, and
20/// synchronizing progress data to disk. It uses asynchronous I/O provided by
21/// [`tokio::fs`](tokio::fs) for efficient operation within a Tokio runtime.
22///
23/// # Example
24/// ```
25/// use iota_data_ingestion_core::{FileProgressStore, ProgressStore};
26///
27/// #[tokio::main]
28/// async fn main() {
29/// let mut store = FileProgressStore::new("progress.json").await.unwrap();
30/// store.save("task1".into(), 42).await.unwrap();
31/// # tokio::time::sleep(std::time::Duration::from_secs(1)).await;
32/// let checkpoint = store.load("task1".into()).await.unwrap();
33/// # tokio::fs::remove_file("progress.json").await.unwrap();
34/// assert_eq!(checkpoint, 42);
35/// }
36/// ```
37pub struct FileProgressStore {
38 /// The path to the progress file.
39 path: PathBuf,
40 /// The [`File`] handle used to interact with the progress file.
41 file: File,
42}
43
44impl FileProgressStore {
45 /// Creates a new `FileProgressStore` by opening or creating the file at the
46 /// specified path.
47 pub async fn new(path: impl Into<PathBuf>) -> IngestionResult<Self> {
48 let path = path.into();
49 Self::open_or_create_file(&path)
50 .await
51 .map(|file| Self { file, path })
52 }
53
54 /// Open or create the file at the specified path.
55 async fn open_or_create_file(path: &PathBuf) -> IngestionResult<File> {
56 Ok(File::options()
57 .read(true)
58 .write(true)
59 .create(true)
60 .truncate(false)
61 .open(path)
62 .await?)
63 }
64
65 /// Returns an empty JSON object.
66 fn empty_json_object() -> Value {
67 Value::Object(serde_json::Map::new())
68 }
69
70 /// Checks if the file is empty.
71 async fn is_file_empty(&self) -> IngestionResult<bool> {
72 Ok(self.file.metadata().await.map(|m| m.len() == 0)?)
73 }
74
75 /// Reads the file content and parses it as a JSON [`Value`].
76 ///
77 /// - If the file is empty, this function *avoids reading the file* and
78 /// immediately returns an empty JSON object.
79 /// - If the file is not empty, it reads the entire file content and parses
80 /// into a JSON [`Value`].
81 /// - If JSON parsing fails (indicating a corrupted or invalid JSON file),
82 /// it also returns an empty JSON object. This ensures that the progress
83 /// store starts with a clean state in case of file corruption. Later, the
84 /// `ProgressStore::load` method will interpret this empty JSON object as
85 /// a default checkpoint sequence number of 0.
86 async fn read_file_to_json_value(&mut self) -> IngestionResult<Value> {
87 if self.is_file_empty().await? {
88 return Ok(Self::empty_json_object());
89 }
90 // before reading seek to the start of the file
91 self.file.seek(SeekFrom::Start(0)).await?;
92 let mut buf = Vec::new();
93 self.file.read_to_end(&mut buf).await?;
94 Ok(serde_json::from_slice::<Value>(buf.as_slice())
95 .inspect_err(|err| tracing::warn!("corrupted or invalid JSON file: {err}"))
96 .unwrap_or_else(|_| Self::empty_json_object()))
97 }
98
99 /// Writes the given data to the file, overwriting any existing content.
100 async fn write_to_file(&mut self, data: impl AsRef<[u8]>) -> IngestionResult<()> {
101 let tmp_path = self.path.with_extension("tmp");
102
103 {
104 let mut tmp_file = File::options()
105 .write(true)
106 .create(true)
107 .truncate(true)
108 .open(&tmp_path)
109 .await?;
110 tmp_file.write_all(data.as_ref()).await?;
111 tmp_file.sync_data().await?;
112
113 // only for testing add a small delay, useful for simulate crashes
114 if cfg!(test) {
115 tokio::time::sleep(std::time::Duration::from_nanos(10)).await;
116 }
117 }
118
119 // Atomically replace the original file
120 tokio::fs::rename(&tmp_path, &self.path).await?;
121
122 // Re-open the file handle for further reads
123 self.file = File::open(&self.path).await?;
124
125 Ok(())
126 }
127}
128
129#[async_trait]
130impl ProgressStore for FileProgressStore {
131 type Error = IngestionError;
132
133 async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
134 let content = self.read_file_to_json_value().await?;
135 Ok(content
136 .get(&task_name)
137 .and_then(|v| v.as_u64())
138 .unwrap_or_default())
139 }
140 async fn save(
141 &mut self,
142 task_name: String,
143 checkpoint_number: CheckpointSequenceNumber,
144 ) -> Result<(), Self::Error> {
145 let mut content = self.read_file_to_json_value().await?;
146 content[task_name] = Value::Number(Number::from(checkpoint_number));
147 self.write_to_file(serde_json::to_string_pretty(&content)?)
148 .await?;
149 Ok(())
150 }
151}