// iota_data_ingestion_core/history/reader.rs
use std::{ops::Range, sync::Arc, time::Duration};

use bytes::{Buf, Bytes, buf::Reader};
use futures::{Stream, StreamExt, TryStreamExt};
use iota_config::node::ArchiveReaderConfig as HistoricalReaderConfig;
use iota_storage::{
    compute_sha3_checksum_for_bytes, make_iterator,
    object_store::{ObjectStoreGetExt, http::HttpDownloaderBuilder, util::get},
};
use iota_types::{
    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::path::Path;
use tokio::sync::{
    Mutex,
    oneshot::{self, Sender},
};
use tracing::{info, warn};

use crate::{
    IngestionError,
    errors::IngestionResult as Result,
    history::{
        CHECKPOINT_FILE_MAGIC,
        manifest::{FileMetadata, Manifest, read_manifest},
    },
};

33#[derive(Clone)]
34pub struct HistoricalReader {
35 concurrency: usize,
36 #[expect(dead_code)]
37 sender: Arc<Sender<()>>,
41 manifest: Arc<Mutex<Manifest>>,
42 remote_object_store: Arc<dyn ObjectStoreGetExt>,
43}
44
45impl HistoricalReader {
46 pub fn new(config: HistoricalReaderConfig) -> Result<Self> {
47 let remote_object_store = if config.remote_store_config.no_sign_request {
48 config.remote_store_config.make_http()?
49 } else {
50 config.remote_store_config.make().map(Arc::new)?
51 };
52 let (sender, recv) = oneshot::channel();
53 let manifest = Arc::new(Mutex::new(Manifest::new(0)));
54 Self::spawn_manifest_sync_task(remote_object_store.clone(), manifest.clone(), recv);
56 Ok(Self {
57 manifest,
58 sender: Arc::new(sender),
59 remote_object_store,
60 concurrency: config.download_concurrency.get(),
61 })
62 }
63
64 pub fn verify_and_get_manifest_files(&self, manifest: Manifest) -> Result<Vec<FileMetadata>> {
71 let mut files = manifest.to_files();
72 if files.is_empty() {
73 return Err(IngestionError::HistoryRead(
74 "unexpected empty remote store of historical data".to_string(),
75 ));
76 }
77
78 files.sort_by_key(|f| f.checkpoint_seq_range.start);
79
80 assert!(
81 files
82 .windows(2)
83 .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
84 );
85
86 assert_eq!(files.first().map(|f| f.checkpoint_seq_range.start), Some(0));
87
88 Ok(files)
89 }
90
91 pub async fn verify_file_consistency(&self, files: Vec<FileMetadata>) -> Result<()> {
94 let remote_object_store = self.remote_object_store.clone();
95 futures::stream::iter(files.iter())
96 .map(|metadata| {
97 let remote_object_store = remote_object_store.clone();
98 async move {
99 let checkpoint_data = get(&remote_object_store, &metadata.file_path()).await?;
100 Ok::<(Bytes, &FileMetadata), IngestionError>((checkpoint_data, metadata))
101 }
102 })
103 .boxed()
104 .buffer_unordered(self.concurrency)
105 .try_for_each(|(checkpoint_data, metadata)| {
106 let checksum = compute_sha3_checksum_for_bytes(checkpoint_data).map_err(Into::into);
107 let result = checksum.and_then(|checksum| {
108 if checksum == metadata.sha3_digest {
109 return Ok(());
110 };
111 Err(IngestionError::HistoryRead(format!(
112 "checksum doesn't match for file: {:?}",
113 metadata.file_path()
114 )))
115 });
116 futures::future::ready(result)
117 })
118 .await
119 }
120
121 pub async fn stream_blobs_for_range(
151 &self,
152 checkpoint_range: Range<CheckpointSequenceNumber>,
153 ) -> Result<impl Stream<Item = Result<Bytes>> + Send + use<'_>> {
154 let files = self.get_files_for_range(checkpoint_range).await?;
155 Ok(futures::stream::iter(files)
156 .map(move |metadata| async move {
157 let remote_object_store = Arc::clone(&self.remote_object_store);
158 let file_path = metadata.file_path();
159 Ok(get(&remote_object_store, &file_path).await?)
160 })
161 .buffered(self.concurrency))
162 }
163
164 pub async fn iter_for_range(
175 &self,
176 checkpoint_range: Range<CheckpointSequenceNumber>,
177 ) -> Result<impl Iterator<Item = CheckpointData>> {
178 let blobs = self
179 .stream_blobs_for_range(checkpoint_range.clone())
180 .await?
181 .try_collect::<Vec<_>>()
182 .await?;
183 let data_iterators = blobs
184 .into_iter()
185 .map(|blob| {
186 let range = checkpoint_range.clone();
187 make_blob_iterator_for_range(blob, range)
188 })
189 .collect::<Result<Vec<_>>>()?;
190 Ok(data_iterators.into_iter().flatten())
191 }
192
193 pub async fn iter_for_file(
206 &self,
207 file_path: Path,
208 ) -> Result<impl Iterator<Item = CheckpointData>> {
209 let raw_data_batch = get(&self.remote_object_store, &file_path).await?;
210 make_blob_iterator(raw_data_batch)
211 }
212
213 pub async fn latest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
215 self.manifest
216 .lock()
217 .await
218 .next_checkpoint_seq_num()
219 .checked_sub(1)
220 .ok_or_else(|| {
221 IngestionError::HistoryRead("no checkpoint data in the remote store".into())
222 })
223 }
224
225 pub fn remote_store_identifier(&self) -> String {
226 self.remote_object_store.to_string()
227 }
228
229 pub async fn sync_manifest_once(&self) -> Result<()> {
231 Self::sync_manifest(self.remote_object_store.clone(), self.manifest.clone()).await?;
232 Ok(())
233 }
234
235 pub async fn get_manifest(&self) -> Manifest {
236 self.manifest.lock().await.clone()
237 }
238
239 async fn sync_manifest(
241 remote_store: Arc<dyn ObjectStoreGetExt>,
242 manifest: Arc<Mutex<Manifest>>,
243 ) -> Result<()> {
244 let new_manifest = read_manifest(remote_store.clone()).await?;
245 let mut locked = manifest.lock().await;
246 *locked = new_manifest;
247 Ok(())
248 }
249
250 async fn get_files_for_range(
261 &self,
262 checkpoint_range: Range<CheckpointSequenceNumber>,
263 ) -> Result<impl Iterator<Item = FileMetadata>> {
264 let manifest = self.get_manifest().await;
265
266 let latest_available_checkpoint = manifest
267 .next_checkpoint_seq_num()
268 .checked_sub(1)
269 .ok_or_else(|| {
270 IngestionError::HistoryRead("no checkpoint data in the remote store".into())
271 })?;
272
273 if checkpoint_range.start > latest_available_checkpoint {
274 return Err(IngestionError::HistoryRead(format!(
275 "latest available checkpoint is: {latest_available_checkpoint}",
276 )));
277 }
278
279 let files = self.verify_and_get_manifest_files(manifest)?;
280
281 let start_index = match files
282 .binary_search_by_key(&checkpoint_range.start, |s| s.checkpoint_seq_range.start)
283 {
284 Ok(index) => index,
285 Err(index) => index - 1,
286 };
287
288 let end_index = match files
289 .binary_search_by_key(&checkpoint_range.end, |s| s.checkpoint_seq_range.start)
290 {
291 Ok(index) => index,
292 Err(index) => index,
293 };
294
295 Ok(files
296 .into_iter()
297 .enumerate()
298 .filter_map(move |(index, metadata)| {
299 (index >= start_index && index < end_index).then_some(metadata)
300 }))
301 }
302
303 fn spawn_manifest_sync_task(
304 remote_store: Arc<dyn ObjectStoreGetExt>,
305 manifest: Arc<Mutex<Manifest>>,
306 mut recv: oneshot::Receiver<()>,
307 ) {
308 tokio::task::spawn(async move {
309 let mut interval = tokio::time::interval(Duration::from_secs(60));
310 loop {
311 tokio::select! {
312 _ = interval.tick() => {
313 Self::sync_manifest(remote_store.clone(), manifest.clone()).await?;
314 }
315 _ = &mut recv => break,
316 }
317 }
318 info!("terminating the manifest sync loop");
319 Ok::<(), IngestionError>(())
320 });
321 }
322}
323
324fn make_blob_iterator(blob: Bytes) -> Result<impl Iterator<Item = CheckpointData>> {
325 Ok(make_iterator::<CheckpointData, Reader<Bytes>>(
326 CHECKPOINT_FILE_MAGIC,
327 blob.reader(),
328 )?)
329}
330
331pub fn make_blob_iterator_for_range(
339 blob: Bytes,
340 range: Range<CheckpointSequenceNumber>,
341) -> Result<impl Iterator<Item = CheckpointData>> {
342 Ok(make_blob_iterator(blob)?
343 .filter(move |data| range.contains(&data.checkpoint_summary.sequence_number)))
344}