iota_data_ingestion_core/reader/v2.rs

use std::{
    num::NonZeroUsize,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use backoff::backoff::Backoff;
use futures::StreamExt;
use iota_config::{
    node::ArchiveReaderConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_metrics::spawn_monitored_task;
use iota_rest_api::CheckpointData;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use tap::Pipe;
use tokio::{
    sync::{
        mpsc::{self},
        oneshot,
    },
    task::JoinHandle,
    time::timeout,
};
use tracing::{debug, error, info};

use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
    history::reader::HistoricalReader,
    reader::{
        fetch::{LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store},
        v1::{DataLimiter, ReaderOptions},
    },
};

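/// Remote source for checkpoint data, used when checkpoints are not available
/// in the local ingestion directory.
///
/// A minimal construction sketch; the URLs below are placeholders, not real
/// endpoints:
///
/// ```ignore
/// // Fetch checkpoints directly from a full node's REST API.
/// let fullnode = RemoteUrl::Fullnode("https://fullnode.example.org".to_owned());
///
/// // Backfill from an S3-compatible historical store, then continue from an
/// // optional live object store once the history is exhausted.
/// let hybrid = RemoteUrl::HybridHistoricalStore {
///     historical_url: "https://historical.example.org".to_owned(),
///     live_url: Some("https://live.example.org".to_owned()),
/// };
/// ```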
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// REST endpoint of a full node serving checkpoint data.
    Fullnode(String),
    /// S3-compatible historical store combined with an optional live object
    /// store that takes over once the historical data is exhausted.
    HybridHistoricalStore {
        /// Endpoint of the S3-compatible historical store.
        historical_url: String,
        /// Optional endpoint of the live object store.
        live_url: Option<String>,
    },
}

/// Instantiated remote clients corresponding to a [`RemoteUrl`].
enum RemoteStore {
    Fullnode(iota_rest_api::Client),
    HybridHistoricalStore {
        historical: HistoricalReader,
        live: Option<Box<dyn ObjectStore>>,
    },
}

impl RemoteStore {
    /// Builds the remote store clients described by `remote_url`.
    async fn new(
        remote_url: RemoteUrl,
        batch_size: usize,
        timeout_secs: u64,
    ) -> IngestionResult<Self> {
        let store = match remote_url {
            RemoteUrl::Fullnode(url) => RemoteStore::Fullnode(iota_rest_api::Client::new(url)),
            RemoteUrl::HybridHistoricalStore {
                historical_url,
                live_url,
            } => {
                let config = ArchiveReaderConfig {
                    download_concurrency: NonZeroUsize::new(batch_size)
                        .expect("batch size must be greater than zero"),
                    remote_store_config: ObjectStoreConfig {
                        object_store: Some(ObjectStoreType::S3),
                        object_store_connection_limit: 20,
                        aws_endpoint: Some(historical_url),
                        aws_virtual_hosted_style_request: true,
                        no_sign_request: true,
                        ..Default::default()
                    },
                    use_for_pruning_watermark: false,
                };
                let historical = HistoricalReader::new(config)
                    .inspect_err(|e| error!("Unable to instantiate historical reader: {e}"))?;

                let live = live_url
                    .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
                    .transpose()?;

                RemoteStore::HybridHistoricalStore { historical, live }
            }
        };
        Ok(store)
    }
}

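/// Configuration for [`CheckpointReader`].
///
/// A construction sketch with illustrative values only; `ReaderOptions` is
/// reused from the v1 reader and defaulted here:
///
/// ```ignore
/// let config = CheckpointReaderConfig {
///     reader_options: ReaderOptions::default(),
///     // Directory watched for locally materialized checkpoint files; a
///     // temporary directory is created when this is `None`.
///     ingestion_path: Some("/tmp/ingestion".into()),
///     // Fall back to a full node when checkpoints are missing locally.
///     remote_store_url: Some(RemoteUrl::Fullnode("https://fullnode.example.org".to_owned())),
/// };
/// ```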
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Tuning options shared with the v1 reader (batch size, timeouts, tick
    /// interval, data limit).
    pub reader_options: ReaderOptions,
    /// Directory watched for locally materialized checkpoint files.
    pub ingestion_path: Option<PathBuf>,
    /// Optional remote source used when checkpoints are missing locally.
    pub remote_store_url: Option<RemoteUrl>,
}

/// Background actor that reads checkpoints from the local directory or a
/// remote store and forwards them, in sequence order, to the executor channel.
struct CheckpointReaderActor {
    /// Directory watched for checkpoint files.
    path: PathBuf,
    /// Next checkpoint sequence number to send.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Watermark up to which processed local files have been garbage collected.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Channel on which checkpoints are handed to the executor.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Receives garbage-collection watermarks from [`CheckpointReader`].
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Optional remote source used when local files are missing or lag behind.
    remote_store: Option<Arc<RemoteStore>>,
    /// Signals the actor loop to terminate.
    shutdown_rx: oneshot::Receiver<()>,
    /// Tuning options shared with the v1 reader.
    reader_options: ReaderOptions,
    /// Tracks in-flight checkpoint data against the configured limit.
    data_limiter: DataLimiter,
}

impl LocalRead for CheckpointReaderActor {
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
        // Stop reading ahead once the in-progress window past the pruning
        // watermark is full or the data limit has been reached.
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_number
    }

    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
        self.last_pruned_watermark = watermark;
    }
}

impl CheckpointReaderActor {
    /// Returns `true` when a remote store is configured and the locally read
    /// files are empty or start past the current checkpoint number.
    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
        self.remote_store.is_some()
            && (checkpoints.is_empty()
                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
    }

    /// Streams checkpoints from the historical store, starting at the current
    /// checkpoint number, and forwards them to the executor channel. Returns
    /// [`IngestionError::CheckpointNotAvailableYet`] if the history does not
    /// yet contain the current checkpoint.
    async fn fetch_historical(
        &mut self,
        historical_reader: &HistoricalReader,
    ) -> IngestionResult<()> {
        if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
            // The manifest may be stale; resync it once before concluding the
            // checkpoint is not available yet.
            timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.sync_manifest_once(),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead("Reading Manifest exceeded the timeout".into())
            })??;

            if self.current_checkpoint_number
                > historical_reader.latest_available_checkpoint().await?
            {
                return Err(IngestionError::CheckpointNotAvailableYet);
            }
        }

        let manifest = historical_reader.get_manifest().await;

        let files = historical_reader.verify_and_get_manifest_files(manifest)?;

        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        for metadata in files
            .into_iter()
            .enumerate()
            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
        {
            let checkpoints = timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.iter_for_file(metadata.file_path()),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead(format!(
                    "Reading checkpoint {} exceeded the timeout",
                    metadata.file_path()
                ))
            })??
            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
            .collect::<Vec<CheckpointData>>();

            for checkpoint in checkpoints {
                let size = bcs::serialized_size(&checkpoint)?;
                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
                    .await?;
            }
        }

        Ok(())
    }

    /// Fetches checkpoints from the configured remote store and forwards them
    /// to the executor channel. For the hybrid store, historical data is read
    /// first; once it is exhausted the optional live object store takes over.
    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
            return Ok(());
        };
        let batch_size = self.reader_options.batch_size;
        match remote_store.as_ref() {
            RemoteStore::Fullnode(client) => {
                let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                    .map(|checkpoint_number| fetch_from_full_node(client, checkpoint_number))
                    .pipe(futures::stream::iter)
                    .buffered(batch_size);

                while let Some(checkpoint_result) = checkpoint_stream.next().await {
                    let (checkpoint, size) = checkpoint_result?;
                    self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                        .await?;
                }
            }
            RemoteStore::HybridHistoricalStore { historical, live } => {
                if let Err(err) = self.fetch_historical(historical).await {
                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
                        // History is exhausted: continue from the live object
                        // store if one was configured.
                        let live = live.as_ref().ok_or(err)?;
                        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                            .map(|checkpoint_number| {
                                fetch_from_object_store(live, checkpoint_number)
                            })
                            .pipe(futures::stream::iter)
                            .buffered(batch_size);

                        while let Some(checkpoint_result) = checkpoint_stream.next().await {
                            let (checkpoint, size) = checkpoint_result?;
                            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                                .await?;
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        };
        Ok(())
    }

    /// Runs [`Self::fetch_and_send_to_channel`] in a retry loop. With a
    /// multiplier of 1.0 the backoff does not grow exponentially; retries use
    /// the 100 ms base interval until 60 seconds have elapsed.
    async fn fetch_and_send_to_channel_with_retry(&mut self) {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;

        loop {
            match self.fetch_and_send_to_channel().await {
                Ok(_) => break,
                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
                Err(IngestionError::CheckpointNotAvailableYet) => {
                    break info!("historical reader does not have the requested checkpoint yet");
                }
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        if !err.to_string().to_lowercase().contains("not found") {
                            debug!(
                                "remote reader retry in {} ms. Error is {err:?}",
                                duration.as_millis(),
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => {
                        break error!("remote reader transient error {err:?}");
                    }
                },
            }
        }
    }

    /// Checks channel and data-limit capacity before forwarding a remotely
    /// fetched checkpoint, recording its serialized size in the data limiter.
    async fn send_remote_checkpoint_with_capacity_check(
        &mut self,
        checkpoint: Arc<CheckpointData>,
        size: usize,
    ) -> IngestionResult<()> {
        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
            return Err(IngestionError::MaxCheckpointsCapacityReached);
        }
        self.data_limiter.add(&checkpoint, size);
        self.send_checkpoint_to_channel(checkpoint).await
    }

    /// Forwards locally read checkpoints in order, stopping at the first one
    /// that is ahead of the current checkpoint number.
    async fn send_local_checkpoints_to_channel(
        &mut self,
        checkpoints: Vec<Arc<CheckpointData>>,
    ) -> IngestionResult<()> {
        for checkpoint in checkpoints {
            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
                break;
            }
            self.send_checkpoint_to_channel(checkpoint).await?;
        }
        Ok(())
    }

    /// Sends a single checkpoint to the executor channel and advances the
    /// current checkpoint number; panics if the checkpoint does not match the
    /// expected sequence number.
    async fn send_checkpoint_to_channel(
        &mut self,
        checkpoint: Arc<CheckpointData>,
    ) -> IngestionResult<()> {
        assert_eq!(
            checkpoint.checkpoint_summary.sequence_number,
            self.current_checkpoint_number
        );
        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send checkpoint to executor, receiver half closed".to_owned(),
            )
        })?;
        self.current_checkpoint_number += 1;
        Ok(())
    }

    /// Performs one synchronization pass: read local checkpoint files and
    /// either forward them or, when they are missing or start past the current
    /// checkpoint number, fall back to the remote store.
    async fn sync(&mut self) -> IngestionResult<()> {
        let mut remote_source = ReadSource::Local;
        let checkpoints = self.read_local_files_with_retry().await?;
        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);

        if should_fetch_from_remote {
            remote_source = ReadSource::Remote;
            self.fetch_and_send_to_channel_with_retry().await;
        } else {
            self.send_local_checkpoints_to_channel(checkpoints).await?;
        }

        info!(
            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
            self.current_checkpoint_number, self.last_pruned_watermark,
        );

        Ok(())
    }

    /// Main actor loop: reacts to shutdown and garbage-collection signals and
    /// runs a sync pass on file-system notifications or on a periodic tick.
    async fn run(mut self) {
        let (_watcher, mut inotify_rx) = self.setup_directory_watcher();
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("Failed to clean the directory");

        loop {
            tokio::select! {
                _ = &mut self.shutdown_rx => break,
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("Failed to clean the directory");
                }
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv()) => {
                    self.sync().await.expect("Failed to read checkpoint files");
                }
            }
        }
    }
}

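/// Handle to the spawned [`CheckpointReaderActor`]: receives checkpoints and
/// forwards garbage-collection and shutdown signals to it.
///
/// A usage sketch, assuming the caller runs inside a tokio runtime and tracks
/// processed checkpoints itself; the watermark semantics shown are assumed,
/// and error handling is abbreviated:
///
/// ```ignore
/// let mut reader = CheckpointReader::new(0, CheckpointReaderConfig::default()).await?;
/// while let Some(checkpoint) = reader.checkpoint().await {
///     let sequence_number = checkpoint.checkpoint_summary.sequence_number;
///     // ... process the checkpoint ...
///     // Let the reader prune everything below this watermark (assumed semantics).
///     reader.send_gc_signal(sequence_number + 1).await?;
/// }
/// reader.shutdown().await?;
/// ```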
pub(crate) struct CheckpointReader {
    /// Join handle of the spawned actor task.
    handle: JoinHandle<()>,
    /// Signals the actor to stop.
    shutdown_tx: oneshot::Sender<()>,
    /// Sends garbage-collection watermarks to the actor.
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    /// Receives checkpoints produced by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
}

impl CheckpointReader {
    /// Spawns the reader actor and returns a handle wired to its checkpoint,
    /// garbage-collection, and shutdown channels.
    pub(crate) async fn new(
        starting_checkpoint_number: CheckpointSequenceNumber,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<Self> {
        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let remote_store = if let Some(url) = config.remote_store_url {
            Some(Arc::new(
                RemoteStore::new(
                    url,
                    config.reader_options.batch_size,
                    config.reader_options.timeout_secs,
                )
                .await?,
            ))
        } else {
            None
        };

        let path = match config.ingestion_path {
            Some(p) => p,
            None => tempfile::tempdir()?.keep(),
        };

        let reader = CheckpointReaderActor {
            path,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_tx,
            gc_signal_rx,
            remote_store,
            shutdown_rx,
            data_limiter: DataLimiter::new(config.reader_options.data_limit),
            reader_options: config.reader_options,
        };

        let handle = spawn_monitored_task!(reader.run());

        Ok(Self {
            handle,
            gc_signal_tx,
            shutdown_tx,
            checkpoint_rx,
        })
    }

    /// Receives the next checkpoint from the actor, or `None` once the
    /// channel has been closed.
    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        self.checkpoint_rx.recv().await
    }

    /// Sends a garbage-collection watermark to the actor so it can prune
    /// processed local files and its data limiter.
    pub(crate) async fn send_gc_signal(
        &self,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.gc_signal_tx.send(watermark).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send GC operation to checkpoint reader, receiver half closed".into(),
            )
        })
    }

    /// Signals the actor to stop and waits for its task to finish.
    pub(crate) async fn shutdown(self) -> IngestionResult<()> {
        _ = self.shutdown_tx.send(());
        self.handle.await.map_err(|err| IngestionError::Shutdown {
            component: "CheckpointReader".into(),
            msg: err.to_string(),
        })
    }
}