// iota_data_ingestion_core/reader/v2.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    num::NonZeroUsize,
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::Duration,
9};
10
11use backoff::backoff::Backoff;
12use futures::{StreamExt, TryStreamExt};
13use iota_config::{
14    node::ArchiveReaderConfig,
15    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
16};
17use iota_grpc_client::Client as GrpcClient;
18use iota_metrics::spawn_monitored_task;
19use iota_types::{
20    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
21};
22use object_store::ObjectStore;
23use serde::{Deserialize, Serialize};
24use tap::Pipe;
25use tokio::{
26    sync::mpsc::{self},
27    task::JoinHandle,
28    time::timeout,
29};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info};
32
33#[cfg(not(target_os = "macos"))]
34use crate::reader::fetch::init_watcher;
35use crate::{
36    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
37    history::reader::HistoricalReader,
38    reader::{
39        ReaderOptions,
40        common::DataLimiter,
41        fetch::{
42            GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES, LocalRead, ReadSource, fetch_from_object_store,
43        },
44    },
45};
46
/// Available sources for checkpoint streams supported by the ingestion
/// framework.
///
/// This enum represents the different types of remote sources from which
/// checkpoint data can be fetched. Each variant corresponds to a supported
/// backend or combination of backends for checkpoint retrieval.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// The URL to the Fullnode server that exposes
    /// checkpoint data streaming through gRPC.
    ///
    /// # Example
    /// ```text
    /// "http://127.0.0.1:50051"
    /// ```
    Fullnode(String),
    /// A hybrid source combining a historical object store and an optional
    /// live object store.
    ///
    /// The historical store serves archived checkpoints; the live store, when
    /// configured, is used as a fallback for checkpoints the historical store
    /// does not contain yet.
    HybridHistoricalStore {
        /// The URL path to the historical object store that contains `*.chk`,
        /// `*.sum` & `MANIFEST` files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/historical"
        /// ```
        historical_url: String,
        /// The URL path to the live object store that contains `*.chk`
        /// checkpoint files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/live"
        /// ```
        live_url: Option<String>,
    },
}
84
/// Represents a remote backend for checkpoint data retrieval.
///
/// This enum encapsulates the supported remote storage mechanisms that can be
/// used by the ingestion framework to fetch checkpoint data. Each variant
/// corresponds to a different type of remote source.
enum RemoteStore {
    /// A connected gRPC client streaming checkpoints from a Fullnode.
    Fullnode(GrpcClient),
    /// A historical archive reader, optionally paired with a live object
    /// store used as a fallback for not-yet-archived checkpoints.
    HybridHistoricalStore {
        historical: HistoricalReader,
        live: Option<Box<dyn ObjectStore>>,
    },
}
97
98impl RemoteStore {
99    async fn new(
100        remote_url: RemoteUrl,
101        batch_size: usize,
102        timeout_secs: u64,
103    ) -> IngestionResult<Self> {
104        let store = match remote_url {
105            RemoteUrl::Fullnode(ref url) => {
106                let grpc_client = GrpcClient::connect(url).await.map(|client| {
107                    client.with_max_decoding_message_size(GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES)
108                })?;
109                RemoteStore::Fullnode(grpc_client)
110            }
111            RemoteUrl::HybridHistoricalStore {
112                historical_url,
113                live_url,
114            } => {
115                let config = ArchiveReaderConfig {
116                    download_concurrency: NonZeroUsize::new(batch_size)
117                        .expect("batch size must be greater than zero"),
118                    remote_store_config: ObjectStoreConfig {
119                        object_store: Some(ObjectStoreType::S3),
120                        object_store_connection_limit: 20,
121                        aws_endpoint: Some(historical_url),
122                        aws_virtual_hosted_style_request: true,
123                        no_sign_request: true,
124                        ..Default::default()
125                    },
126                    use_for_pruning_watermark: false,
127                };
128                let historical = HistoricalReader::new(config)
129                    .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;
130
131                let live = live_url
132                    .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
133                    .transpose()?;
134
135                RemoteStore::HybridHistoricalStore { historical, live }
136            }
137        };
138        Ok(store)
139    }
140}
141
/// Configuration options to control the behavior of a checkpoint
/// reader.
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Configures the checkpoint reader behavior for downloading new
    /// checkpoints (batch size, timeouts, tick interval, data limit).
    pub reader_options: ReaderOptions,
    /// Local path for checkpoint ingestion. If not provided, checkpoints will
    /// be ingested from a temporary directory.
    pub ingestion_path: Option<PathBuf>,
    /// Remote source for checkpoint data stream. When `None`, only local
    /// checkpoint files are read.
    pub remote_store_url: Option<RemoteUrl>,
}
154
/// Internal actor responsible for reading and streaming checkpoints.
///
/// `CheckpointReaderActor` is the core background task that manages the logic
/// for fetching, batching, and streaming checkpoint data from local or remote
/// sources. It handles checkpoint discovery, garbage collection signals, and
/// coordinates with remote fetchers as needed.
///
/// This struct is intended to be run as an asynchronous task and is not
/// typically interacted with directly. Instead, users should use
/// [`CheckpointReader`], which provides a safe and ergonomic API for
/// interacting with the running actor, such as receiving checkpoints, sending
/// GC signals, or triggering shutdown.
///
/// # Responsibilities
/// - Periodically scans for new checkpoints from configured sources.
/// - Streams checkpoints to consumers via channels.
/// - Handles garbage collection signals to prune processed checkpoints.
/// - Coordinates with remote fetchers for batch downloads and retries.
///
/// # Usage
/// Users should not construct or manage `CheckpointReaderActor` directly.
/// Instead, use [`CheckpointReader::new`] to spawn the actor and obtain a
/// handle for interaction.
struct CheckpointReaderActor {
    /// Filesystem path to the local checkpoint directory.
    path: PathBuf,
    /// Sequence number of the next checkpoint to fetch and send.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Keeps track of the last processed checkpoint sequence number, used to
    /// delete checkpoint files from the ingestion path.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Channel for sending checkpoints to WorkerPools.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Receives garbage collection (GC) signals requesting that checkpoint
    /// files below the transmitted watermark be pruned.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Remote checkpoint reader for fetching checkpoints from the network.
    remote_store: Option<Arc<RemoteStore>>,
    /// Shutdown signal for the actor.
    token: CancellationToken,
    /// Configures the behavior of the checkpoint reader.
    reader_options: ReaderOptions,
    /// Limit the amount of downloaded checkpoints held in memory to avoid OOM.
    data_limiter: DataLimiter,
}
200
201impl LocalRead for CheckpointReaderActor {
202    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
203        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
204            || self.data_limiter.exceeds()
205    }
206
207    fn path(&self) -> &Path {
208        &self.path
209    }
210
211    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
212        self.current_checkpoint_number
213    }
214
215    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
216        self.last_pruned_watermark = watermark;
217    }
218}
219
220impl CheckpointReaderActor {
221    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
222        self.remote_store.is_some()
223            && (checkpoints.is_empty()
224                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
225    }
226
227    /// Fetches checkpoints from the historical object store and streams them to
228    /// a channel.
229    async fn relay_from_historical(
230        &mut self,
231        historical_reader: &HistoricalReader,
232    ) -> IngestionResult<()> {
233        // Only sync the manifest when needed to avoid unnecessary network calls.
234        // If the requested checkpoint is beyond what's currently available in our
235        // cached manifest, we need to refresh it to check for newer checkpoints.
236        if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
237            timeout(
238                Duration::from_secs(self.reader_options.timeout_secs),
239                historical_reader.sync_manifest_once(),
240            )
241            .await
242            .map_err(|_| {
243                IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
244            })??;
245
246            // Verify the requested checkpoint is now available after the manifest refresh.
247            // If it's still not available, the checkpoint hasn't been published yet.
248            if self.current_checkpoint_number
249                > historical_reader.latest_available_checkpoint().await?
250            {
251                return Err(IngestionError::CheckpointNotAvailableYet);
252            }
253        }
254
255        let manifest = historical_reader.get_manifest().await;
256
257        let files = historical_reader.verify_and_get_manifest_files(manifest)?;
258
259        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
260            s.checkpoint_seq_range.start
261        }) {
262            Ok(index) => index,
263            Err(index) => index - 1,
264        };
265
266        for metadata in files
267            .into_iter()
268            .enumerate()
269            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
270        {
271            let checkpoints = timeout(
272                Duration::from_secs(self.reader_options.timeout_secs),
273                historical_reader.iter_for_file(metadata.file_path()),
274            )
275            .await
276            .map_err(|_| {
277                IngestionError::HistoryRead(format!(
278                    "reading checkpoint {} exceeded the timeout",
279                    metadata.file_path()
280                ))
281            })??
282            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
283            .collect::<Vec<CheckpointData>>();
284
285            for checkpoint in checkpoints {
286                let size = bcs::serialized_size(&checkpoint)?;
287                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
288                    .await?;
289            }
290        }
291
292        Ok(())
293    }
294
295    /// Fetches checkpoints from the live object store and streams them to a
296    /// channel.
297    async fn relay_from_live(
298        &mut self,
299        batch_size: usize,
300        live: &dyn ObjectStore,
301    ) -> IngestionResult<()> {
302        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
303            .map(|checkpoint_number| fetch_from_object_store(live, checkpoint_number))
304            .pipe(futures::stream::iter)
305            .buffered(batch_size);
306        while let Some((checkpoint, size)) = self
307            .token
308            .run_until_cancelled(checkpoint_stream.try_next())
309            .await
310            .transpose()?
311            .flatten()
312        {
313            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
314                .await?;
315        }
316        Ok(())
317    }
318
319    /// Fetches checkpoints from the fullnode through a gRPC streaming
320    /// connection and streams them to a channel.
321    async fn relay_from_fullnode(&mut self, client: &mut GrpcClient) -> IngestionResult<()> {
322        let mut checkpoints_stream = client
323            .stream_checkpoints(
324                Some(self.current_checkpoint_number),
325                None,
326                Some(iota_grpc_client::CHECKPOINT_RESPONSE_CHECKPOINT_DATA),
327                None,
328                None,
329            )
330            .await
331            .map_err(|e| {
332                IngestionError::Grpc(format!("failed to initialize the checkpoint stream: {e}"))
333            })?
334            .into_inner();
335
336        while let Some(grpc_checkpoint) = self
337            .token
338            .run_until_cancelled(checkpoints_stream.try_next())
339            .await
340            .transpose()?
341            .flatten()
342        {
343            let checkpoint = grpc_checkpoint.checkpoint_data()?.try_into()?;
344            let size = bcs::serialized_size(&checkpoint)?;
345            self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
346                .await?;
347        }
348
349        Ok(())
350    }
351
352    /// Fetches remote checkpoints from the remote store and streams them to the
353    /// channel.
354    ///
355    /// For every successfully fetched checkpoint, this function updates the
356    /// current checkpoint number and the data limiter. If an error occurs while
357    /// fetching a checkpoint, the function returns immediately with that error.
358    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
359        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
360            return Ok(());
361        };
362        let batch_size = self.reader_options.batch_size;
363        match remote_store.as_ref() {
364            RemoteStore::Fullnode(client) => {
365                self.relay_from_fullnode(&mut client.clone()).await?;
366            }
367            RemoteStore::HybridHistoricalStore { historical, live } => {
368                if let Some(Err(err)) = self
369                    .token
370                    .clone()
371                    .run_until_cancelled(self.relay_from_historical(historical))
372                    .await
373                {
374                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
375                        let live = live.as_ref().ok_or(err)?;
376                        return self.relay_from_live(batch_size, live).await;
377                    }
378                    return Err(err);
379                }
380            }
381        };
382        Ok(())
383    }
384
385    /// Fetches and sends checkpoints to the channel with retry logic.
386    ///
387    /// Uses an exponential backoff strategy to retry failed requests.
388    async fn fetch_and_send_to_channel_with_retry(&mut self) {
389        let mut backoff = backoff::ExponentialBackoff::default();
390        backoff.max_elapsed_time = Some(Duration::from_secs(60));
391        backoff.initial_interval = Duration::from_millis(100);
392        backoff.current_interval = backoff.initial_interval;
393        backoff.multiplier = 1.0;
394
395        loop {
396            match self.fetch_and_send_to_channel().await {
397                Ok(_) => break,
398                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
399                Err(IngestionError::CheckpointNotAvailableYet) => {
400                    break info!("historical reader does not have the requested checkpoint yet");
401                }
402                Err(err) => match backoff.next_backoff() {
403                    Some(duration) => {
404                        if !err.to_string().to_lowercase().contains("not found") {
405                            debug!(
406                                "remote reader retry in {} ms. Error is {err:?}",
407                                duration.as_millis(),
408                            );
409                        }
410                        if self
411                            .token
412                            .run_until_cancelled(tokio::time::sleep(duration))
413                            .await
414                            .is_none()
415                        {
416                            break;
417                        }
418                    }
419                    None => {
420                        break error!("remote reader transient error {err:?}");
421                    }
422                },
423            }
424        }
425    }
426
427    /// Attempts to send a checkpoint from remote source to the channel if
428    /// capacity allows.
429    ///
430    /// If the checkpoint's sequence number would exceed the allowed capacity,
431    /// returns `IngestionError::MaxCheckpointsCapacityReached` and does not
432    /// send. Otherwise, adds the checkpoint to the data limiter and sends
433    /// it to the channel.
434    async fn send_remote_checkpoint_with_capacity_check(
435        &mut self,
436        checkpoint: Arc<CheckpointData>,
437        size: usize,
438    ) -> IngestionResult<()> {
439        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
440            return Err(IngestionError::MaxCheckpointsCapacityReached);
441        }
442        self.data_limiter.add(&checkpoint, size);
443        self.send_checkpoint_to_channel(checkpoint).await
444    }
445
446    /// Sends a batch of local checkpoints to the channel in order.
447    ///
448    /// Each checkpoint is sent sequentially until a gap is detected (i.e., a
449    /// checkpoint with a sequence number greater than the current
450    /// checkpoint number). If a gap is found, the function breaks early. If
451    /// sending fails, returns the error immediately.
452    async fn send_local_checkpoints_to_channel(
453        &mut self,
454        checkpoints: Vec<Arc<CheckpointData>>,
455    ) -> IngestionResult<()> {
456        for checkpoint in checkpoints {
457            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
458                break;
459            }
460            self.send_checkpoint_to_channel(checkpoint).await?;
461        }
462        Ok(())
463    }
464
465    /// Sends a single checkpoint to the channel and advances the current
466    /// checkpoint number.
467    ///
468    /// Asserts that the checkpoint's sequence number matches the expected
469    /// current number. Increments the current checkpoint number after
470    /// sending.
471    async fn send_checkpoint_to_channel(
472        &mut self,
473        checkpoint: Arc<CheckpointData>,
474    ) -> IngestionResult<()> {
475        assert_eq!(
476            checkpoint.checkpoint_summary.sequence_number,
477            self.current_checkpoint_number
478        );
479        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
480            IngestionError::Channel(
481                "unable to send checkpoint to executor, receiver half closed".to_owned(),
482            )
483        })?;
484        self.current_checkpoint_number += 1;
485        Ok(())
486    }
487
488    /// Sync from either local or remote source new checkpoints to be processed
489    /// by the executor.
490    async fn sync(&mut self) -> IngestionResult<()> {
491        let mut remote_source = ReadSource::Local;
492        let checkpoints = self.read_local_files_with_retry().await?;
493        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);
494
495        if should_fetch_from_remote {
496            remote_source = ReadSource::Remote;
497            self.fetch_and_send_to_channel_with_retry().await;
498        } else {
499            self.send_local_checkpoints_to_channel(checkpoints).await?;
500        }
501
502        info!(
503            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
504            self.current_checkpoint_number, self.last_pruned_watermark,
505        );
506
507        Ok(())
508    }
509
510    /// Run the main loop of the checkpoint reader actor.
511    async fn run(mut self) {
512        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
513        std::fs::create_dir_all(self.path()).expect("failed to create a directory");
514
515        #[cfg(not(target_os = "macos"))]
516        let _watcher = init_watcher(_inotify_tx, self.path());
517
518        self.data_limiter.gc(self.last_pruned_watermark);
519        self.gc_processed_files(self.last_pruned_watermark)
520            .expect("failed to clean the directory");
521
522        loop {
523            tokio::select! {
524                _ = self.token.cancelled() => break,
525                Some(watermark) = self.gc_signal_rx.recv() => {
526                    self.data_limiter.gc(watermark);
527                    self.gc_processed_files(watermark).expect("failed to clean the directory");
528                }
529                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv())  => {
530                    self.sync().await.expect("failed to read checkpoint files");
531                }
532            }
533        }
534    }
535}
536
/// Public API for interacting with the checkpoint reader actor.
///
/// It provides methods to receive streamed checkpoints, send garbage collection
/// signals, and gracefully shut down the background checkpoint reading task.
/// Internally, it communicates with a [`CheckpointReaderActor`], which manages
/// the actual checkpoint fetching and streaming logic.
pub(crate) struct CheckpointReader {
    /// Join handle of the spawned actor task; awaited on shutdown.
    handle: JoinHandle<()>,
    /// Sends pruning watermarks to the actor.
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    /// Receives checkpoints streamed by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    /// Cancels the actor's main loop on shutdown.
    token: CancellationToken,
}
549
550impl CheckpointReader {
551    pub(crate) async fn new(
552        starting_checkpoint_number: CheckpointSequenceNumber,
553        config: CheckpointReaderConfig,
554    ) -> IngestionResult<Self> {
555        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
556        let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
557
558        let remote_store = if let Some(url) = config.remote_store_url {
559            Some(Arc::new(
560                RemoteStore::new(
561                    url,
562                    config.reader_options.batch_size,
563                    config.reader_options.timeout_secs,
564                )
565                .await?,
566            ))
567        } else {
568            None
569        };
570
571        let path = match config.ingestion_path {
572            Some(p) => p,
573            None => tempfile::tempdir()?.keep(),
574        };
575        let token = CancellationToken::new();
576        let reader = CheckpointReaderActor {
577            path,
578            current_checkpoint_number: starting_checkpoint_number,
579            last_pruned_watermark: starting_checkpoint_number,
580            checkpoint_tx,
581            gc_signal_rx,
582            remote_store,
583            token: token.clone(),
584            data_limiter: DataLimiter::new(config.reader_options.data_limit),
585            reader_options: config.reader_options,
586        };
587
588        let handle = spawn_monitored_task!(reader.run());
589
590        Ok(Self {
591            handle,
592            gc_signal_tx,
593            checkpoint_rx,
594            token,
595        })
596    }
597
598    /// Read downloaded checkpoints from the queue.
599    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
600        self.checkpoint_rx.recv().await
601    }
602
603    /// Sends a garbage collection (GC) signal to the checkpoint reader.
604    ///
605    /// Transmits a watermark to the checkpoint reader, indicating that all
606    /// checkpoints below this watermark can be safely pruned or cleaned up.
607    /// The signal is sent over an internal channel to the checkpoint reader
608    /// task.
609    pub(crate) async fn send_gc_signal(
610        &self,
611        watermark: CheckpointSequenceNumber,
612    ) -> IngestionResult<()> {
613        self.gc_signal_tx.send(watermark).await.map_err(|_| {
614            IngestionError::Channel(
615                "unable to send GC operation to checkpoint reader, receiver half closed".into(),
616            )
617        })
618    }
619
620    /// Gracefully shuts down the checkpoint reader task.
621    ///
622    /// It signals the background checkpoint reader actor to terminate, then
623    /// awaits its completion. Any in-progress checkpoint reading or streaming
624    /// operations will be stopped as part of the shutdown process.
625    pub(crate) async fn shutdown(self) -> IngestionResult<()> {
626        self.token.cancel();
627        self.handle.await.map_err(|err| IngestionError::Shutdown {
628            component: "checkpoint reader".into(),
629            msg: err.to_string(),
630        })
631    }
632}