iota_data_ingestion_core/progress_store/
file.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{io::SeekFrom, path::PathBuf};
6
7use async_trait::async_trait;
8use iota_types::messages_checkpoint::CheckpointSequenceNumber;
9use serde_json::{Number, Value};
10use tokio::{
11    fs::File,
12    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
13};
14
15use crate::{IngestionError, IngestionResult, progress_store::ProgressStore};
16
17/// Manages persistent progress information stored in a JSON file.
18///
19/// This struct encapsulates file operations for reading, writing, and
20/// synchronizing progress data to disk. It uses asynchronous I/O provided by
21/// [`tokio::fs`](tokio::fs) for efficient operation within a Tokio runtime.
22///
23/// # Example
24/// ```
25/// use iota_data_ingestion_core::{FileProgressStore, ProgressStore};
26///
27/// #[tokio::main]
28/// async fn main() {
29///     let mut store = FileProgressStore::new("progress.json").await.unwrap();
30///     store.save("task1".into(), 42).await.unwrap();
31///     # tokio::time::sleep(std::time::Duration::from_secs(1)).await;
32///     let checkpoint = store.load("task1".into()).await.unwrap();
33///     # tokio::fs::remove_file("progress.json").await.unwrap();
34///     assert_eq!(checkpoint, 42);
35/// }
36/// ```
37pub struct FileProgressStore {
38    /// The path to the progress file.
39    path: PathBuf,
40    /// The [`File`] handle used to interact with the progress file.
41    file: File,
42}
43
44impl FileProgressStore {
45    /// Creates a new `FileProgressStore` by opening or creating the file at the
46    /// specified path.
47    pub async fn new(path: impl Into<PathBuf>) -> IngestionResult<Self> {
48        let path = path.into();
49        Self::open_or_create_file(&path)
50            .await
51            .map(|file| Self { file, path })
52    }
53
54    /// Open or create the file at the specified path.
55    async fn open_or_create_file(path: &PathBuf) -> IngestionResult<File> {
56        Ok(File::options()
57            .read(true)
58            .write(true)
59            .create(true)
60            .truncate(false)
61            .open(path)
62            .await?)
63    }
64
65    /// Returns an empty JSON object.
66    fn empty_json_object() -> Value {
67        Value::Object(serde_json::Map::new())
68    }
69
70    /// Checks if the file is empty.
71    async fn is_file_empty(&self) -> IngestionResult<bool> {
72        Ok(self.file.metadata().await.map(|m| m.len() == 0)?)
73    }
74
75    /// Reads the file content and parses it as a JSON [`Value`].
76    ///
77    /// - If the file is empty, this function *avoids reading the file* and
78    ///   immediately returns an empty JSON object.
79    /// - If the file is not empty, it reads the entire file content and parses
80    ///   into a JSON [`Value`].
81    /// - If JSON parsing fails (indicating a corrupted or invalid JSON file),
82    ///   it also returns an empty JSON object. This ensures that the progress
83    ///   store starts with a clean state in case of file corruption. Later, the
84    ///   `ProgressStore::load` method will interpret this empty JSON object as
85    ///   a default checkpoint sequence number of 0.
86    async fn read_file_to_json_value(&mut self) -> IngestionResult<Value> {
87        if self.is_file_empty().await? {
88            return Ok(Self::empty_json_object());
89        }
90        // before reading seek to the start of the file
91        self.file.seek(SeekFrom::Start(0)).await?;
92        let mut buf = Vec::new();
93        self.file.read_to_end(&mut buf).await?;
94        Ok(serde_json::from_slice::<Value>(buf.as_slice())
95            .inspect_err(|err| tracing::warn!("corrupted or invalid JSON file: {err}"))
96            .unwrap_or_else(|_| Self::empty_json_object()))
97    }
98
99    /// Writes the given data to the file, overwriting any existing content.
100    async fn write_to_file(&mut self, data: impl AsRef<[u8]>) -> IngestionResult<()> {
101        let tmp_path = self.path.with_extension("tmp");
102
103        {
104            let mut tmp_file = File::options()
105                .write(true)
106                .create(true)
107                .truncate(true)
108                .open(&tmp_path)
109                .await?;
110            tmp_file.write_all(data.as_ref()).await?;
111            tmp_file.sync_data().await?;
112
113            // only for testing add a small delay, useful for simulate crashes
114            if cfg!(test) {
115                tokio::time::sleep(std::time::Duration::from_nanos(10)).await;
116            }
117        }
118
119        // Atomically replace the original file
120        tokio::fs::rename(&tmp_path, &self.path).await?;
121
122        // Re-open the file handle for further reads
123        self.file = File::open(&self.path).await?;
124
125        Ok(())
126    }
127}
128
129#[async_trait]
130impl ProgressStore for FileProgressStore {
131    type Error = IngestionError;
132
133    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
134        let content = self.read_file_to_json_value().await?;
135        Ok(content
136            .get(&task_name)
137            .and_then(|v| v.as_u64())
138            .unwrap_or_default())
139    }
140    async fn save(
141        &mut self,
142        task_name: String,
143        checkpoint_number: CheckpointSequenceNumber,
144    ) -> Result<(), Self::Error> {
145        let mut content = self.read_file_to_json_value().await?;
146        content[task_name] = Value::Number(Number::from(checkpoint_number));
147        self.write_to_file(serde_json::to_string_pretty(&content)?)
148            .await?;
149        Ok(())
150    }
151}