iota_data_ingestion_core/history/
reader.rs1use std::{ops::Range, sync::Arc, time::Duration};
6
7use bytes::{Buf, Bytes, buf::Reader};
8use futures::{Stream, StreamExt, TryStreamExt};
9use iota_config::node::ArchiveReaderConfig as HistoricalReaderConfig;
10use iota_storage::{
11 compute_sha3_checksum_for_bytes, make_iterator,
12 object_store::{ObjectStoreGetExt, http::HttpDownloaderBuilder, util::get},
13};
14use iota_types::{
15 full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
16};
17use object_store::path::Path;
18use tokio::sync::{
19 Mutex,
20 oneshot::{self, Sender},
21};
22use tracing::info;
23
24use crate::{
25 IngestionError,
26 errors::IngestionResult as Result,
27 history::{
28 CHECKPOINT_FILE_MAGIC,
29 epoch_boundaries::{EpochBoundaries, read_epoch_boundaries},
30 manifest::{FileMetadata, Manifest, read_manifest},
31 },
32};
33
34#[derive(Clone)]
35pub struct HistoricalReader {
36 concurrency: usize,
37 #[expect(dead_code)]
38 sender: Arc<Sender<()>>,
42 manifest: Arc<Mutex<Manifest>>,
43 remote_object_store: Arc<dyn ObjectStoreGetExt>,
44}
45
46impl HistoricalReader {
47 pub fn new(config: HistoricalReaderConfig) -> Result<Self> {
48 let remote_object_store = if config.remote_store_config.no_sign_request {
49 config.remote_store_config.make_http()?
50 } else {
51 config.remote_store_config.make().map(Arc::new)?
52 };
53 let (sender, recv) = oneshot::channel();
54 let manifest = Arc::new(Mutex::new(Manifest::new(0)));
55 Self::spawn_manifest_sync_task(remote_object_store.clone(), manifest.clone(), recv);
57 Ok(Self {
58 manifest,
59 sender: Arc::new(sender),
60 remote_object_store,
61 concurrency: config.download_concurrency.get(),
62 })
63 }
64
65 pub fn verify_and_get_manifest_files(&self, manifest: Manifest) -> Result<Vec<FileMetadata>> {
72 let mut files = manifest.to_files();
73 if files.is_empty() {
74 return Err(IngestionError::HistoryRead(
75 "unexpected empty remote store of historical data".to_string(),
76 ));
77 }
78
79 files.sort_by_key(|f| f.checkpoint_seq_range.start);
80
81 assert!(
82 files
83 .windows(2)
84 .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
85 );
86
87 assert_eq!(files.first().map(|f| f.checkpoint_seq_range.start), Some(0));
88
89 Ok(files)
90 }
91
92 pub async fn verify_file_consistency(&self, files: Vec<FileMetadata>) -> Result<()> {
95 let remote_object_store = self.remote_object_store.clone();
96 futures::stream::iter(files.iter())
97 .map(|metadata| {
98 let remote_object_store = remote_object_store.clone();
99 async move {
100 let checkpoint_data = get(&remote_object_store, &metadata.file_path()).await?;
101 Ok::<(Bytes, &FileMetadata), IngestionError>((checkpoint_data, metadata))
102 }
103 })
104 .boxed()
105 .buffer_unordered(self.concurrency)
106 .try_for_each(|(checkpoint_data, metadata)| {
107 let checksum = compute_sha3_checksum_for_bytes(checkpoint_data).map_err(Into::into);
108 let result = checksum.and_then(|checksum| {
109 if checksum == metadata.sha3_digest {
110 return Ok(());
111 };
112 Err(IngestionError::HistoryRead(format!(
113 "checksum doesn't match for file: {:?}",
114 metadata.file_path()
115 )))
116 });
117 futures::future::ready(result)
118 })
119 .await
120 }
121
122 pub async fn stream_blobs_for_range(
152 &self,
153 checkpoint_range: Range<CheckpointSequenceNumber>,
154 ) -> Result<impl Stream<Item = Result<Bytes>> + Send + use<'_>> {
155 let files = self.get_files_for_range(checkpoint_range).await?;
156 Ok(futures::stream::iter(files)
157 .map(move |metadata| async move {
158 let remote_object_store = Arc::clone(&self.remote_object_store);
159 let file_path = metadata.file_path();
160 Ok(get(&remote_object_store, &file_path).await?)
161 })
162 .buffered(self.concurrency))
163 }
164
165 pub async fn iter_for_range(
176 &self,
177 checkpoint_range: Range<CheckpointSequenceNumber>,
178 ) -> Result<impl Iterator<Item = CheckpointData>> {
179 let blobs = self
180 .stream_blobs_for_range(checkpoint_range.clone())
181 .await?
182 .try_collect::<Vec<_>>()
183 .await?;
184 let data_iterators = blobs
185 .into_iter()
186 .map(|blob| {
187 let range = checkpoint_range.clone();
188 make_blob_iterator_for_range(blob, range)
189 })
190 .collect::<Result<Vec<_>>>()?;
191 Ok(data_iterators.into_iter().flatten())
192 }
193
194 pub async fn iter_for_file(
207 &self,
208 file_path: Path,
209 ) -> Result<impl Iterator<Item = CheckpointData>> {
210 let raw_data_batch = get(&self.remote_object_store, &file_path).await?;
211 make_blob_iterator(raw_data_batch)
212 }
213
214 pub async fn latest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
216 self.manifest
217 .lock()
218 .await
219 .next_checkpoint_seq_num()
220 .checked_sub(1)
221 .ok_or_else(|| {
222 IngestionError::HistoryRead("no checkpoint data in the remote store".into())
223 })
224 }
225
226 pub fn remote_store_identifier(&self) -> String {
227 self.remote_object_store.to_string()
228 }
229
230 pub async fn epoch_boundaries(&self) -> Result<EpochBoundaries> {
240 read_epoch_boundaries(self.remote_object_store.clone()).await
241 }
242
243 pub async fn sync_manifest_once(&self) -> Result<()> {
245 Self::sync_manifest(self.remote_object_store.clone(), self.manifest.clone()).await?;
246 Ok(())
247 }
248
249 pub async fn get_manifest(&self) -> Manifest {
250 self.manifest.lock().await.clone()
251 }
252
253 async fn sync_manifest(
255 remote_store: Arc<dyn ObjectStoreGetExt>,
256 manifest: Arc<Mutex<Manifest>>,
257 ) -> Result<()> {
258 let new_manifest = read_manifest(remote_store.clone()).await?;
259 let mut locked = manifest.lock().await;
260 *locked = new_manifest;
261 Ok(())
262 }
263
264 async fn get_files_for_range(
275 &self,
276 checkpoint_range: Range<CheckpointSequenceNumber>,
277 ) -> Result<impl Iterator<Item = FileMetadata>> {
278 let manifest = self.get_manifest().await;
279
280 let latest_available_checkpoint = manifest
281 .next_checkpoint_seq_num()
282 .checked_sub(1)
283 .ok_or_else(|| {
284 IngestionError::HistoryRead("no checkpoint data in the remote store".into())
285 })?;
286
287 if checkpoint_range.start > latest_available_checkpoint {
288 return Err(IngestionError::HistoryRead(format!(
289 "latest available checkpoint is: {latest_available_checkpoint}",
290 )));
291 }
292
293 let files = self.verify_and_get_manifest_files(manifest)?;
294
295 let start_index = match files
296 .binary_search_by_key(&checkpoint_range.start, |s| s.checkpoint_seq_range.start)
297 {
298 Ok(index) => index,
299 Err(index) => index - 1,
300 };
301
302 let end_index = match files
303 .binary_search_by_key(&checkpoint_range.end, |s| s.checkpoint_seq_range.start)
304 {
305 Ok(index) => index,
306 Err(index) => index,
307 };
308
309 Ok(files
310 .into_iter()
311 .enumerate()
312 .filter_map(move |(index, metadata)| {
313 (index >= start_index && index < end_index).then_some(metadata)
314 }))
315 }
316
317 fn spawn_manifest_sync_task(
318 remote_store: Arc<dyn ObjectStoreGetExt>,
319 manifest: Arc<Mutex<Manifest>>,
320 mut recv: oneshot::Receiver<()>,
321 ) {
322 tokio::task::spawn(async move {
323 let mut interval = tokio::time::interval(Duration::from_secs(60));
324 loop {
325 tokio::select! {
326 _ = interval.tick() => {
327 Self::sync_manifest(remote_store.clone(), manifest.clone()).await?;
328 }
329 _ = &mut recv => break,
330 }
331 }
332 info!("terminating the manifest sync loop");
333 Ok::<(), IngestionError>(())
334 });
335 }
336}
337
338fn make_blob_iterator(blob: Bytes) -> Result<impl Iterator<Item = CheckpointData>> {
339 Ok(make_iterator::<CheckpointData, Reader<Bytes>>(
340 CHECKPOINT_FILE_MAGIC,
341 blob.reader(),
342 )?)
343}
344
345pub fn make_blob_iterator_for_range(
353 blob: Bytes,
354 range: Range<CheckpointSequenceNumber>,
355) -> Result<impl Iterator<Item = CheckpointData>> {
356 Ok(make_blob_iterator(blob)?
357 .filter(move |data| range.contains(&data.checkpoint_summary.sequence_number)))
358}