Skip to main content

iota_data_ingestion_core/history/
reader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{ops::Range, sync::Arc, time::Duration};
6
7use bytes::{Buf, Bytes, buf::Reader};
8use futures::{Stream, StreamExt, TryStreamExt};
9use iota_config::node::ArchiveReaderConfig as HistoricalReaderConfig;
10use iota_storage::{
11    compute_sha3_checksum_for_bytes, make_iterator,
12    object_store::{ObjectStoreGetExt, http::HttpDownloaderBuilder, util::get},
13};
14use iota_types::{
15    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
16};
17use object_store::path::Path;
18use tokio::sync::{
19    Mutex,
20    oneshot::{self, Sender},
21};
22use tracing::info;
23
24use crate::{
25    IngestionError,
26    errors::IngestionResult as Result,
27    history::{
28        CHECKPOINT_FILE_MAGIC,
29        epoch_boundaries::{EpochBoundaries, read_epoch_boundaries},
30        manifest::{FileMetadata, Manifest, read_manifest},
31    },
32};
33
34#[derive(Clone)]
35pub struct HistoricalReader {
36    concurrency: usize,
37    #[expect(dead_code)]
38    /// We store this to get dropped along with the
39    /// reader and hence terminate the manifest sync
40    /// process.
41    sender: Arc<Sender<()>>,
42    manifest: Arc<Mutex<Manifest>>,
43    remote_object_store: Arc<dyn ObjectStoreGetExt>,
44}
45
46impl HistoricalReader {
47    pub fn new(config: HistoricalReaderConfig) -> Result<Self> {
48        let remote_object_store = if config.remote_store_config.no_sign_request {
49            config.remote_store_config.make_http()?
50        } else {
51            config.remote_store_config.make().map(Arc::new)?
52        };
53        let (sender, recv) = oneshot::channel();
54        let manifest = Arc::new(Mutex::new(Manifest::new(0)));
55        // Start a background tokio task to keep local manifest in sync with remote
56        Self::spawn_manifest_sync_task(remote_object_store.clone(), manifest.clone(), recv);
57        Ok(Self {
58            manifest,
59            sender: Arc::new(sender),
60            remote_object_store,
61            concurrency: config.download_concurrency.get(),
62        })
63    }
64
65    /// This function verifies the manifest and returns the file metadata
66    /// sorted by the starting sequence number.
67    ///
68    /// More specifically it verifies that the files in the remote store
69    /// cover the entire range of checkpoints from sequence number 0
70    /// until the latest available checkpoint with no missing checkpoint.
71    pub fn verify_and_get_manifest_files(&self, manifest: Manifest) -> Result<Vec<FileMetadata>> {
72        let mut files = manifest.to_files();
73        if files.is_empty() {
74            return Err(IngestionError::HistoryRead(
75                "unexpected empty remote store of historical data".to_string(),
76            ));
77        }
78
79        files.sort_by_key(|f| f.checkpoint_seq_range.start);
80
81        assert!(
82            files
83                .windows(2)
84                .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
85        );
86
87        assert_eq!(files.first().map(|f| f.checkpoint_seq_range.start), Some(0));
88
89        Ok(files)
90    }
91
92    /// This function downloads checkpoint data files and ensures their
93    /// computed checksum matches the one in manifest.
94    pub async fn verify_file_consistency(&self, files: Vec<FileMetadata>) -> Result<()> {
95        let remote_object_store = self.remote_object_store.clone();
96        futures::stream::iter(files.iter())
97            .map(|metadata| {
98                let remote_object_store = remote_object_store.clone();
99                async move {
100                    let checkpoint_data = get(&remote_object_store, &metadata.file_path()).await?;
101                    Ok::<(Bytes, &FileMetadata), IngestionError>((checkpoint_data, metadata))
102                }
103            })
104            .boxed()
105            .buffer_unordered(self.concurrency)
106            .try_for_each(|(checkpoint_data, metadata)| {
107                let checksum = compute_sha3_checksum_for_bytes(checkpoint_data).map_err(Into::into);
108                let result = checksum.and_then(|checksum| {
109                    if checksum == metadata.sha3_digest {
110                        return Ok(());
111                    };
112                    Err(IngestionError::HistoryRead(format!(
113                        "checksum doesn't match for file: {:?}",
114                        metadata.file_path()
115                    )))
116                });
117                futures::future::ready(result)
118            })
119            .await
120    }
121
122    /// Stream blobs of [`Bytes`] that include checkpoint data for the specified
123    /// range.
124    ///
125    /// This method retrieves files with batches of serialized checkpoint
126    /// data from the remote store, and streams the respective contents
127    /// as blobs.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if resolving the files that need to be fetched from the
132    /// remote store fails.
133    ///
134    /// Additionally the stream may fail if fetching the file from the remote
135    /// store fails.
136    ///
137    /// # Examples
138    ///
139    /// ```ignore
140    /// use futures::StreamExt;
141    ///
142    /// let range = 100..200;
143    /// let mut stream = historical_reader.stream_blobs_for_range(range.clone()).await?;
144    /// while let Some(Ok(blob)) = stream.next().await {
145    ///     // we can now iterate over the checkpoint data
146    ///     for data in make_blob_iterator_for_range(blob, range.clone())? {
147    ///         println!("Received checkpoint data: {data:?}");
148    ///     }
149    /// }
150    /// ```
151    pub async fn stream_blobs_for_range(
152        &self,
153        checkpoint_range: Range<CheckpointSequenceNumber>,
154    ) -> Result<impl Stream<Item = Result<Bytes>> + Send + use<'_>> {
155        let files = self.get_files_for_range(checkpoint_range).await?;
156        Ok(futures::stream::iter(files)
157            .map(move |metadata| async move {
158                let remote_object_store = Arc::clone(&self.remote_object_store);
159                let file_path = metadata.file_path();
160                Ok(get(&remote_object_store, &file_path).await?)
161            })
162            .buffered(self.concurrency))
163    }
164
165    /// Construct an [`Iterator`] over [`CheckpointData`] for the specified
166    /// range.
167    ///
168    /// This method eagerly consumes the stream of blobs returned from
169    /// [`Self::stream_blobs_for_range`] and holds the data in memory until
170    /// the iterator is consumed.
171    ///
172    /// For lazy processing of the blobs use directly
173    /// [`Self::stream_blobs_for_range`] along with
174    /// [`make_blob_iterator_for_range`].
175    pub async fn iter_for_range(
176        &self,
177        checkpoint_range: Range<CheckpointSequenceNumber>,
178    ) -> Result<impl Iterator<Item = CheckpointData>> {
179        let blobs = self
180            .stream_blobs_for_range(checkpoint_range.clone())
181            .await?
182            .try_collect::<Vec<_>>()
183            .await?;
184        let data_iterators = blobs
185            .into_iter()
186            .map(|blob| {
187                let range = checkpoint_range.clone();
188                make_blob_iterator_for_range(blob, range)
189            })
190            .collect::<Result<Vec<_>>>()?;
191        Ok(data_iterators.into_iter().flatten())
192    }
193
194    /// Iterate [`CheckpointData`] from the given remote file.
195    ///
196    /// This method retrieves the file with batches of serialized checkpoint
197    /// data from the remote store, decodes the raw data, and streams the
198    /// deserialized values.
199    ///
200    /// # Errors
201    ///
202    /// Returns an error in the following cases:
203    ///
204    /// * If fetching the file from the remote store fails.
205    /// * If the file is corrupted and fails to decode.
206    pub async fn iter_for_file(
207        &self,
208        file_path: Path,
209    ) -> Result<impl Iterator<Item = CheckpointData>> {
210        let raw_data_batch = get(&self.remote_object_store, &file_path).await?;
211        make_blob_iterator(raw_data_batch)
212    }
213
214    /// Return latest available checkpoint in archive.
215    pub async fn latest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
216        self.manifest
217            .lock()
218            .await
219            .next_checkpoint_seq_num()
220            .checked_sub(1)
221            .ok_or_else(|| {
222                IngestionError::HistoryRead("no checkpoint data in the remote store".into())
223            })
224    }
225
226    pub fn remote_store_identifier(&self) -> String {
227        self.remote_object_store.to_string()
228    }
229
230    /// Returns the last checkpoint of each epoch, indexed by epoch.
231    ///
232    /// Read from the epoch boundaries file maintained alongside the manifest.
233    /// Callers slice the boundaries by epoch range as needed.
234    ///
235    /// # Errors
236    ///
237    /// Fails if the epoch boundaries file cannot be read or if it fails to
238    /// decode.
239    pub async fn epoch_boundaries(&self) -> Result<EpochBoundaries> {
240        read_epoch_boundaries(self.remote_object_store.clone()).await
241    }
242
243    /// Syncs the Manifest from remote store.
244    pub async fn sync_manifest_once(&self) -> Result<()> {
245        Self::sync_manifest(self.remote_object_store.clone(), self.manifest.clone()).await?;
246        Ok(())
247    }
248
249    pub async fn get_manifest(&self) -> Manifest {
250        self.manifest.lock().await.clone()
251    }
252
253    /// Copies Manifest from remote store to the given Manifest.
254    async fn sync_manifest(
255        remote_store: Arc<dyn ObjectStoreGetExt>,
256        manifest: Arc<Mutex<Manifest>>,
257    ) -> Result<()> {
258        let new_manifest = read_manifest(remote_store.clone()).await?;
259        let mut locked = manifest.lock().await;
260        *locked = new_manifest;
261        Ok(())
262    }
263
264    /// Resolve the files to fetch for the specified range.
265    ///
266    /// The method retrieves the manifest from the remote store and
267    /// searches for the files that cover the given range of checkpoint
268    /// data.
269    ///
270    /// # Errors
271    ///
272    /// The method fails if the remote store has no data, or if the
273    /// manifest fails to verify.
274    async fn get_files_for_range(
275        &self,
276        checkpoint_range: Range<CheckpointSequenceNumber>,
277    ) -> Result<impl Iterator<Item = FileMetadata>> {
278        let manifest = self.get_manifest().await;
279
280        let latest_available_checkpoint = manifest
281            .next_checkpoint_seq_num()
282            .checked_sub(1)
283            .ok_or_else(|| {
284                IngestionError::HistoryRead("no checkpoint data in the remote store".into())
285            })?;
286
287        if checkpoint_range.start > latest_available_checkpoint {
288            return Err(IngestionError::HistoryRead(format!(
289                "latest available checkpoint is: {latest_available_checkpoint}",
290            )));
291        }
292
293        let files = self.verify_and_get_manifest_files(manifest)?;
294
295        let start_index = match files
296            .binary_search_by_key(&checkpoint_range.start, |s| s.checkpoint_seq_range.start)
297        {
298            Ok(index) => index,
299            Err(index) => index - 1,
300        };
301
302        let end_index = match files
303            .binary_search_by_key(&checkpoint_range.end, |s| s.checkpoint_seq_range.start)
304        {
305            Ok(index) => index,
306            Err(index) => index,
307        };
308
309        Ok(files
310            .into_iter()
311            .enumerate()
312            .filter_map(move |(index, metadata)| {
313                (index >= start_index && index < end_index).then_some(metadata)
314            }))
315    }
316
317    fn spawn_manifest_sync_task(
318        remote_store: Arc<dyn ObjectStoreGetExt>,
319        manifest: Arc<Mutex<Manifest>>,
320        mut recv: oneshot::Receiver<()>,
321    ) {
322        tokio::task::spawn(async move {
323            let mut interval = tokio::time::interval(Duration::from_secs(60));
324            loop {
325                tokio::select! {
326                    _ = interval.tick() => {
327                        Self::sync_manifest(remote_store.clone(), manifest.clone()).await?;
328                    }
329                    _ = &mut recv => break,
330                }
331            }
332            info!("terminating the manifest sync loop");
333            Ok::<(), IngestionError>(())
334        });
335    }
336}
337
338fn make_blob_iterator(blob: Bytes) -> Result<impl Iterator<Item = CheckpointData>> {
339    Ok(make_iterator::<CheckpointData, Reader<Bytes>>(
340        CHECKPOINT_FILE_MAGIC,
341        blob.reader(),
342    )?)
343}
344
345/// Construct an iterator over a blob of checkpoint data.
346///
347/// The iterator filters checkpoints that belong to the specified range.
348///
349/// # Errors
350///
351/// The function fails if the blob is corrupted and fails to decode.
352pub fn make_blob_iterator_for_range(
353    blob: Bytes,
354    range: Range<CheckpointSequenceNumber>,
355) -> Result<impl Iterator<Item = CheckpointData>> {
356    Ok(make_blob_iterator(blob)?
357        .filter(move |data| range.contains(&data.checkpoint_summary.sequence_number)))
358}