1use std::{
5 num::NonZeroUsize,
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::Duration,
9};
10
11use backoff::backoff::Backoff;
12use futures::{StreamExt, TryStreamExt};
13use iota_config::{
14 node::ArchiveReaderConfig,
15 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
16};
17use iota_grpc_client::Client as GrpcClient;
18use iota_metrics::spawn_monitored_task;
19use iota_types::{
20 full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
21};
22use object_store::ObjectStore;
23use serde::{Deserialize, Serialize};
24use tap::Pipe;
25use tokio::{
26 sync::mpsc::{self},
27 task::JoinHandle,
28 time::timeout,
29};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info};
32
33#[cfg(not(target_os = "macos"))]
34use crate::reader::fetch::init_watcher;
35use crate::{
36 IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS,
37 config::CheckpointReaderConfigExt,
38 create_remote_store_client,
39 history::reader::HistoricalReader,
40 reader::{
41 ReaderOptions,
42 common::DataLimiter,
43 fetch::{
44 GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES, LocalRead, ReadSource, fetch_from_object_store,
45 },
46 filters::fullnode::TransactionFilter,
47 },
48};
49
/// Describes the remote source that checkpoints can be fetched from.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// URL of a fullnode; the reader connects to it through a gRPC client.
    Fullnode(String),
    /// A historical store, optionally paired with a live object store.
    HybridHistoricalStore {
        /// Endpoint of the historical store. It is used as the endpoint of an
        /// S3-type object store that is accessed without request signing.
        historical_url: String,
        /// Optional URL of a live object store. The reader falls back to it
        /// when the historical store reports that the requested checkpoint
        /// is not available yet.
        live_url: Option<String>,
    },
}
87
/// Instantiated client(s) for the remote source selected through
/// [`RemoteUrl`].
enum RemoteStore {
    /// gRPC client used to stream checkpoints from a fullnode.
    Fullnode(GrpcClient),
    /// Reader of the historical store plus an optional live object store.
    HybridHistoricalStore {
        // Reader used to relay checkpoints listed in the historical manifest.
        historical: HistoricalReader,
        // Object store used once the historical store has no newer
        // checkpoints; `None` when no live URL was configured.
        live: Option<Box<dyn ObjectStore>>,
    },
}
100
101impl RemoteStore {
102 async fn new(
103 remote_url: RemoteUrl,
104 batch_size: usize,
105 timeout_secs: u64,
106 ) -> IngestionResult<Self> {
107 let store = match remote_url {
108 RemoteUrl::Fullnode(ref url) => {
109 let grpc_client = GrpcClient::new(url).map(|client| {
110 client.with_max_decoding_message_size(GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES)
111 })?;
112 RemoteStore::Fullnode(grpc_client)
113 }
114 RemoteUrl::HybridHistoricalStore {
115 historical_url,
116 live_url,
117 } => {
118 let config = ArchiveReaderConfig {
119 download_concurrency: NonZeroUsize::new(batch_size)
120 .expect("batch size must be greater than zero"),
121 remote_store_config: ObjectStoreConfig {
122 object_store: Some(ObjectStoreType::S3),
123 object_store_connection_limit: 20,
124 aws_endpoint: Some(historical_url),
125 aws_virtual_hosted_style_request: true,
126 no_sign_request: true,
127 ..Default::default()
128 },
129 use_for_pruning_watermark: false,
130 };
131 let historical = HistoricalReader::new(config)
132 .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;
133
134 let live = live_url
135 .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
136 .transpose()?;
137
138 RemoteStore::HybridHistoricalStore { historical, live }
139 }
140 };
141 Ok(store)
142 }
143}
144
/// Configuration of the checkpoint reader.
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Reader tuning options; this file reads its batch size, timeout, tick
    /// interval and data limit.
    pub reader_options: ReaderOptions,
    /// Local directory with checkpoint files.
    // NOTE(review): presumably a temporary directory is used when this is
    // `None` (see `CheckpointReader::new`) — confirm that
    // `CheckpointReaderConfigExt::base` is this type.
    pub ingestion_path: Option<PathBuf>,
    /// Optional remote source to fetch checkpoints from.
    pub remote_store_url: Option<RemoteUrl>,
}
157
/// Background actor that reads checkpoints from the local directory or from
/// the remote store and forwards them, in sequence order, through a channel.
struct CheckpointReaderActor {
    // Local directory the actor works in; created on start-up by `run`.
    path: PathBuf,
    // Sequence number of the next checkpoint to send; incremented after each
    // successful send.
    current_checkpoint_number: CheckpointSequenceNumber,
    // Last watermark recorded through `update_last_pruned_watermark`; bounds
    // how far ahead checkpoints may be accepted (see `exceeds_capacity`).
    last_pruned_watermark: CheckpointSequenceNumber,
    // Sending half of the channel that delivers checkpoints to the consumer.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    // Receives watermarks that trigger garbage collection in `run`.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    // Remote source of checkpoints; `None` when only local files are read.
    remote_store: Option<Arc<RemoteStore>>,
    // Cancellation token that stops the actor loop and in-flight remote reads.
    token: CancellationToken,
    // Reader tuning options (batch size, timeouts, tick interval).
    reader_options: ReaderOptions,
    // Tracks the size of checkpoints fetched from the remote store; consulted
    // by `exceeds_capacity`.
    data_limiter: DataLimiter,
    // Optional transaction filter forwarded to the fullnode checkpoint stream.
    fullnode_transaction_filter: Option<TransactionFilter>,
}
205
206impl LocalRead for CheckpointReaderActor {
207 fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
208 ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
209 || self.data_limiter.exceeds()
210 }
211
212 fn path(&self) -> &Path {
213 &self.path
214 }
215
216 fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
217 self.current_checkpoint_number
218 }
219
220 fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
221 self.last_pruned_watermark = watermark;
222 }
223}
224
225impl CheckpointReaderActor {
226 fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
227 self.remote_store.is_some()
228 && (checkpoints.is_empty()
229 || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
230 }
231
232 async fn relay_from_historical(
235 &mut self,
236 historical_reader: &HistoricalReader,
237 ) -> IngestionResult<()> {
238 if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
242 timeout(
243 Duration::from_secs(self.reader_options.timeout_secs),
244 historical_reader.sync_manifest_once(),
245 )
246 .await
247 .map_err(|_| {
248 IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
249 })??;
250
251 if self.current_checkpoint_number
254 > historical_reader.latest_available_checkpoint().await?
255 {
256 return Err(IngestionError::CheckpointNotAvailableYet);
257 }
258 }
259
260 let manifest = historical_reader.get_manifest().await;
261
262 let files = historical_reader.verify_and_get_manifest_files(manifest)?;
263
264 let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
265 s.checkpoint_seq_range.start
266 }) {
267 Ok(index) => index,
268 Err(index) => index - 1,
269 };
270
271 for metadata in files
272 .into_iter()
273 .enumerate()
274 .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
275 {
276 let checkpoints = timeout(
277 Duration::from_secs(self.reader_options.timeout_secs),
278 historical_reader.iter_for_file(metadata.file_path()),
279 )
280 .await
281 .map_err(|_| {
282 IngestionError::HistoryRead(format!(
283 "reading checkpoint {} exceeded the timeout",
284 metadata.file_path()
285 ))
286 })??
287 .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
288 .collect::<Vec<CheckpointData>>();
289
290 for checkpoint in checkpoints {
291 let size = bcs::serialized_size(&checkpoint)?;
292 self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
293 .await?;
294 }
295 }
296
297 Ok(())
298 }
299
300 async fn relay_from_live(
303 &mut self,
304 batch_size: usize,
305 live: &dyn ObjectStore,
306 ) -> IngestionResult<()> {
307 let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
308 .map(|checkpoint_number| fetch_from_object_store(live, checkpoint_number))
309 .pipe(futures::stream::iter)
310 .buffered(batch_size);
311 while let Some((checkpoint, size)) = self
312 .token
313 .run_until_cancelled(checkpoint_stream.try_next())
314 .await
315 .transpose()?
316 .flatten()
317 {
318 self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
319 .await?;
320 }
321 Ok(())
322 }
323
324 async fn relay_from_fullnode(&mut self, client: &mut GrpcClient) -> IngestionResult<()> {
327 let mut checkpoints_stream = client
328 .stream_checkpoints(
329 Some(self.current_checkpoint_number),
330 None,
331 Some(iota_grpc_client::CHECKPOINT_RESPONSE_CHECKPOINT_DATA.into()),
332 self.fullnode_transaction_filter.clone().map(Into::into),
333 None,
334 )
335 .await
336 .map_err(|e| {
337 IngestionError::Grpc(format!("failed to initialize the checkpoint stream: {e}"))
338 })?
339 .into_inner();
340
341 while let Some(grpc_checkpoint) = self
342 .token
343 .run_until_cancelled(checkpoints_stream.try_next())
344 .await
345 .transpose()?
346 .flatten()
347 {
348 let checkpoint = grpc_checkpoint.checkpoint_data()?.try_into()?;
349 let size = bcs::serialized_size(&checkpoint)?;
350 self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
351 .await?;
352 }
353
354 Ok(())
355 }
356
357 async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
364 let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
365 return Ok(());
366 };
367 let batch_size = self.reader_options.batch_size;
368 match remote_store.as_ref() {
369 RemoteStore::Fullnode(client) => {
370 self.relay_from_fullnode(&mut client.clone()).await?;
371 }
372 RemoteStore::HybridHistoricalStore { historical, live } => {
373 if let Some(Err(err)) = self
374 .token
375 .clone()
376 .run_until_cancelled(self.relay_from_historical(historical))
377 .await
378 {
379 if matches!(err, IngestionError::CheckpointNotAvailableYet) {
380 let live = live.as_ref().ok_or(err)?;
381 return self.relay_from_live(batch_size, live).await;
382 }
383 return Err(err);
384 }
385 }
386 };
387 Ok(())
388 }
389
390 async fn fetch_and_send_to_channel_with_retry(&mut self) {
394 let mut backoff = backoff::ExponentialBackoff::default();
395 backoff.max_elapsed_time = Some(Duration::from_secs(60));
396 backoff.initial_interval = Duration::from_millis(100);
397 backoff.current_interval = backoff.initial_interval;
398 backoff.multiplier = 1.0;
399
400 loop {
401 match self.fetch_and_send_to_channel().await {
402 Ok(_) => break,
403 Err(IngestionError::MaxCheckpointsCapacityReached) => break,
404 Err(IngestionError::CheckpointNotAvailableYet) => {
405 break info!("historical reader does not have the requested checkpoint yet");
406 }
407 Err(err) => match backoff.next_backoff() {
408 Some(duration) => {
409 if !err.to_string().to_lowercase().contains("not found") {
410 debug!(
411 "remote reader retry in {} ms. Error is {err:?}",
412 duration.as_millis(),
413 );
414 }
415 if self
416 .token
417 .run_until_cancelled(tokio::time::sleep(duration))
418 .await
419 .is_none()
420 {
421 break;
422 }
423 }
424 None => {
425 break error!("remote reader transient error {err:?}");
426 }
427 },
428 }
429 }
430 }
431
432 async fn send_remote_checkpoint_with_capacity_check(
440 &mut self,
441 checkpoint: Arc<CheckpointData>,
442 size: usize,
443 ) -> IngestionResult<()> {
444 if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
445 return Err(IngestionError::MaxCheckpointsCapacityReached);
446 }
447 self.data_limiter.add(&checkpoint, size);
448 self.send_checkpoint_to_channel(checkpoint).await
449 }
450
451 async fn send_local_checkpoints_to_channel(
458 &mut self,
459 checkpoints: Vec<Arc<CheckpointData>>,
460 ) -> IngestionResult<()> {
461 for checkpoint in checkpoints {
462 if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
463 break;
464 }
465 self.send_checkpoint_to_channel(checkpoint).await?;
466 }
467 Ok(())
468 }
469
470 async fn send_checkpoint_to_channel(
477 &mut self,
478 checkpoint: Arc<CheckpointData>,
479 ) -> IngestionResult<()> {
480 assert_eq!(
481 checkpoint.checkpoint_summary.sequence_number,
482 self.current_checkpoint_number
483 );
484 self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
485 IngestionError::Channel(
486 "unable to send checkpoint to executor, receiver half closed".to_owned(),
487 )
488 })?;
489 self.current_checkpoint_number += 1;
490 Ok(())
491 }
492
493 async fn sync(&mut self) -> IngestionResult<()> {
496 let mut remote_source = ReadSource::Local;
497 let checkpoints = self.read_local_files_with_retry().await?;
498 let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);
499
500 if should_fetch_from_remote {
501 remote_source = ReadSource::Remote;
502 self.fetch_and_send_to_channel_with_retry().await;
503 } else {
504 self.send_local_checkpoints_to_channel(checkpoints).await?;
505 }
506
507 info!(
508 "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
509 self.current_checkpoint_number, self.last_pruned_watermark,
510 );
511
512 Ok(())
513 }
514
    /// Runs the actor until the cancellation token fires.
    ///
    /// Creates the working directory, runs an initial garbage collection for
    /// the last pruned watermark, then loops over three events: cancellation,
    /// GC watermarks, and sync triggers.
    ///
    /// # Panics
    ///
    /// Panics if the directory cannot be created, if garbage collection of
    /// processed files fails, or if a sync round returns an error.
    async fn run(mut self) {
        // The sender is handed to the file watcher on non-macOS targets only;
        // on macOS it stays bound here and no watcher is created.
        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
        std::fs::create_dir_all(self.path()).expect("failed to create a directory");

        #[cfg(not(target_os = "macos"))]
        let _watcher = init_watcher(_inotify_tx, self.path());

        // Initial clean-up based on the starting watermark.
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("failed to clean the directory");

        loop {
            tokio::select! {
                // Stop the loop once cancellation is requested.
                _ = self.token.cancelled() => break,
                // A new GC watermark arrived: release limiter budget and
                // clean processed files.
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("failed to clean the directory");
                }
                // Sync when a watcher notification arrives (`Ok(Some(_))`) or
                // when the tick interval elapses without one (`Err(_)`).
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv()) => {
                    self.sync().await.expect("failed to read checkpoint files");
                }
            }
        }
    }
540}
541
/// Handle to the background checkpoint reader task: it receives checkpoints
/// from the task, sends it GC watermarks and shuts it down.
pub(crate) struct CheckpointReader {
    // Join handle of the spawned actor task; awaited on shutdown.
    handle: JoinHandle<()>,
    // Sends GC watermarks to the actor.
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    // Receives the checkpoints produced by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    // Token used to request cancellation of the actor.
    token: CancellationToken,
}
554
555impl CheckpointReader {
556 pub(crate) async fn new(
557 starting_checkpoint_number: CheckpointSequenceNumber,
558 config: CheckpointReaderConfigExt,
559 ) -> IngestionResult<Self> {
560 if config.fullnode_transaction_filter.is_some()
561 && !matches!(config.base.remote_store_url, Some(RemoteUrl::Fullnode(_)))
562 {
563 return Err(IngestionError::Unsupported(
564 "filter is only supported on `RemoteUrl::Fullnode` connections".into(),
565 ));
566 }
567
568 let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
569 let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
570
571 let remote_store = if let Some(url) = config.base.remote_store_url {
572 Some(Arc::new(
573 RemoteStore::new(
574 url,
575 config.base.reader_options.batch_size,
576 config.base.reader_options.timeout_secs,
577 )
578 .await?,
579 ))
580 } else {
581 None
582 };
583
584 let path = match config.base.ingestion_path {
585 Some(p) => p,
586 None => tempfile::tempdir()?.keep(),
587 };
588 let token = CancellationToken::new();
589 let reader = CheckpointReaderActor {
590 path,
591 current_checkpoint_number: starting_checkpoint_number,
592 last_pruned_watermark: starting_checkpoint_number,
593 checkpoint_tx,
594 gc_signal_rx,
595 remote_store,
596 token: token.clone(),
597 data_limiter: DataLimiter::new(config.base.reader_options.data_limit),
598 reader_options: config.base.reader_options,
599 fullnode_transaction_filter: config.fullnode_transaction_filter,
600 };
601
602 let handle = spawn_monitored_task!(reader.run());
603
604 Ok(Self {
605 handle,
606 gc_signal_tx,
607 checkpoint_rx,
608 token,
609 })
610 }
611
    /// Waits for the next checkpoint produced by the reader task.
    ///
    /// Returns `None` once the channel is closed and no buffered checkpoints
    /// remain.
    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        self.checkpoint_rx.recv().await
    }
616
617 pub(crate) async fn send_gc_signal(
624 &self,
625 watermark: CheckpointSequenceNumber,
626 ) -> IngestionResult<()> {
627 self.gc_signal_tx.send(watermark).await.map_err(|_| {
628 IngestionError::Channel(
629 "unable to send GC operation to checkpoint reader, receiver half closed".into(),
630 )
631 })
632 }
633
634 pub(crate) async fn shutdown(self) -> IngestionResult<()> {
640 self.token.cancel();
641 self.handle.await.map_err(|err| IngestionError::Shutdown {
642 component: "checkpoint reader".into(),
643 msg: err.to_string(),
644 })
645 }
646}