// iota_data_ingestion_core/reader/v2.rs
use std::{
5 num::NonZeroUsize,
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::Duration,
9};
10
11use backoff::backoff::Backoff;
12use futures::StreamExt;
13use iota_config::{
14 node::ArchiveReaderConfig,
15 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
16};
17use iota_metrics::spawn_monitored_task;
18use iota_rest_api::CheckpointData;
19use iota_types::messages_checkpoint::CheckpointSequenceNumber;
20use object_store::ObjectStore;
21use serde::{Deserialize, Serialize};
22use tap::Pipe;
23use tokio::{
24 sync::mpsc::{self},
25 task::JoinHandle,
26 time::timeout,
27};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, error, info};
30
31#[cfg(not(target_os = "macos"))]
32use crate::reader::fetch::init_watcher;
33use crate::{
34 IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
35 history::reader::HistoricalReader,
36 reader::{
37 fetch::{LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store},
38 v1::{DataLimiter, ReaderOptions},
39 },
40};
41
/// Location of a remote source of checkpoint data, consulted when the local
/// ingestion directory cannot serve the next checkpoint.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// REST endpoint of a full node serving checkpoint data.
    Fullnode(String),
    /// A historical archive combined with an optional "live" object store that
    /// serves checkpoints the archive does not contain yet.
    HybridHistoricalStore {
        /// Endpoint of the historical archive; used as the endpoint of an
        /// S3-style object store (unsigned, virtual-hosted-style requests).
        historical_url: String,
        /// Optional object-store URL for checkpoints newer than the archive
        /// contents; when `None`, only the archive is read.
        live_url: Option<String>,
    },
}
79
/// Instantiated client(s) backing a [`RemoteUrl`].
enum RemoteStore {
    /// REST client talking to a full node.
    Fullnode(iota_rest_api::Client),
    /// Archive reader plus an optional live object store used as a fallback
    /// for checkpoints the archive does not have yet.
    HybridHistoricalStore {
        historical: HistoricalReader,
        /// Boxed trait object so any `ObjectStore` implementation can act as
        /// the live source.
        live: Option<Box<dyn ObjectStore>>,
    },
}
92
93impl RemoteStore {
94 async fn new(
95 remote_url: RemoteUrl,
96 batch_size: usize,
97 timeout_secs: u64,
98 ) -> IngestionResult<Self> {
99 let store = match remote_url {
100 RemoteUrl::Fullnode(url) => RemoteStore::Fullnode(iota_rest_api::Client::new(url)),
101 RemoteUrl::HybridHistoricalStore {
102 historical_url,
103 live_url,
104 } => {
105 let config = ArchiveReaderConfig {
106 download_concurrency: NonZeroUsize::new(batch_size)
107 .expect("batch size must be greater than zero"),
108 remote_store_config: ObjectStoreConfig {
109 object_store: Some(ObjectStoreType::S3),
110 object_store_connection_limit: 20,
111 aws_endpoint: Some(historical_url),
112 aws_virtual_hosted_style_request: true,
113 no_sign_request: true,
114 ..Default::default()
115 },
116 use_for_pruning_watermark: false,
117 };
118 let historical = HistoricalReader::new(config)
119 .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;
120
121 let live = live_url
122 .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
123 .transpose()?;
124
125 RemoteStore::HybridHistoricalStore { historical, live }
126 }
127 };
128 Ok(store)
129 }
130}
131
/// Configuration for [`CheckpointReader`].
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Tuning knobs shared with the v1 reader (batch size, timeouts, tick
    /// interval, data limit).
    pub reader_options: ReaderOptions,
    /// Directory holding locally ingested checkpoint files; a temporary
    /// directory is created when `None`.
    pub ingestion_path: Option<PathBuf>,
    /// Remote fallback source; when `None`, only local files are read.
    pub remote_store_url: Option<RemoteUrl>,
}
144
/// Background actor that feeds checkpoints into `checkpoint_tx`, reading from
/// the local ingestion directory first and falling back to the configured
/// remote store when local files cannot serve the next checkpoint.
struct CheckpointReaderActor {
    /// Local directory that checkpoint files are read from and GC'd in.
    path: PathBuf,
    /// Sequence number of the next checkpoint to deliver; incremented after
    /// each successful send.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Highest pruning watermark received so far; bounds how far ahead of the
    /// consumer the reader may run.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Channel delivering checkpoints to the executor.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Receives pruning watermarks that trigger garbage collection of
    /// processed files and of the data limiter.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Remote fallback; `None` disables remote fetching entirely.
    remote_store: Option<Arc<RemoteStore>>,
    /// Cooperative shutdown signal shared with [`CheckpointReader`].
    token: CancellationToken,
    /// Reader tuning options (batch size, timeouts, tick interval).
    reader_options: ReaderOptions,
    /// Tracks the size of checkpoints sent but not yet pruned; `exceeds()`
    /// gates further remote fetching.
    data_limiter: DataLimiter,
}
190
191impl LocalRead for CheckpointReaderActor {
192 fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
193 ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
194 || self.data_limiter.exceeds()
195 }
196
197 fn path(&self) -> &Path {
198 &self.path
199 }
200
201 fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
202 self.current_checkpoint_number
203 }
204
205 fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
206 self.last_pruned_watermark = watermark;
207 }
208}
209
impl CheckpointReaderActor {
    /// Returns `true` when the remote store must be consulted: a remote store
    /// is configured and the local directory either yielded no checkpoints or
    /// its first checkpoint is ahead of `current_checkpoint_number`.
    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
        self.remote_store.is_some()
            && (checkpoints.is_empty()
                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
    }

    /// Streams checkpoints from the historical archive, starting at
    /// `current_checkpoint_number`, into the output channel.
    ///
    /// If the requested checkpoint is newer than the archive's latest, the
    /// manifest is synced once; if it is still missing afterwards,
    /// [`IngestionError::CheckpointNotAvailableYet`] is returned. Each manifest
    /// sync and file read is bounded by `reader_options.timeout_secs`.
    async fn fetch_historical(
        &mut self,
        historical_reader: &HistoricalReader,
    ) -> IngestionResult<()> {
        if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
            // The archive looks stale; refresh the manifest once (with a
            // timeout) before deciding the checkpoint is unavailable.
            timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.sync_manifest_once(),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
            })??;

            if self.current_checkpoint_number
                > historical_reader.latest_available_checkpoint().await?
            {
                return Err(IngestionError::CheckpointNotAvailableYet);
            }
        }

        let manifest = historical_reader.get_manifest().await;

        let files = historical_reader.verify_and_get_manifest_files(manifest)?;

        // Locate the archive file whose range contains the current checkpoint:
        // either an exact match on a range start, or the file just before the
        // insertion point.
        // NOTE(review): if `current_checkpoint_number` precedes the first
        // file's range start, `index` is 0 and `index - 1` underflows
        // (panics). Presumably the verified manifest always starts at or
        // before the requested checkpoint — TODO confirm.
        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        for metadata in files
            .into_iter()
            .enumerate()
            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
        {
            // Read one archive file (with a timeout), skipping checkpoints
            // older than the one currently requested.
            let checkpoints = timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.iter_for_file(metadata.file_path()),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead(format!(
                    "reading checkpoint {} exceeded the timeout",
                    metadata.file_path()
                ))
            })??
            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
            .collect::<Vec<CheckpointData>>();

            for checkpoint in checkpoints {
                // The BCS-serialized size feeds the data limiter.
                let size = bcs::serialized_size(&checkpoint)?;
                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
                    .await?;
            }
        }

        Ok(())
    }

    /// Fetches checkpoints from the configured remote store and pushes them to
    /// the output channel until capacity is reached, an error occurs, or the
    /// cancellation token fires.
    ///
    /// For a full node, checkpoints are requested via a concurrently-buffered
    /// stream of width `batch_size`. For the hybrid store, the historical
    /// archive is tried first; when it does not yet have the checkpoint, the
    /// optional live object store takes over (erroring out when no live store
    /// is configured).
    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
        // Clone the Arc so `self` is not borrowed across the awaits below.
        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
            return Ok(());
        };
        let batch_size = self.reader_options.batch_size;
        match remote_store.as_ref() {
            RemoteStore::Fullnode(client) => {
                let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                    .map(|checkpoint_number| fetch_from_full_node(client, checkpoint_number))
                    .pipe(futures::stream::iter)
                    .buffered(batch_size);

                // `run_until_cancelled` yields `None` on cancellation, which
                // exits the loop cleanly.
                while let Some(checkpoint_result) = self
                    .token
                    .run_until_cancelled(checkpoint_stream.next())
                    .await
                    .flatten()
                {
                    let (checkpoint, size) = checkpoint_result?;
                    self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                        .await?;
                }
            }
            RemoteStore::HybridHistoricalStore { historical, live } => {
                // The token is cloned because `fetch_historical` needs a
                // mutable borrow of `self`.
                if let Some(Err(err)) = self
                    .token
                    .clone()
                    .run_until_cancelled(self.fetch_historical(historical))
                    .await
                {
                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
                        // Archive exhausted: fall back to the live store, or
                        // propagate the error when none is configured.
                        let live = live.as_ref().ok_or(err)?;
                        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                            .map(|checkpoint_number| {
                                fetch_from_object_store(live, checkpoint_number)
                            })
                            .pipe(futures::stream::iter)
                            .buffered(batch_size);

                        while let Some(checkpoint_result) = self
                            .token
                            .run_until_cancelled(checkpoint_stream.next())
                            .await
                            .flatten()
                        {
                            let (checkpoint, size) = checkpoint_result?;
                            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                                .await?;
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        };
        Ok(())
    }

    /// Runs [`Self::fetch_and_send_to_channel`] with retry for up to 60
    /// seconds, treating capacity limits, not-yet-available checkpoints, and
    /// cancellation as clean exits.
    async fn fetch_and_send_to_channel_with_retry(&mut self) {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        // A multiplier of 1.0 keeps the retry interval constant at 100 ms
        // despite the "exponential" backoff type.
        backoff.multiplier = 1.0;

        loop {
            match self.fetch_and_send_to_channel().await {
                Ok(_) => break,
                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
                Err(IngestionError::CheckpointNotAvailableYet) => {
                    break info!("historical reader does not have the requested checkpoint yet");
                }
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        // NOTE(review): "not found" responses appear to be
                        // routine while polling for new checkpoints and are
                        // deliberately not logged — confirm.
                        if !err.to_string().to_lowercase().contains("not found") {
                            debug!(
                                "remote reader retry in {} ms. Error is {err:?}",
                                duration.as_millis(),
                            );
                        }
                        // Sleep before retrying, but stop immediately when the
                        // token is cancelled during the sleep.
                        if self
                            .token
                            .run_until_cancelled(tokio::time::sleep(duration))
                            .await
                            .is_none()
                        {
                            break;
                        }
                    }
                    None => {
                        // Backoff budget (max_elapsed_time) exhausted.
                        break error!("remote reader transient error {err:?}");
                    }
                },
            }
        }
    }

    /// Sends a remotely fetched checkpoint after checking the in-progress
    /// capacity and recording its size in the data limiter.
    ///
    /// Returns [`IngestionError::MaxCheckpointsCapacityReached`] when either
    /// the checkpoint-count or the data-size budget is exceeded.
    async fn send_remote_checkpoint_with_capacity_check(
        &mut self,
        checkpoint: Arc<CheckpointData>,
        size: usize,
    ) -> IngestionResult<()> {
        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
            return Err(IngestionError::MaxCheckpointsCapacityReached);
        }
        self.data_limiter.add(&checkpoint, size);
        self.send_checkpoint_to_channel(checkpoint).await
    }

    /// Sends locally read checkpoints in order, stopping at the first one that
    /// is ahead of `current_checkpoint_number` (a gap).
    async fn send_local_checkpoints_to_channel(
        &mut self,
        checkpoints: Vec<Arc<CheckpointData>>,
    ) -> IngestionResult<()> {
        for checkpoint in checkpoints {
            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
                break;
            }
            self.send_checkpoint_to_channel(checkpoint).await?;
        }
        Ok(())
    }

    /// Delivers one checkpoint to the executor channel and advances
    /// `current_checkpoint_number`.
    ///
    /// Panics if the checkpoint is not exactly the next expected sequence
    /// number — an ordering-invariant violation.
    async fn send_checkpoint_to_channel(
        &mut self,
        checkpoint: Arc<CheckpointData>,
    ) -> IngestionResult<()> {
        assert_eq!(
            checkpoint.checkpoint_summary.sequence_number,
            self.current_checkpoint_number
        );
        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send checkpoint to executor, receiver half closed".to_owned(),
            )
        })?;
        self.current_checkpoint_number += 1;
        Ok(())
    }

    /// One synchronization pass: read local files, then either forward them or
    /// fall back to the remote store when they cannot serve the next
    /// checkpoint.
    async fn sync(&mut self) -> IngestionResult<()> {
        let mut remote_source = ReadSource::Local;
        let checkpoints = self.read_local_files_with_retry().await?;
        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);

        if should_fetch_from_remote {
            remote_source = ReadSource::Remote;
            self.fetch_and_send_to_channel_with_retry().await;
        } else {
            self.send_local_checkpoints_to_channel(checkpoints).await?;
        }

        info!(
            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
            self.current_checkpoint_number, self.last_pruned_watermark,
        );

        Ok(())
    }

    /// Actor main loop: prepare the ingestion directory, then repeatedly sync
    /// on filesystem events or tick-interval timeouts, honoring GC signals and
    /// cancellation.
    async fn run(mut self) {
        // On macOS no filesystem watcher is installed, so `inotify_rx` never
        // fires and sync runs purely on the tick-interval timeout below.
        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
        std::fs::create_dir_all(self.path()).expect("failed to create a directory");

        #[cfg(not(target_os = "macos"))]
        let _watcher = init_watcher(_inotify_tx, self.path());

        // Initial cleanup of anything below the starting watermark.
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("failed to clean the directory");

        loop {
            tokio::select! {
                _ = self.token.cancelled() => break,
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("failed to clean the directory");
                }
                // Sync on a filesystem event (`Ok(Some(_))`) OR when the tick
                // interval elapses (`Err(_)` from `timeout`); a closed watcher
                // channel (`Ok(None)`) matches neither and disables this arm.
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv()) => {
                    self.sync().await.expect("failed to read checkpoint files");
                }
            }
        }
    }
}
500
/// Handle to the background [`CheckpointReaderActor`]: exposes received
/// checkpoints, forwards GC watermarks, and controls shutdown.
pub(crate) struct CheckpointReader {
    /// Join handle of the spawned actor task.
    handle: JoinHandle<()>,
    /// Sends pruning watermarks to the actor.
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    /// Receives checkpoints produced by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    /// Cancellation token cancelled by [`Self::shutdown`].
    token: CancellationToken,
}
513
514impl CheckpointReader {
515 pub(crate) async fn new(
516 starting_checkpoint_number: CheckpointSequenceNumber,
517 config: CheckpointReaderConfig,
518 ) -> IngestionResult<Self> {
519 let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
520 let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
521
522 let remote_store = if let Some(url) = config.remote_store_url {
523 Some(Arc::new(
524 RemoteStore::new(
525 url,
526 config.reader_options.batch_size,
527 config.reader_options.timeout_secs,
528 )
529 .await?,
530 ))
531 } else {
532 None
533 };
534
535 let path = match config.ingestion_path {
536 Some(p) => p,
537 None => tempfile::tempdir()?.keep(),
538 };
539 let token = CancellationToken::new();
540 let reader = CheckpointReaderActor {
541 path,
542 current_checkpoint_number: starting_checkpoint_number,
543 last_pruned_watermark: starting_checkpoint_number,
544 checkpoint_tx,
545 gc_signal_rx,
546 remote_store,
547 token: token.clone(),
548 data_limiter: DataLimiter::new(config.reader_options.data_limit),
549 reader_options: config.reader_options,
550 };
551
552 let handle = spawn_monitored_task!(reader.run());
553
554 Ok(Self {
555 handle,
556 gc_signal_tx,
557 checkpoint_rx,
558 token,
559 })
560 }
561
562 pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
564 self.checkpoint_rx.recv().await
565 }
566
567 pub(crate) async fn send_gc_signal(
574 &self,
575 watermark: CheckpointSequenceNumber,
576 ) -> IngestionResult<()> {
577 self.gc_signal_tx.send(watermark).await.map_err(|_| {
578 IngestionError::Channel(
579 "unable to send GC operation to checkpoint reader, receiver half closed".into(),
580 )
581 })
582 }
583
584 pub(crate) async fn shutdown(self) -> IngestionResult<()> {
590 self.token.cancel();
591 self.handle.await.map_err(|err| IngestionError::Shutdown {
592 component: "checkpoint reader".into(),
593 msg: err.to_string(),
594 })
595 }
596}