Skip to main content

iota_data_ingestion_core/reader/
v2.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    num::NonZeroUsize,
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::Duration,
9};
10
11use backoff::backoff::Backoff;
12use futures::{StreamExt, TryStreamExt};
13use iota_config::{
14    node::ArchiveReaderConfig,
15    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
16};
17use iota_grpc_client::Client as GrpcClient;
18use iota_metrics::spawn_monitored_task;
19use iota_types::{
20    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
21};
22use object_store::ObjectStore;
23use serde::{Deserialize, Serialize};
24use tap::Pipe;
25use tokio::{
26    sync::mpsc::{self},
27    task::JoinHandle,
28    time::timeout,
29};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info};
32
33#[cfg(not(target_os = "macos"))]
34use crate::reader::fetch::init_watcher;
35use crate::{
36    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS,
37    config::CheckpointReaderConfigExt,
38    create_remote_store_client,
39    history::reader::HistoricalReader,
40    reader::{
41        ReaderOptions,
42        common::DataLimiter,
43        fetch::{
44            GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES, LocalRead, ReadSource, fetch_from_object_store,
45        },
46        filters::fullnode::TransactionFilter,
47    },
48};
49
/// Available sources for checkpoint streams supported by the ingestion
/// framework.
///
/// This enum represents the different types of remote sources from which
/// checkpoint data can be fetched. Each variant corresponds to a supported
/// backend or combination of backends for checkpoint retrieval.
///
/// Transaction filters (`fullnode_transaction_filter`) are only accepted
/// together with [`RemoteUrl::Fullnode`]; any other source is rejected when
/// the checkpoint reader is created.
// NOTE: variant and field declaration order determine the derived `Ord`
// ordering and the variant indices used by serde; do not reorder them.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// The URL to the Fullnode server that exposes
    /// checkpoint data streaming through gRPC.
    ///
    /// # Example
    /// ```text
    /// "http://127.0.0.1:50051"
    /// ```
    Fullnode(String),
    /// A hybrid source combining a historical object store and an optional
    /// live object store.
    ///
    /// Checkpoints are read from the historical store first. The live store,
    /// if configured, is only consulted once the requested checkpoint is not
    /// yet available in the historical store's manifest. Without a live
    /// store, the reader simply tries the historical store again on its next
    /// tick.
    HybridHistoricalStore {
        /// The URL path to the historical object store that contains `*.chk`,
        /// `*.sum` & `MANIFEST` files.
        ///
        /// It is used as the endpoint of an S3-compatible store accessed with
        /// unsigned, virtual-hosted-style requests.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/historical"
        /// ```
        historical_url: String,
        /// The URL path to the live object store that contains `*.chk`
        /// checkpoint files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/live"
        /// ```
        live_url: Option<String>,
    },
}
87
/// Represents a remote backend for checkpoint data retrieval.
///
/// This enum encapsulates the supported remote storage mechanisms that can be
/// used by the ingestion framework to fetch checkpoint data. Each variant
/// corresponds to a different type of remote source and is built from the
/// matching [`RemoteUrl`] variant by `RemoteStore::new`.
enum RemoteStore {
    /// gRPC client connected to a fullnode's checkpoint streaming endpoint.
    Fullnode(GrpcClient),
    /// Reader over the historical object store, plus an optional live object
    /// store used when the historical store does not have the requested
    /// checkpoint yet.
    HybridHistoricalStore {
        historical: HistoricalReader,
        live: Option<Box<dyn ObjectStore>>,
    },
}
100
101impl RemoteStore {
102    async fn new(
103        remote_url: RemoteUrl,
104        batch_size: usize,
105        timeout_secs: u64,
106    ) -> IngestionResult<Self> {
107        let store = match remote_url {
108            RemoteUrl::Fullnode(ref url) => {
109                let grpc_client = GrpcClient::new(url).map(|client| {
110                    client.with_max_decoding_message_size(GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES)
111                })?;
112                RemoteStore::Fullnode(grpc_client)
113            }
114            RemoteUrl::HybridHistoricalStore {
115                historical_url,
116                live_url,
117            } => {
118                let config = ArchiveReaderConfig {
119                    download_concurrency: NonZeroUsize::new(batch_size)
120                        .expect("batch size must be greater than zero"),
121                    remote_store_config: ObjectStoreConfig {
122                        object_store: Some(ObjectStoreType::S3),
123                        object_store_connection_limit: 20,
124                        aws_endpoint: Some(historical_url),
125                        aws_virtual_hosted_style_request: true,
126                        no_sign_request: true,
127                        ..Default::default()
128                    },
129                    use_for_pruning_watermark: false,
130                };
131                let historical = HistoricalReader::new(config)
132                    .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;
133
134                let live = live_url
135                    .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
136                    .transpose()?;
137
138                RemoteStore::HybridHistoricalStore { historical, live }
139            }
140        };
141        Ok(store)
142    }
143}
144
/// Configuration options to control the behavior of a checkpoint
/// reader.
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Controls how the checkpoint reader downloads new checkpoints. This
    /// includes batch size, timeouts, polling tick interval and the in-memory
    /// data limit.
    pub reader_options: ReaderOptions,
    /// Local path for checkpoint ingestion. If not provided, a temporary
    /// directory is created and used instead. That directory is kept on disk
    /// rather than deleted automatically.
    pub ingestion_path: Option<PathBuf>,
    /// Remote source for the checkpoint data stream. If not provided, only
    /// checkpoint files found in the ingestion path are read.
    pub remote_store_url: Option<RemoteUrl>,
}
157
/// Internal actor responsible for reading and streaming checkpoints.
///
/// `CheckpointReaderActor` is the core background task that manages the logic
/// for fetching, batching, and streaming checkpoint data from local or remote
/// sources. It handles checkpoint discovery, garbage collection signals, and
/// coordinates with remote fetchers as needed.
///
/// This struct is intended to be run as an asynchronous task and is not
/// typically interacted with directly. Instead, users should use
/// [`CheckpointReader`], which provides a safe and ergonomic API for
/// interacting with the running actor, such as receiving checkpoints, sending
/// GC signals, or triggering shutdown.
///
/// # Responsibilities
/// - Periodically scans for new checkpoints from configured sources.
/// - Streams checkpoints to consumers via channels.
/// - Handles garbage collection signals to prune processed checkpoints.
/// - Coordinates with remote fetchers for batch downloads and retries.
///
/// # Usage
/// Users should not construct or manage `CheckpointReaderActor` directly.
/// Instead, use [`CheckpointReader::new`] to spawn the actor and obtain a
/// handle for interaction.
struct CheckpointReaderActor {
    /// Filesystem path to the local checkpoint directory.
    path: PathBuf,
    /// Sequence number of the next checkpoint to send to the channel. It is
    /// incremented after every successful send.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Keeps track of the last pruned (processed) checkpoint sequence number.
    /// It is used to delete checkpoint files from the ingestion path. Together
    /// with `MAX_CHECKPOINTS_IN_PROGRESS`, it bounds how far ahead
    /// checkpoints may be read.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Channel for sending checkpoints to WorkerPools.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Receives garbage collection (GC) signals carrying the watermark below
    /// which checkpoint files and buffered data can be pruned.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Remote checkpoint reader for fetching checkpoints from the network.
    /// `None` when no remote URL is configured, in which case only local
    /// files are read.
    remote_store: Option<Arc<RemoteStore>>,
    /// Shutdown signal for the actor. Cancelling it stops the main loop and
    /// any in-flight remote streaming.
    token: CancellationToken,
    /// Configures the behavior of the checkpoint reader.
    reader_options: ReaderOptions,
    /// Limit the amount of downloaded checkpoints held in memory to avoid OOM.
    data_limiter: DataLimiter,
    /// Filter applied to transactions within a checkpoint. It is only
    /// accepted for `RemoteUrl::Fullnode` sources, where it is forwarded to
    /// the gRPC checkpoint stream.
    fullnode_transaction_filter: Option<TransactionFilter>,
}
205
206impl LocalRead for CheckpointReaderActor {
207    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
208        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
209            || self.data_limiter.exceeds()
210    }
211
212    fn path(&self) -> &Path {
213        &self.path
214    }
215
216    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
217        self.current_checkpoint_number
218    }
219
220    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
221        self.last_pruned_watermark = watermark;
222    }
223}
224
225impl CheckpointReaderActor {
226    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
227        self.remote_store.is_some()
228            && (checkpoints.is_empty()
229                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
230    }
231
232    /// Fetches checkpoints from the historical object store and streams them to
233    /// a channel.
234    async fn relay_from_historical(
235        &mut self,
236        historical_reader: &HistoricalReader,
237    ) -> IngestionResult<()> {
238        // Only sync the manifest when needed to avoid unnecessary network calls.
239        // If the requested checkpoint is beyond what's currently available in our
240        // cached manifest, we need to refresh it to check for newer checkpoints.
241        if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
242            timeout(
243                Duration::from_secs(self.reader_options.timeout_secs),
244                historical_reader.sync_manifest_once(),
245            )
246            .await
247            .map_err(|_| {
248                IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
249            })??;
250
251            // Verify the requested checkpoint is now available after the manifest refresh.
252            // If it's still not available, the checkpoint hasn't been published yet.
253            if self.current_checkpoint_number
254                > historical_reader.latest_available_checkpoint().await?
255            {
256                return Err(IngestionError::CheckpointNotAvailableYet);
257            }
258        }
259
260        let manifest = historical_reader.get_manifest().await;
261
262        let files = historical_reader.verify_and_get_manifest_files(manifest)?;
263
264        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
265            s.checkpoint_seq_range.start
266        }) {
267            Ok(index) => index,
268            Err(index) => index - 1,
269        };
270
271        for metadata in files
272            .into_iter()
273            .enumerate()
274            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
275        {
276            let checkpoints = timeout(
277                Duration::from_secs(self.reader_options.timeout_secs),
278                historical_reader.iter_for_file(metadata.file_path()),
279            )
280            .await
281            .map_err(|_| {
282                IngestionError::HistoryRead(format!(
283                    "reading checkpoint {} exceeded the timeout",
284                    metadata.file_path()
285                ))
286            })??
287            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
288            .collect::<Vec<CheckpointData>>();
289
290            for checkpoint in checkpoints {
291                let size = bcs::serialized_size(&checkpoint)?;
292                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
293                    .await?;
294            }
295        }
296
297        Ok(())
298    }
299
300    /// Fetches checkpoints from the live object store and streams them to a
301    /// channel.
302    async fn relay_from_live(
303        &mut self,
304        batch_size: usize,
305        live: &dyn ObjectStore,
306    ) -> IngestionResult<()> {
307        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
308            .map(|checkpoint_number| fetch_from_object_store(live, checkpoint_number))
309            .pipe(futures::stream::iter)
310            .buffered(batch_size);
311        while let Some((checkpoint, size)) = self
312            .token
313            .run_until_cancelled(checkpoint_stream.try_next())
314            .await
315            .transpose()?
316            .flatten()
317        {
318            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
319                .await?;
320        }
321        Ok(())
322    }
323
324    /// Fetches checkpoints from the fullnode through a gRPC streaming
325    /// connection and streams them to a channel.
326    async fn relay_from_fullnode(&mut self, client: &mut GrpcClient) -> IngestionResult<()> {
327        let mut checkpoints_stream = client
328            .stream_checkpoints(
329                Some(self.current_checkpoint_number),
330                None,
331                Some(iota_grpc_client::CHECKPOINT_RESPONSE_CHECKPOINT_DATA.into()),
332                self.fullnode_transaction_filter.clone().map(Into::into),
333                None,
334            )
335            .await
336            .map_err(|e| {
337                IngestionError::Grpc(format!("failed to initialize the checkpoint stream: {e}"))
338            })?
339            .into_inner();
340
341        while let Some(grpc_checkpoint) = self
342            .token
343            .run_until_cancelled(checkpoints_stream.try_next())
344            .await
345            .transpose()?
346            .flatten()
347        {
348            let checkpoint = grpc_checkpoint.checkpoint_data()?.try_into()?;
349            let size = bcs::serialized_size(&checkpoint)?;
350            self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
351                .await?;
352        }
353
354        Ok(())
355    }
356
357    /// Fetches remote checkpoints from the remote store and streams them to the
358    /// channel.
359    ///
360    /// For every successfully fetched checkpoint, this function updates the
361    /// current checkpoint number and the data limiter. If an error occurs while
362    /// fetching a checkpoint, the function returns immediately with that error.
363    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
364        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
365            return Ok(());
366        };
367        let batch_size = self.reader_options.batch_size;
368        match remote_store.as_ref() {
369            RemoteStore::Fullnode(client) => {
370                self.relay_from_fullnode(&mut client.clone()).await?;
371            }
372            RemoteStore::HybridHistoricalStore { historical, live } => {
373                if let Some(Err(err)) = self
374                    .token
375                    .clone()
376                    .run_until_cancelled(self.relay_from_historical(historical))
377                    .await
378                {
379                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
380                        let live = live.as_ref().ok_or(err)?;
381                        return self.relay_from_live(batch_size, live).await;
382                    }
383                    return Err(err);
384                }
385            }
386        };
387        Ok(())
388    }
389
390    /// Fetches and sends checkpoints to the channel with retry logic.
391    ///
392    /// Uses an exponential backoff strategy to retry failed requests.
393    async fn fetch_and_send_to_channel_with_retry(&mut self) {
394        let mut backoff = backoff::ExponentialBackoff::default();
395        backoff.max_elapsed_time = Some(Duration::from_secs(60));
396        backoff.initial_interval = Duration::from_millis(100);
397        backoff.current_interval = backoff.initial_interval;
398        backoff.multiplier = 1.0;
399
400        loop {
401            match self.fetch_and_send_to_channel().await {
402                Ok(_) => break,
403                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
404                Err(IngestionError::CheckpointNotAvailableYet) => {
405                    break info!("historical reader does not have the requested checkpoint yet");
406                }
407                Err(err) => match backoff.next_backoff() {
408                    Some(duration) => {
409                        if !err.to_string().to_lowercase().contains("not found") {
410                            debug!(
411                                "remote reader retry in {} ms. Error is {err:?}",
412                                duration.as_millis(),
413                            );
414                        }
415                        if self
416                            .token
417                            .run_until_cancelled(tokio::time::sleep(duration))
418                            .await
419                            .is_none()
420                        {
421                            break;
422                        }
423                    }
424                    None => {
425                        break error!("remote reader transient error {err:?}");
426                    }
427                },
428            }
429        }
430    }
431
432    /// Attempts to send a checkpoint from remote source to the channel if
433    /// capacity allows.
434    ///
435    /// If the checkpoint's sequence number would exceed the allowed capacity,
436    /// returns `IngestionError::MaxCheckpointsCapacityReached` and does not
437    /// send. Otherwise, adds the checkpoint to the data limiter and sends
438    /// it to the channel.
439    async fn send_remote_checkpoint_with_capacity_check(
440        &mut self,
441        checkpoint: Arc<CheckpointData>,
442        size: usize,
443    ) -> IngestionResult<()> {
444        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
445            return Err(IngestionError::MaxCheckpointsCapacityReached);
446        }
447        self.data_limiter.add(&checkpoint, size);
448        self.send_checkpoint_to_channel(checkpoint).await
449    }
450
451    /// Sends a batch of local checkpoints to the channel in order.
452    ///
453    /// Each checkpoint is sent sequentially until a gap is detected (i.e., a
454    /// checkpoint with a sequence number greater than the current
455    /// checkpoint number). If a gap is found, the function breaks early. If
456    /// sending fails, returns the error immediately.
457    async fn send_local_checkpoints_to_channel(
458        &mut self,
459        checkpoints: Vec<Arc<CheckpointData>>,
460    ) -> IngestionResult<()> {
461        for checkpoint in checkpoints {
462            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
463                break;
464            }
465            self.send_checkpoint_to_channel(checkpoint).await?;
466        }
467        Ok(())
468    }
469
470    /// Sends a single checkpoint to the channel and advances the current
471    /// checkpoint number.
472    ///
473    /// Asserts that the checkpoint's sequence number matches the expected
474    /// current number. Increments the current checkpoint number after
475    /// sending.
476    async fn send_checkpoint_to_channel(
477        &mut self,
478        checkpoint: Arc<CheckpointData>,
479    ) -> IngestionResult<()> {
480        assert_eq!(
481            checkpoint.checkpoint_summary.sequence_number,
482            self.current_checkpoint_number
483        );
484        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
485            IngestionError::Channel(
486                "unable to send checkpoint to executor, receiver half closed".to_owned(),
487            )
488        })?;
489        self.current_checkpoint_number += 1;
490        Ok(())
491    }
492
493    /// Sync from either local or remote source new checkpoints to be processed
494    /// by the executor.
495    async fn sync(&mut self) -> IngestionResult<()> {
496        let mut remote_source = ReadSource::Local;
497        let checkpoints = self.read_local_files_with_retry().await?;
498        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);
499
500        if should_fetch_from_remote {
501            remote_source = ReadSource::Remote;
502            self.fetch_and_send_to_channel_with_retry().await;
503        } else {
504            self.send_local_checkpoints_to_channel(checkpoints).await?;
505        }
506
507        info!(
508            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
509            self.current_checkpoint_number, self.last_pruned_watermark,
510        );
511
512        Ok(())
513    }
514
515    /// Run the main loop of the checkpoint reader actor.
516    async fn run(mut self) {
517        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
518        std::fs::create_dir_all(self.path()).expect("failed to create a directory");
519
520        #[cfg(not(target_os = "macos"))]
521        let _watcher = init_watcher(_inotify_tx, self.path());
522
523        self.data_limiter.gc(self.last_pruned_watermark);
524        self.gc_processed_files(self.last_pruned_watermark)
525            .expect("failed to clean the directory");
526
527        loop {
528            tokio::select! {
529                _ = self.token.cancelled() => break,
530                Some(watermark) = self.gc_signal_rx.recv() => {
531                    self.data_limiter.gc(watermark);
532                    self.gc_processed_files(watermark).expect("failed to clean the directory");
533                }
534                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv())  => {
535                    self.sync().await.expect("failed to read checkpoint files");
536                }
537            }
538        }
539    }
540}
541
/// Public API for interacting with the checkpoint reader actor.
///
/// It provides methods to receive streamed checkpoints, send garbage collection
/// signals, and gracefully shut down the background checkpoint reading task.
/// Internally, it communicates with a [`CheckpointReaderActor`], which manages
/// the actual checkpoint fetching and streaming logic.
pub(crate) struct CheckpointReader {
    /// Join handle of the spawned actor task; awaited on shutdown.
    handle: JoinHandle<()>,
    /// Sender used to forward GC watermarks to the actor.
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    /// Receiver of the checkpoints streamed by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    /// Token shared with the actor; cancelled to request shutdown.
    token: CancellationToken,
}
554
555impl CheckpointReader {
556    pub(crate) async fn new(
557        starting_checkpoint_number: CheckpointSequenceNumber,
558        config: CheckpointReaderConfigExt,
559    ) -> IngestionResult<Self> {
560        if config.fullnode_transaction_filter.is_some()
561            && !matches!(config.base.remote_store_url, Some(RemoteUrl::Fullnode(_)))
562        {
563            return Err(IngestionError::Unsupported(
564                "filter is only supported on `RemoteUrl::Fullnode` connections".into(),
565            ));
566        }
567
568        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
569        let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
570
571        let remote_store = if let Some(url) = config.base.remote_store_url {
572            Some(Arc::new(
573                RemoteStore::new(
574                    url,
575                    config.base.reader_options.batch_size,
576                    config.base.reader_options.timeout_secs,
577                )
578                .await?,
579            ))
580        } else {
581            None
582        };
583
584        let path = match config.base.ingestion_path {
585            Some(p) => p,
586            None => tempfile::tempdir()?.keep(),
587        };
588        let token = CancellationToken::new();
589        let reader = CheckpointReaderActor {
590            path,
591            current_checkpoint_number: starting_checkpoint_number,
592            last_pruned_watermark: starting_checkpoint_number,
593            checkpoint_tx,
594            gc_signal_rx,
595            remote_store,
596            token: token.clone(),
597            data_limiter: DataLimiter::new(config.base.reader_options.data_limit),
598            reader_options: config.base.reader_options,
599            fullnode_transaction_filter: config.fullnode_transaction_filter,
600        };
601
602        let handle = spawn_monitored_task!(reader.run());
603
604        Ok(Self {
605            handle,
606            gc_signal_tx,
607            checkpoint_rx,
608            token,
609        })
610    }
611
612    /// Read downloaded checkpoints from the queue.
613    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
614        self.checkpoint_rx.recv().await
615    }
616
617    /// Sends a garbage collection (GC) signal to the checkpoint reader.
618    ///
619    /// Transmits a watermark to the checkpoint reader, indicating that all
620    /// checkpoints below this watermark can be safely pruned or cleaned up.
621    /// The signal is sent over an internal channel to the checkpoint reader
622    /// task.
623    pub(crate) async fn send_gc_signal(
624        &self,
625        watermark: CheckpointSequenceNumber,
626    ) -> IngestionResult<()> {
627        self.gc_signal_tx.send(watermark).await.map_err(|_| {
628            IngestionError::Channel(
629                "unable to send GC operation to checkpoint reader, receiver half closed".into(),
630            )
631        })
632    }
633
634    /// Gracefully shuts down the checkpoint reader task.
635    ///
636    /// It signals the background checkpoint reader actor to terminate, then
637    /// awaits its completion. Any in-progress checkpoint reading or streaming
638    /// operations will be stopped as part of the shutdown process.
639    pub(crate) async fn shutdown(self) -> IngestionResult<()> {
640        self.token.cancel();
641        self.handle.await.map_err(|err| IngestionError::Shutdown {
642            component: "checkpoint reader".into(),
643            msg: err.to_string(),
644        })
645    }
646}