iota_data_ingestion_core/reader/v2.rs

// Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    num::NonZeroUsize,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use backoff::backoff::Backoff;
use futures::StreamExt;
use iota_config::{
    node::ArchiveReaderConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_metrics::spawn_monitored_task;
use iota_rest_api::CheckpointData;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use tap::Pipe;
use tokio::{
    sync::{
        mpsc::{self},
        oneshot,
    },
    task::JoinHandle,
    time::timeout,
};
use tracing::{debug, error, info};

use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
    history::reader::HistoricalReader,
    reader::{
        fetch::{LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store},
        v1::{DataLimiter, ReaderOptions},
    },
};

/// Available sources for checkpoint streams supported by the ingestion
/// framework.
///
/// This enum represents the different types of remote sources from which
/// checkpoint data can be fetched. Each variant corresponds to a supported
/// backend or combination of backends for checkpoint retrieval.
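///
/// # Example
///
/// A minimal sketch of constructing each variant; the URLs are the
/// illustrative values from the variant docs below, not required endpoints:
/// ```ignore
/// let fullnode = RemoteUrl::Fullnode("http://127.0.0.1:9000/api/v1".to_owned());
/// let hybrid = RemoteUrl::HybridHistoricalStore {
///     historical_url: "https://checkpoints.mainnet.iota.cafe/ingestion/historical".to_owned(),
///     live_url: Some("https://checkpoints.mainnet.iota.cafe/ingestion/live".to_owned()),
/// };
/// ```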
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// The URL to the Fullnode server that exposes
    /// checkpoint data.
    ///
    /// # Example
    /// ```text
    /// "http://127.0.0.1:9000/api/v1"
    /// ```
    Fullnode(String),
    /// A hybrid source combining a historical object store and an optional
    /// live object store.
    HybridHistoricalStore {
        /// The URL path to the historical object store that contains `*.chk`,
        /// `*.sum` & `MANIFEST` files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/historical"
        /// ```
        historical_url: String,
        /// The URL path to the live object store that contains `*.chk`
        /// checkpoint files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/live"
        /// ```
        live_url: Option<String>,
    },
}

/// Represents a remote backend for checkpoint data retrieval.
///
/// This enum encapsulates the supported remote storage mechanisms that can be
/// used by the ingestion framework to fetch checkpoint data. Each variant
/// corresponds to a different type of remote source.
enum RemoteStore {
    Fullnode(iota_rest_api::Client),
    HybridHistoricalStore {
        historical: HistoricalReader,
        live: Option<Box<dyn ObjectStore>>,
    },
}

impl RemoteStore {
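    /// Builds the remote backend for the given [`RemoteUrl`].
    ///
    /// For [`RemoteUrl::HybridHistoricalStore`], `batch_size` is used as the
    /// download concurrency of the historical reader and `timeout_secs`
    /// configures the optional live object store client.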
    async fn new(
        remote_url: RemoteUrl,
        batch_size: usize,
        timeout_secs: u64,
    ) -> IngestionResult<Self> {
        let store = match remote_url {
            RemoteUrl::Fullnode(url) => RemoteStore::Fullnode(iota_rest_api::Client::new(url)),
            RemoteUrl::HybridHistoricalStore {
                historical_url,
                live_url,
            } => {
                let config = ArchiveReaderConfig {
                    download_concurrency: NonZeroUsize::new(batch_size)
                        .expect("batch size must be greater than zero"),
                    remote_store_config: ObjectStoreConfig {
                        object_store: Some(ObjectStoreType::S3),
                        object_store_connection_limit: 20,
                        aws_endpoint: Some(historical_url),
                        aws_virtual_hosted_style_request: true,
                        no_sign_request: true,
                        ..Default::default()
                    },
                    use_for_pruning_watermark: false,
                };
                let historical = HistoricalReader::new(config)
                    .inspect_err(|e| error!("Unable to instantiate historical reader: {e}"))?;

                let live = live_url
                    .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
                    .transpose()?;

                RemoteStore::HybridHistoricalStore { historical, live }
            }
        };
        Ok(store)
    }
}

/// Configuration options to control the behavior of a checkpoint
/// reader.
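///
/// # Example
///
/// A minimal sketch of a configuration that ingests into a local directory and
/// falls back to a fullnode; the path and URL are illustrative placeholders:
/// ```ignore
/// let config = CheckpointReaderConfig {
///     reader_options: ReaderOptions::default(),
///     ingestion_path: Some("/tmp/ingestion".into()),
///     remote_store_url: Some(RemoteUrl::Fullnode(
///         "http://127.0.0.1:9000/api/v1".to_owned(),
///     )),
/// };
/// ```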
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Configures the checkpoint reader behavior for downloading new checkpoints.
    pub reader_options: ReaderOptions,
    /// Local path for checkpoint ingestion. If not provided, checkpoints will
    /// be ingested from a temporary directory.
    pub ingestion_path: Option<PathBuf>,
    /// Remote source for checkpoint data stream.
    pub remote_store_url: Option<RemoteUrl>,
}

/// Internal actor responsible for reading and streaming checkpoints.
///
/// `CheckpointReaderActor` is the core background task that manages the logic
/// for fetching, batching, and streaming checkpoint data from local or remote
/// sources. It handles checkpoint discovery, garbage collection signals, and
/// coordinates with remote fetchers as needed.
///
/// This struct is intended to be run as an asynchronous task and is not
/// typically interacted with directly. Instead, users should use
/// [`CheckpointReader`], which provides a safe and ergonomic API for
/// interacting with the running actor, such as receiving checkpoints, sending
/// GC signals, or triggering shutdown.
///
/// # Responsibilities
/// - Periodically scans for new checkpoints from configured sources.
/// - Streams checkpoints to consumers via channels.
/// - Handles garbage collection signals to prune processed checkpoints.
/// - Coordinates with remote fetchers for batch downloads and retries.
///
/// # Usage
/// Users should not construct or manage `CheckpointReaderActor` directly.
/// Instead, use [`CheckpointReader::new`] to spawn the actor and obtain a
/// handle for interaction.
struct CheckpointReaderActor {
    /// Filesystem path to the local checkpoint directory.
    path: PathBuf,
    /// The sequence number of the next checkpoint to fetch.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Tracks the last processed checkpoint sequence number, used to
    /// delete checkpoint files from the ingestion path.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Channel for sending checkpoints to WorkerPools.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Receives garbage collection (GC) signals to prune checkpoint files
    /// below the specified watermark.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Remote checkpoint reader for fetching checkpoints from the network.
    remote_store: Option<Arc<RemoteStore>>,
    /// Signals when the reader should exit.
    shutdown_rx: oneshot::Receiver<()>,
    /// Configures the behavior of the checkpoint reader.
    reader_options: ReaderOptions,
    /// Limits the amount of downloaded checkpoint data held in memory to avoid
    /// OOM.
    data_limiter: DataLimiter,
}

impl LocalRead for CheckpointReaderActor {
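    /// Returns `true` when `checkpoint_number` lies beyond the window of
    /// `MAX_CHECKPOINTS_IN_PROGRESS` checkpoints above the last pruned
    /// watermark, or when the in-memory data limit has been exceeded.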
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_number
    }

    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
        self.last_pruned_watermark = watermark;
    }
}

impl CheckpointReaderActor {
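    /// Decides whether to fetch from the remote store: one must be configured,
    /// and the local directory must either contain no checkpoints or start at
    /// a sequence number ahead of the one needed next.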
    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
        self.remote_store.is_some()
            && (checkpoints.is_empty()
                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
    }

    /// Fetch checkpoints from the historical object store and stream them to a
    /// channel.
    async fn fetch_historical(
        &mut self,
        historical_reader: &HistoricalReader,
    ) -> IngestionResult<()> {
        // Only sync the manifest when needed to avoid unnecessary network calls.
        // If the requested checkpoint is beyond what's currently available in our
        // cached manifest, we need to refresh it to check for newer checkpoints.
        if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
            timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.sync_manifest_once(),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead("Reading Manifest exceeded the timeout".into())
            })??;

            // Verify the requested checkpoint is now available after the manifest refresh.
            // If it's still not available, the checkpoint hasn't been published yet.
            if self.current_checkpoint_number
                > historical_reader.latest_available_checkpoint().await?
            {
                return Err(IngestionError::CheckpointNotAvailableYet);
            }
        }

        let manifest = historical_reader.get_manifest().await;

        let files = historical_reader.verify_and_get_manifest_files(manifest)?;

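        // Locate the manifest file whose checkpoint range contains the current
        // checkpoint number: an exact hit on a range start is used directly,
        // otherwise the preceding file (whose range covers the requested
        // sequence number) is selected.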
        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        for metadata in files
            .into_iter()
            .enumerate()
            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
        {
            let checkpoints = timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.iter_for_file(metadata.file_path()),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead(format!(
                    "Reading checkpoint {} exceeded the timeout",
                    metadata.file_path()
                ))
            })??
            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
            .collect::<Vec<CheckpointData>>();

            for checkpoint in checkpoints {
                let size = bcs::serialized_size(&checkpoint)?;
                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
                    .await?;
            }
        }

        Ok(())
    }

    /// Fetches checkpoints from the configured remote store and streams them
    /// to the channel.
    ///
    /// For every successfully fetched checkpoint, this function updates the
    /// current checkpoint number and the data limiter. If an error occurs
    /// while fetching a checkpoint, the function returns immediately with
    /// that error.
    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
            return Ok(());
        };
        let batch_size = self.reader_options.batch_size;
        match remote_store.as_ref() {
            RemoteStore::Fullnode(client) => {
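                // Fetch checkpoints concurrently, keeping up to `batch_size`
                // requests in flight; `buffered` preserves sequence order.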
                let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                    .map(|checkpoint_number| fetch_from_full_node(client, checkpoint_number))
                    .pipe(futures::stream::iter)
                    .buffered(batch_size);

                while let Some(checkpoint_result) = checkpoint_stream.next().await {
                    let (checkpoint, size) = checkpoint_result?;
                    self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                        .await?;
                }
            }
            RemoteStore::HybridHistoricalStore { historical, live } => {
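                // Prefer the historical store; if the requested checkpoint is
                // not archived there yet, fall back to the live store when one
                // is configured, otherwise surface the error.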
                if let Err(err) = self.fetch_historical(historical).await {
                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
                        let live = live.as_ref().ok_or(err)?;
                        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                            .map(|checkpoint_number| {
                                fetch_from_object_store(live, checkpoint_number)
                            })
                            .pipe(futures::stream::iter)
                            .buffered(batch_size);

                        while let Some(checkpoint_result) = checkpoint_stream.next().await {
                            let (checkpoint, size) = checkpoint_result?;
                            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                                .await?;
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        };
        Ok(())
    }

    /// Fetches and sends checkpoints to the channel with retry logic.
    ///
    /// Uses the `backoff` crate's exponential backoff policy configured with a
    /// multiplier of 1.0, i.e. failed requests are retried at a roughly
    /// constant 100 ms interval for up to 60 seconds.
    async fn fetch_and_send_to_channel_with_retry(&mut self) {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;

        loop {
            match self.fetch_and_send_to_channel().await {
                Ok(_) => break,
                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
                Err(IngestionError::CheckpointNotAvailableYet) => {
                    break info!("historical reader does not have the requested checkpoint yet");
                }
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        if !err.to_string().to_lowercase().contains("not found") {
                            debug!(
                                "remote reader retry in {} ms. Error is {err:?}",
                                duration.as_millis(),
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => {
                        break error!("remote reader transient error {err:?}");
                    }
                },
            }
        }
    }

    /// Attempts to send a checkpoint from a remote source to the channel if
    /// capacity allows.
    ///
    /// If the checkpoint's sequence number would exceed the allowed capacity,
    /// returns `IngestionError::MaxCheckpointsCapacityReached` and does not
    /// send. Otherwise, adds the checkpoint to the data limiter and sends
    /// it to the channel.
    async fn send_remote_checkpoint_with_capacity_check(
        &mut self,
        checkpoint: Arc<CheckpointData>,
        size: usize,
    ) -> IngestionResult<()> {
        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
            return Err(IngestionError::MaxCheckpointsCapacityReached);
        }
        self.data_limiter.add(&checkpoint, size);
        self.send_checkpoint_to_channel(checkpoint).await
    }

    /// Sends a batch of local checkpoints to the channel in order.
    ///
    /// Each checkpoint is sent sequentially until a gap is detected (i.e., a
    /// checkpoint with a sequence number greater than the current
    /// checkpoint number). If a gap is found, the function breaks early. If
    /// sending fails, returns the error immediately.
    async fn send_local_checkpoints_to_channel(
        &mut self,
        checkpoints: Vec<Arc<CheckpointData>>,
    ) -> IngestionResult<()> {
        for checkpoint in checkpoints {
            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
                break;
            }
            self.send_checkpoint_to_channel(checkpoint).await?;
        }
        Ok(())
    }

    /// Sends a single checkpoint to the channel and advances the current
    /// checkpoint number.
    ///
    /// Asserts that the checkpoint's sequence number matches the expected
    /// current number. Increments the current checkpoint number after
    /// sending.
    async fn send_checkpoint_to_channel(
        &mut self,
        checkpoint: Arc<CheckpointData>,
    ) -> IngestionResult<()> {
        assert_eq!(
            checkpoint.checkpoint_summary.sequence_number,
            self.current_checkpoint_number
        );
        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send checkpoint to executor, receiver half closed".to_owned(),
            )
        })?;
        self.current_checkpoint_number += 1;
        Ok(())
    }

    /// Syncs new checkpoints, from either the local or the remote source, to
    /// be processed by the executor.
    async fn sync(&mut self) -> IngestionResult<()> {
        let mut remote_source = ReadSource::Local;
        let checkpoints = self.read_local_files_with_retry().await?;
        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);

        if should_fetch_from_remote {
            remote_source = ReadSource::Remote;
            self.fetch_and_send_to_channel_with_retry().await;
        } else {
            self.send_local_checkpoints_to_channel(checkpoints).await?;
        }

        info!(
            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
            self.current_checkpoint_number, self.last_pruned_watermark,
        );

        Ok(())
    }

    /// Run the main loop of the checkpoint reader actor.
    async fn run(mut self) {
        let (_watcher, mut inotify_rx) = self.setup_directory_watcher();
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("Failed to clean the directory");

        loop {
            tokio::select! {
                _ = &mut self.shutdown_rx => break,
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("Failed to clean the directory");
                }
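                // Re-sync when a directory event arrives (`Ok(Some(_))`) or
                // when the tick interval elapses with no event (`Err(_)` from
                // the timeout), so remote polling continues even without local
                // file activity.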
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv()) => {
                    self.sync().await.expect("Failed to read checkpoint files");
                }
            }
        }
    }
}

/// Public API for interacting with the checkpoint reader actor.
///
/// It provides methods to receive streamed checkpoints, send garbage collection
/// signals, and gracefully shut down the background checkpoint reading task.
/// Internally, it communicates with a [`CheckpointReaderActor`], which manages
/// the actual checkpoint fetching and streaming logic.
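///
/// # Example
///
/// A minimal sketch of the intended flow within this crate (the starting
/// checkpoint, config, and GC watermark are illustrative):
/// ```ignore
/// let mut reader = CheckpointReader::new(0, CheckpointReaderConfig::default()).await?;
/// while let Some(checkpoint) = reader.checkpoint().await {
///     // ... process the checkpoint, then allow its file to be pruned ...
///     reader
///         .send_gc_signal(checkpoint.checkpoint_summary.sequence_number + 1)
///         .await?;
/// }
/// reader.shutdown().await?;
/// ```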
pub(crate) struct CheckpointReader {
    handle: JoinHandle<()>,
    shutdown_tx: oneshot::Sender<()>,
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
}

impl CheckpointReader {
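    /// Spawns the checkpoint reader actor and returns a handle for
    /// interacting with it.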
    pub(crate) async fn new(
        starting_checkpoint_number: CheckpointSequenceNumber,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<Self> {
        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let remote_store = if let Some(url) = config.remote_store_url {
            Some(Arc::new(
                RemoteStore::new(
                    url,
                    config.reader_options.batch_size,
                    config.reader_options.timeout_secs,
                )
                .await?,
            ))
        } else {
            None
        };

        let path = match config.ingestion_path {
            Some(p) => p,
            None => tempfile::tempdir()?.keep(),
        };

        let reader = CheckpointReaderActor {
            path,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_tx,
            gc_signal_rx,
            remote_store,
            shutdown_rx,
            data_limiter: DataLimiter::new(config.reader_options.data_limit),
            reader_options: config.reader_options,
        };

        let handle = spawn_monitored_task!(reader.run());

        Ok(Self {
            handle,
            gc_signal_tx,
            shutdown_tx,
            checkpoint_rx,
        })
    }

    /// Reads the next downloaded checkpoint from the queue.
    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        self.checkpoint_rx.recv().await
    }

    /// Sends a garbage collection (GC) signal to the checkpoint reader.
    ///
    /// Transmits a watermark to the checkpoint reader, indicating that all
    /// checkpoints below this watermark can be safely pruned or cleaned up.
    /// The signal is sent over an internal channel to the checkpoint reader
    /// task.
    pub(crate) async fn send_gc_signal(
        &self,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.gc_signal_tx.send(watermark).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send GC operation to checkpoint reader, receiver half closed".into(),
            )
        })
    }

    /// Gracefully shuts down the checkpoint reader task.
    ///
    /// It signals the background checkpoint reader actor to terminate, then
    /// awaits its completion. Any in-progress checkpoint reading or streaming
    /// operations will be stopped as part of the shutdown process.
    pub(crate) async fn shutdown(self) -> IngestionResult<()> {
        _ = self.shutdown_tx.send(());
        self.handle.await.map_err(|err| IngestionError::Shutdown {
            component: "CheckpointReader".into(),
            msg: err.to_string(),
        })
    }
}