iota_data_ingestion_core/progress_store/
file.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{io::SeekFrom, path::PathBuf};
6
7use async_trait::async_trait;
8use iota_types::messages_checkpoint::CheckpointSequenceNumber;
9use serde_json::{Number, Value};
10use tokio::{
11    fs::{File, OpenOptions},
12    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
13};
14
15use crate::{IngestionError, IngestionResult, progress_store::ProgressStore};
16
17/// Manages persistent progress information stored in a JSON file.
18///
19/// This struct encapsulates file operations for reading, writing, and
20/// synchronizing progress data to disk. It uses asynchronous I/O provided by
21/// [`tokio::fs`](tokio::fs) for efficient operation within a Tokio runtime.
22///
23/// # Example
24/// ```
25/// use iota_data_ingestion_core::{FileProgressStore, ProgressStore};
26///
27/// #[tokio::main]
28/// async fn main() {
29///     let mut store = FileProgressStore::new("progress.json").await.unwrap();
30///     store.save("task1".into(), 42).await.unwrap();
31///     # tokio::time::sleep(std::time::Duration::from_secs(1)).await;
32///     let checkpoint = store.load("task1".into()).await.unwrap();
33///     # tokio::fs::remove_file("progress.json").await.unwrap();
34///     assert_eq!(checkpoint, 42);
35/// }
36/// ```
37pub struct FileProgressStore {
38    /// The [`File`] handle used to interact with the progress file.
39    file: File,
40}
41
42impl FileProgressStore {
43    /// Creates a new `FileProgressStore` by opening or creating the file at the
44    /// specified path.
45    pub async fn new(path: impl Into<PathBuf>) -> IngestionResult<Self> {
46        Self::open_or_create_file(path.into())
47            .await
48            .map(|file| Self { file })
49    }
50
51    /// Open or create the file at the specified path.
52    async fn open_or_create_file(path: PathBuf) -> IngestionResult<File> {
53        Ok(OpenOptions::new()
54            .read(true)
55            .write(true)
56            .create(true)
57            .truncate(false)
58            .open(path)
59            .await?)
60    }
61
62    /// Returns an empty JSON object.
63    fn empty_json_object() -> Value {
64        Value::Object(serde_json::Map::new())
65    }
66
67    /// Checks if the file is empty.
68    async fn is_file_empty(&self) -> IngestionResult<bool> {
69        Ok(self.file.metadata().await.map(|m| m.len() == 0)?)
70    }
71
72    /// Reads the file content and parses it as a JSON [`Value`].
73    ///
74    /// - If the file is empty, this function *avoids reading the file* and
75    ///   immediately returns an empty JSON object.
76    /// - If the file is not empty, it reads the entire file content and parses
77    ///   into a JSON [`Value`].
78    /// - If JSON parsing fails (indicating a corrupted or invalid JSON file),
79    ///   it also returns an empty JSON object. This ensures that the progress
80    ///   store starts with a clean state in case of file corruption. Later, the
81    ///   `ProgressStore::load` method will interpret this empty JSON object as
82    ///   a default checkpoint sequence number of 0.
83    async fn read_file_to_json_value(&mut self) -> IngestionResult<Value> {
84        if self.is_file_empty().await? {
85            return Ok(Self::empty_json_object());
86        }
87        // before reading seek to the start of the file
88        self.file.seek(SeekFrom::Start(0)).await?;
89        let mut buf = Vec::new();
90        self.file.read_to_end(&mut buf).await?;
91        Ok(serde_json::from_slice::<Value>(buf.as_slice())
92            .inspect_err(|err| tracing::warn!("corrupted or invalid JSON file: {err}"))
93            .unwrap_or_else(|_| Self::empty_json_object()))
94    }
95
96    /// Writes the given data to the file, overwriting any existing content.
97    async fn write_to_file(&mut self, data: impl AsRef<[u8]>) -> IngestionResult<()> {
98        // before writing seek to the start of the file
99        self.file.seek(SeekFrom::Start(0)).await?;
100        // clear the file content
101        self.file.set_len(0).await?;
102        Ok(self.file.write_all(data.as_ref()).await?)
103    }
104}
105
106#[async_trait]
107impl ProgressStore for FileProgressStore {
108    type Error = IngestionError;
109
110    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
111        let content = self.read_file_to_json_value().await?;
112        Ok(content
113            .get(&task_name)
114            .and_then(|v| v.as_u64())
115            .unwrap_or_default())
116    }
117    async fn save(
118        &mut self,
119        task_name: String,
120        checkpoint_number: CheckpointSequenceNumber,
121    ) -> Result<(), Self::Error> {
122        let mut content = self.read_file_to_json_value().await?;
123        content[task_name] = Value::Number(Number::from(checkpoint_number));
124        self.write_to_file(serde_json::to_string_pretty(&content)?)
125            .await?;
126        Ok(())
127    }
128}