iota_data_ingestion_core/history/
reader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{ops::Range, sync::Arc, time::Duration};
6
7use bytes::{Buf, Bytes, buf::Reader};
8use futures::{Stream, StreamExt, TryStreamExt};
9use iota_config::node::ArchiveReaderConfig as HistoricalReaderConfig;
10use iota_storage::{
11    compute_sha3_checksum_for_bytes, make_iterator,
12    object_store::{ObjectStoreGetExt, http::HttpDownloaderBuilder, util::get},
13};
14use iota_types::{
15    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
16};
17use object_store::path::Path;
18use tokio::sync::{
19    Mutex,
20    oneshot::{self, Sender},
21};
22use tracing::info;
23
24use crate::{
25    IngestionError,
26    errors::IngestionResult as Result,
27    history::{
28        CHECKPOINT_FILE_MAGIC,
29        manifest::{FileMetadata, Manifest, read_manifest},
30    },
31};
32
33#[derive(Clone)]
34pub struct HistoricalReader {
35    concurrency: usize,
36    #[expect(dead_code)]
37    /// We store this to get dropped along with the
38    /// reader and hence terminate the manifest sync
39    /// process.
40    sender: Arc<Sender<()>>,
41    manifest: Arc<Mutex<Manifest>>,
42    remote_object_store: Arc<dyn ObjectStoreGetExt>,
43}
44
45impl HistoricalReader {
46    pub fn new(config: HistoricalReaderConfig) -> Result<Self> {
47        let remote_object_store = if config.remote_store_config.no_sign_request {
48            config.remote_store_config.make_http()?
49        } else {
50            config.remote_store_config.make().map(Arc::new)?
51        };
52        let (sender, recv) = oneshot::channel();
53        let manifest = Arc::new(Mutex::new(Manifest::new(0)));
54        // Start a background tokio task to keep local manifest in sync with remote
55        Self::spawn_manifest_sync_task(remote_object_store.clone(), manifest.clone(), recv);
56        Ok(Self {
57            manifest,
58            sender: Arc::new(sender),
59            remote_object_store,
60            concurrency: config.download_concurrency.get(),
61        })
62    }
63
64    /// This function verifies the manifest and returns the file metadata
65    /// sorted by the starting sequence number.
66    ///
67    /// More specifically it verifies that the files in the remote store
68    /// cover the entire range of checkpoints from sequence number 0
69    /// until the latest available checkpoint with no missing checkpoint.
70    pub fn verify_and_get_manifest_files(&self, manifest: Manifest) -> Result<Vec<FileMetadata>> {
71        let mut files = manifest.to_files();
72        if files.is_empty() {
73            return Err(IngestionError::HistoryRead(
74                "unexpected empty remote store of historical data".to_string(),
75            ));
76        }
77
78        files.sort_by_key(|f| f.checkpoint_seq_range.start);
79
80        assert!(
81            files
82                .windows(2)
83                .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
84        );
85
86        assert_eq!(files.first().map(|f| f.checkpoint_seq_range.start), Some(0));
87
88        Ok(files)
89    }
90
91    /// This function downloads checkpoint data files and ensures their
92    /// computed checksum matches the one in manifest.
93    pub async fn verify_file_consistency(&self, files: Vec<FileMetadata>) -> Result<()> {
94        let remote_object_store = self.remote_object_store.clone();
95        futures::stream::iter(files.iter())
96            .map(|metadata| {
97                let remote_object_store = remote_object_store.clone();
98                async move {
99                    let checkpoint_data = get(&remote_object_store, &metadata.file_path()).await?;
100                    Ok::<(Bytes, &FileMetadata), IngestionError>((checkpoint_data, metadata))
101                }
102            })
103            .boxed()
104            .buffer_unordered(self.concurrency)
105            .try_for_each(|(checkpoint_data, metadata)| {
106                let checksum = compute_sha3_checksum_for_bytes(checkpoint_data).map_err(Into::into);
107                let result = checksum.and_then(|checksum| {
108                    if checksum == metadata.sha3_digest {
109                        return Ok(());
110                    };
111                    Err(IngestionError::HistoryRead(format!(
112                        "checksum doesn't match for file: {:?}",
113                        metadata.file_path()
114                    )))
115                });
116                futures::future::ready(result)
117            })
118            .await
119    }
120
121    /// Stream blobs of [`Bytes`] that include checkpoint data for the specified
122    /// range.
123    ///
124    /// This method retrieves files with batches of serialized checkpoint
125    /// data from the remote store, and streams the respective contents
126    /// as blobs.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if resolving the files that need to be fetched from the
131    /// remote store fails.
132    ///
133    /// Additionally the stream may fail if fetching the file from the remote
134    /// store fails.
135    ///
136    /// # Examples
137    ///
138    /// ```ignore
139    /// use futures::StreamExt;
140    ///
141    /// let range = 100..200;
142    /// let mut stream = historical_reader.stream_blobs_for_range(range.clone()).await?;
143    /// while let Some(Ok(blob)) = stream.next().await {
144    ///     // we can now iterate over the checkpoint data
145    ///     for data in make_blob_iterator_for_range(blob, range.clone())? {
146    ///         println!("Received checkpoint data: {data:?}");
147    ///     }
148    /// }
149    /// ```
150    pub async fn stream_blobs_for_range(
151        &self,
152        checkpoint_range: Range<CheckpointSequenceNumber>,
153    ) -> Result<impl Stream<Item = Result<Bytes>> + Send + use<'_>> {
154        let files = self.get_files_for_range(checkpoint_range).await?;
155        Ok(futures::stream::iter(files)
156            .map(move |metadata| async move {
157                let remote_object_store = Arc::clone(&self.remote_object_store);
158                let file_path = metadata.file_path();
159                Ok(get(&remote_object_store, &file_path).await?)
160            })
161            .buffered(self.concurrency))
162    }
163
164    /// Construct an [`Iterator`] over [`CheckpointData`] for the specified
165    /// range.
166    ///
167    /// This method eagerly consumes the stream of blobs returned from
168    /// [`Self::stream_blobs_for_range`] and holds the data in memory until
169    /// the iterator is consumed.
170    ///
171    /// For lazy processing of the blobs use directly
172    /// [`Self::stream_blobs_for_range`] along with
173    /// [`make_blob_iterator_for_range`].
174    pub async fn iter_for_range(
175        &self,
176        checkpoint_range: Range<CheckpointSequenceNumber>,
177    ) -> Result<impl Iterator<Item = CheckpointData>> {
178        let blobs = self
179            .stream_blobs_for_range(checkpoint_range.clone())
180            .await?
181            .try_collect::<Vec<_>>()
182            .await?;
183        let data_iterators = blobs
184            .into_iter()
185            .map(|blob| {
186                let range = checkpoint_range.clone();
187                make_blob_iterator_for_range(blob, range)
188            })
189            .collect::<Result<Vec<_>>>()?;
190        Ok(data_iterators.into_iter().flatten())
191    }
192
193    /// Iterate [`CheckpointData`] from the given remote file.
194    ///
195    /// This method retrieves the file with batches of serialized checkpoint
196    /// data from the remote store, decodes the raw data, and streams the
197    /// deserialized values.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error in the following cases:
202    ///
203    /// * If fetching the file from the remote store fails.
204    /// * If the file is corrupted and fails to decode.
205    pub async fn iter_for_file(
206        &self,
207        file_path: Path,
208    ) -> Result<impl Iterator<Item = CheckpointData>> {
209        let raw_data_batch = get(&self.remote_object_store, &file_path).await?;
210        make_blob_iterator(raw_data_batch)
211    }
212
213    /// Return latest available checkpoint in archive.
214    pub async fn latest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
215        self.manifest
216            .lock()
217            .await
218            .next_checkpoint_seq_num()
219            .checked_sub(1)
220            .ok_or_else(|| {
221                IngestionError::HistoryRead("no checkpoint data in the remote store".into())
222            })
223    }
224
225    pub fn remote_store_identifier(&self) -> String {
226        self.remote_object_store.to_string()
227    }
228
229    /// Syncs the Manifest from remote store.
230    pub async fn sync_manifest_once(&self) -> Result<()> {
231        Self::sync_manifest(self.remote_object_store.clone(), self.manifest.clone()).await?;
232        Ok(())
233    }
234
235    pub async fn get_manifest(&self) -> Manifest {
236        self.manifest.lock().await.clone()
237    }
238
239    /// Copies Manifest from remote store to the given Manifest.
240    async fn sync_manifest(
241        remote_store: Arc<dyn ObjectStoreGetExt>,
242        manifest: Arc<Mutex<Manifest>>,
243    ) -> Result<()> {
244        let new_manifest = read_manifest(remote_store.clone()).await?;
245        let mut locked = manifest.lock().await;
246        *locked = new_manifest;
247        Ok(())
248    }
249
250    /// Resolve the files to fetch for the specified range.
251    ///
252    /// The method retrieves the manifest from the remote store and
253    /// searches for the files that cover the given range of checkpoint
254    /// data.
255    ///
256    /// # Errors
257    ///
258    /// The method fails if the remote store has no data, or if the
259    /// manifest fails to verify.
260    async fn get_files_for_range(
261        &self,
262        checkpoint_range: Range<CheckpointSequenceNumber>,
263    ) -> Result<impl Iterator<Item = FileMetadata>> {
264        let manifest = self.get_manifest().await;
265
266        let latest_available_checkpoint = manifest
267            .next_checkpoint_seq_num()
268            .checked_sub(1)
269            .ok_or_else(|| {
270                IngestionError::HistoryRead("no checkpoint data in the remote store".into())
271            })?;
272
273        if checkpoint_range.start > latest_available_checkpoint {
274            return Err(IngestionError::HistoryRead(format!(
275                "latest available checkpoint is: {latest_available_checkpoint}",
276            )));
277        }
278
279        let files = self.verify_and_get_manifest_files(manifest)?;
280
281        let start_index = match files
282            .binary_search_by_key(&checkpoint_range.start, |s| s.checkpoint_seq_range.start)
283        {
284            Ok(index) => index,
285            Err(index) => index - 1,
286        };
287
288        let end_index = match files
289            .binary_search_by_key(&checkpoint_range.end, |s| s.checkpoint_seq_range.start)
290        {
291            Ok(index) => index,
292            Err(index) => index,
293        };
294
295        Ok(files
296            .into_iter()
297            .enumerate()
298            .filter_map(move |(index, metadata)| {
299                (index >= start_index && index < end_index).then_some(metadata)
300            }))
301    }
302
303    fn spawn_manifest_sync_task(
304        remote_store: Arc<dyn ObjectStoreGetExt>,
305        manifest: Arc<Mutex<Manifest>>,
306        mut recv: oneshot::Receiver<()>,
307    ) {
308        tokio::task::spawn(async move {
309            let mut interval = tokio::time::interval(Duration::from_secs(60));
310            loop {
311                tokio::select! {
312                    _ = interval.tick() => {
313                        Self::sync_manifest(remote_store.clone(), manifest.clone()).await?;
314                    }
315                    _ = &mut recv => break,
316                }
317            }
318            info!("terminating the manifest sync loop");
319            Ok::<(), IngestionError>(())
320        });
321    }
322}
323
324fn make_blob_iterator(blob: Bytes) -> Result<impl Iterator<Item = CheckpointData>> {
325    Ok(make_iterator::<CheckpointData, Reader<Bytes>>(
326        CHECKPOINT_FILE_MAGIC,
327        blob.reader(),
328    )?)
329}
330
331/// Construct an iterator over a blob of checkpoint data.
332///
333/// The iterator filters checkpoints that belong to the specified range.
334///
335/// # Errors
336///
337/// The function fails if the blob is corrupted and fails to decode.
338pub fn make_blob_iterator_for_range(
339    blob: Bytes,
340    range: Range<CheckpointSequenceNumber>,
341) -> Result<impl Iterator<Item = CheckpointData>> {
342    Ok(make_blob_iterator(blob)?
343        .filter(move |data| range.contains(&data.checkpoint_summary.sequence_number)))
344}