iota_data_ingestion_core/reader/v2.rs

// Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    num::NonZeroUsize,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use backoff::backoff::Backoff;
use futures::StreamExt;
use iota_config::{
    node::ArchiveReaderConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_metrics::spawn_monitored_task;
use iota_rest_api::CheckpointData;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use tap::Pipe;
use tokio::{
    sync::{
        mpsc::{self},
        oneshot,
    },
    task::JoinHandle,
    time::timeout,
};
use tracing::{debug, error, info};

#[cfg(not(target_os = "macos"))]
use crate::reader::fetch::init_watcher;
use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
    history::reader::HistoricalReader,
    reader::{
        fetch::{LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store},
        v1::{DataLimiter, ReaderOptions},
    },
};

/// Available sources for checkpoint streams supported by the ingestion
/// framework.
///
/// This enum represents the different types of remote sources from which
/// checkpoint data can be fetched. Each variant corresponds to a supported
/// backend or combination of backends for checkpoint retrieval.
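///
/// # Example
/// Constructing a hybrid source (illustrative URLs, adjust to your deployment):
/// ```text
/// let url = RemoteUrl::HybridHistoricalStore {
///     historical_url: "https://checkpoints.mainnet.iota.cafe/ingestion/historical".to_owned(),
///     live_url: Some("https://checkpoints.mainnet.iota.cafe/ingestion/live".to_owned()),
/// };
/// ```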
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// The URL to the Fullnode server that exposes
    /// checkpoint data.
    ///
    /// # Example
    /// ```text
    /// "http://127.0.0.1:9000/api/v1"
    /// ```
    Fullnode(String),
    /// A hybrid source combining a historical object store with an optional
    /// live object store.
    HybridHistoricalStore {
        /// The URL path to the historical object store that contains `*.chk`,
        /// `*.sum` & `MANIFEST` files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/historical"
        /// ```
        historical_url: String,
        /// The URL path to the live object store that contains `*.chk`
        /// checkpoint files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/live"
        /// ```
        live_url: Option<String>,
    },
}

/// Represents a remote backend for checkpoint data retrieval.
///
/// This enum encapsulates the supported remote storage mechanisms that can be
/// used by the ingestion framework to fetch checkpoint data. Each variant
/// corresponds to a different type of remote source.
enum RemoteStore {
    Fullnode(iota_rest_api::Client),
    HybridHistoricalStore {
        historical: HistoricalReader,
        live: Option<Box<dyn ObjectStore>>,
    },
}

impl RemoteStore {
    async fn new(
        remote_url: RemoteUrl,
        batch_size: usize,
        timeout_secs: u64,
    ) -> IngestionResult<Self> {
        let store = match remote_url {
            RemoteUrl::Fullnode(url) => RemoteStore::Fullnode(iota_rest_api::Client::new(url)),
            RemoteUrl::HybridHistoricalStore {
                historical_url,
                live_url,
            } => {
                let config = ArchiveReaderConfig {
                    download_concurrency: NonZeroUsize::new(batch_size)
                        .expect("batch size must be greater than zero"),
                    remote_store_config: ObjectStoreConfig {
                        object_store: Some(ObjectStoreType::S3),
                        object_store_connection_limit: 20,
                        aws_endpoint: Some(historical_url),
                        aws_virtual_hosted_style_request: true,
                        no_sign_request: true,
                        ..Default::default()
                    },
                    use_for_pruning_watermark: false,
                };
                let historical = HistoricalReader::new(config)
                    .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;

                let live = live_url
                    .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
                    .transpose()?;

                RemoteStore::HybridHistoricalStore { historical, live }
            }
        };
        Ok(store)
    }
}

/// Configuration options to control the behavior of a checkpoint
/// reader.
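///
/// # Example
/// A minimal sketch reading from a fullnode (URL is illustrative):
/// ```text
/// let config = CheckpointReaderConfig {
///     reader_options: ReaderOptions::default(),
///     ingestion_path: None,
///     remote_store_url: Some(RemoteUrl::Fullnode("http://127.0.0.1:9000/api/v1".to_owned())),
/// };
/// ```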
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Configures the checkpoint reader's behavior when downloading new
    /// checkpoints.
    pub reader_options: ReaderOptions,
    /// Local path for checkpoint ingestion. If not provided, checkpoints will
    /// be ingested from a temporary directory.
    pub ingestion_path: Option<PathBuf>,
    /// Remote source for the checkpoint data stream.
    pub remote_store_url: Option<RemoteUrl>,
}

/// Internal actor responsible for reading and streaming checkpoints.
///
/// `CheckpointReaderActor` is the core background task that manages the logic
/// for fetching, batching, and streaming checkpoint data from local or remote
/// sources. It handles checkpoint discovery and garbage collection signals,
/// and coordinates with remote fetchers as needed.
///
/// This struct is intended to be run as an asynchronous task and is not
/// typically interacted with directly. Instead, users should use
/// [`CheckpointReader`], which provides a safe and ergonomic API for
/// interacting with the running actor, such as receiving checkpoints, sending
/// GC signals, or triggering shutdown.
///
/// # Responsibilities
/// - Periodically scans for new checkpoints from configured sources.
/// - Streams checkpoints to consumers via channels.
/// - Handles garbage collection signals to prune processed checkpoints.
/// - Coordinates with remote fetchers for batch downloads and retries.
///
/// # Usage
/// Users should not construct or manage `CheckpointReaderActor` directly.
/// Instead, use [`CheckpointReader::new`] to spawn the actor and obtain a
/// handle for interaction.
struct CheckpointReaderActor {
    /// Filesystem path to the local checkpoint directory.
    path: PathBuf,
    /// The checkpoint sequence number from which fetching continues.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Tracks the last processed checkpoint sequence number, used to delete
    /// checkpoint files from the ingestion path.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Channel for sending checkpoints to WorkerPools.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Receives garbage collection (GC) signals to prune checkpoint files
    /// below the specified watermark.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Remote store used to fetch checkpoints over the network.
    remote_store: Option<Arc<RemoteStore>>,
    /// Receives the signal telling the reader to exit.
    shutdown_rx: oneshot::Receiver<()>,
    /// Configures the behavior of the checkpoint reader.
    reader_options: ReaderOptions,
    /// Limits the amount of downloaded checkpoints held in memory to avoid OOM.
    data_limiter: DataLimiter,
}

impl LocalRead for CheckpointReaderActor {
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
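        // Backpressure: stop reading ahead once the in-flight window past the
        // pruning watermark is full or the in-memory data budget is exhausted.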
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_number
    }

    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
        self.last_pruned_watermark = watermark;
    }
}

impl CheckpointReaderActor {
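    /// Returns `true` when a remote store is configured and the local
    /// directory cannot serve the current checkpoint (either no local files
    /// were found or the earliest local checkpoint is ahead of the requested
    /// sequence number).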
    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
        self.remote_store.is_some()
            && (checkpoints.is_empty()
                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
    }

    /// Fetch checkpoints from the historical object store and stream them to
    /// the channel.
    async fn fetch_historical(
        &mut self,
        historical_reader: &HistoricalReader,
    ) -> IngestionResult<()> {
        // Only sync the manifest when needed to avoid unnecessary network calls.
        // If the requested checkpoint is beyond what's currently available in our
        // cached manifest, we need to refresh it to check for newer checkpoints.
        if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
            timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.sync_manifest_once(),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
            })??;

            // Verify the requested checkpoint is now available after the manifest refresh.
            // If it's still not available, the checkpoint hasn't been published yet.
            if self.current_checkpoint_number
                > historical_reader.latest_available_checkpoint().await?
            {
                return Err(IngestionError::CheckpointNotAvailableYet);
            }
        }

        let manifest = historical_reader.get_manifest().await;

        let files = historical_reader.verify_and_get_manifest_files(manifest)?;

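        // Locate the manifest file whose checkpoint range covers the current
        // checkpoint: on an exact match of the range start use that file,
        // otherwise fall back to the preceding file, whose range starts earlier.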
        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        for metadata in files
            .into_iter()
            .enumerate()
            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
        {
            let checkpoints = timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.iter_for_file(metadata.file_path()),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead(format!(
                    "reading checkpoint {} exceeded the timeout",
                    metadata.file_path()
                ))
            })??
            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
            .collect::<Vec<CheckpointData>>();

            for checkpoint in checkpoints {
                let size = bcs::serialized_size(&checkpoint)?;
                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
                    .await?;
            }
        }

        Ok(())
    }

    /// Fetches checkpoints from the remote store and streams them to the
    /// channel.
    ///
    /// For every successfully fetched checkpoint, this function updates the
    /// current checkpoint number and the data limiter. If an error occurs
    /// while fetching a checkpoint, the function returns immediately with
    /// that error.
    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
            return Ok(());
        };
        let batch_size = self.reader_options.batch_size;
        match remote_store.as_ref() {
            RemoteStore::Fullnode(client) => {
                let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                    .map(|checkpoint_number| fetch_from_full_node(client, checkpoint_number))
                    .pipe(futures::stream::iter)
                    .buffered(batch_size);

                while let Some(checkpoint_result) = checkpoint_stream.next().await {
                    let (checkpoint, size) = checkpoint_result?;
                    self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                        .await?;
                }
            }
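            // Prefer the historical store; if the requested checkpoint is not
            // archived yet, fall back to the live store when one is configured.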
            RemoteStore::HybridHistoricalStore { historical, live } => {
                if let Err(err) = self.fetch_historical(historical).await {
                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
                        let live = live.as_ref().ok_or(err)?;
                        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                            .map(|checkpoint_number| {
                                fetch_from_object_store(live, checkpoint_number)
                            })
                            .pipe(futures::stream::iter)
                            .buffered(batch_size);

                        while let Some(checkpoint_result) = checkpoint_stream.next().await {
                            let (checkpoint, size) = checkpoint_result?;
                            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                                .await?;
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        };
        Ok(())
    }

    /// Fetches and sends checkpoints to the channel with retry logic.
    ///
    /// Retries failed requests at a roughly fixed 100 ms interval (an
    /// `ExponentialBackoff` with multiplier 1.0), giving up after 60 seconds
    /// of elapsed retries.
    async fn fetch_and_send_to_channel_with_retry(&mut self) {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;

        loop {
            match self.fetch_and_send_to_channel().await {
                Ok(_) => break,
                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
                Err(IngestionError::CheckpointNotAvailableYet) => {
                    break info!("historical reader does not have the requested checkpoint yet");
                }
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        if !err.to_string().to_lowercase().contains("not found") {
                            debug!(
                                "remote reader retry in {} ms. Error is {err:?}",
                                duration.as_millis(),
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => {
                        break error!("remote reader transient error {err:?}");
                    }
                },
            }
        }
    }

    /// Attempts to send a checkpoint from a remote source to the channel if
    /// capacity allows.
    ///
    /// If the checkpoint's sequence number would exceed the allowed capacity,
    /// returns `IngestionError::MaxCheckpointsCapacityReached` and does not
    /// send. Otherwise, adds the checkpoint to the data limiter and sends
    /// it to the channel.
    async fn send_remote_checkpoint_with_capacity_check(
        &mut self,
        checkpoint: Arc<CheckpointData>,
        size: usize,
    ) -> IngestionResult<()> {
        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
            return Err(IngestionError::MaxCheckpointsCapacityReached);
        }
        self.data_limiter.add(&checkpoint, size);
        self.send_checkpoint_to_channel(checkpoint).await
    }

    /// Sends a batch of local checkpoints to the channel in order.
    ///
    /// Checkpoints are sent sequentially until a gap is detected, i.e. a
    /// checkpoint whose sequence number is greater than the current checkpoint
    /// number, at which point the function stops early. If sending fails, the
    /// error is returned immediately.
    async fn send_local_checkpoints_to_channel(
        &mut self,
        checkpoints: Vec<Arc<CheckpointData>>,
    ) -> IngestionResult<()> {
        for checkpoint in checkpoints {
            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
                break;
            }
            self.send_checkpoint_to_channel(checkpoint).await?;
        }
        Ok(())
    }

    /// Sends a single checkpoint to the channel and advances the current
    /// checkpoint number.
    ///
    /// Asserts that the checkpoint's sequence number matches the expected
    /// current number. Increments the current checkpoint number after
    /// sending.
    async fn send_checkpoint_to_channel(
        &mut self,
        checkpoint: Arc<CheckpointData>,
    ) -> IngestionResult<()> {
        assert_eq!(
            checkpoint.checkpoint_summary.sequence_number,
            self.current_checkpoint_number
        );
        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send checkpoint to executor, receiver half closed".to_owned(),
            )
        })?;
        self.current_checkpoint_number += 1;
        Ok(())
    }

    /// Syncs new checkpoints from either the local or the remote source to be
    /// processed by the executor.
    async fn sync(&mut self) -> IngestionResult<()> {
        let mut remote_source = ReadSource::Local;
        let checkpoints = self.read_local_files_with_retry().await?;
        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);

        if should_fetch_from_remote {
            remote_source = ReadSource::Remote;
            self.fetch_and_send_to_channel_with_retry().await;
        } else {
            self.send_local_checkpoints_to_channel(checkpoints).await?;
        }

        info!(
            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
            self.current_checkpoint_number, self.last_pruned_watermark,
        );

        Ok(())
    }

    /// Run the main loop of the checkpoint reader actor.
    async fn run(mut self) {
        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
        std::fs::create_dir_all(self.path()).expect("failed to create a directory");

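        // On macOS no filesystem watcher is installed, so the loop below falls
        // back to polling the directory on each tick interval.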
        #[cfg(not(target_os = "macos"))]
        let _watcher = init_watcher(_inotify_tx, self.path());

        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("failed to clean the directory");

        loop {
            tokio::select! {
                _ = &mut self.shutdown_rx => break,
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("failed to clean the directory");
                }
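                // Sync either when the filesystem watcher reports new files or
                // when the tick interval elapses without any event.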
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv()) => {
                    self.sync().await.expect("failed to read checkpoint files");
                }
            }
        }
    }
}

/// Public API for interacting with the checkpoint reader actor.
///
/// It provides methods to receive streamed checkpoints, send garbage collection
/// signals, and gracefully shut down the background checkpoint reading task.
/// Internally, it communicates with a [`CheckpointReaderActor`], which manages
/// the actual checkpoint fetching and streaming logic.
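///
/// # Example
/// A typical crate-internal flow (sketch; error handling elided):
/// ```text
/// let mut reader = CheckpointReader::new(0, CheckpointReaderConfig::default()).await?;
/// while let Some(checkpoint) = reader.checkpoint().await {
///     // process the checkpoint, then advance the pruning watermark
///     reader.send_gc_signal(checkpoint.checkpoint_summary.sequence_number + 1).await?;
/// }
/// reader.shutdown().await?;
/// ```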
pub(crate) struct CheckpointReader {
    handle: JoinHandle<()>,
    shutdown_tx: oneshot::Sender<()>,
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
}

impl CheckpointReader {
    pub(crate) async fn new(
        starting_checkpoint_number: CheckpointSequenceNumber,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<Self> {
        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let remote_store = if let Some(url) = config.remote_store_url {
            Some(Arc::new(
                RemoteStore::new(
                    url,
                    config.reader_options.batch_size,
                    config.reader_options.timeout_secs,
                )
                .await?,
            ))
        } else {
            None
        };

        let path = match config.ingestion_path {
            Some(p) => p,
            None => tempfile::tempdir()?.keep(),
        };

        let reader = CheckpointReaderActor {
            path,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_tx,
            gc_signal_rx,
            remote_store,
            shutdown_rx,
            data_limiter: DataLimiter::new(config.reader_options.data_limit),
            reader_options: config.reader_options,
        };

        let handle = spawn_monitored_task!(reader.run());

        Ok(Self {
            handle,
            gc_signal_tx,
            shutdown_tx,
            checkpoint_rx,
        })
    }

    /// Receives the next downloaded checkpoint from the queue, or `None` once
    /// the reader task has shut down and the queue is drained.
    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        self.checkpoint_rx.recv().await
    }

    /// Sends a garbage collection (GC) signal to the checkpoint reader.
    ///
    /// Transmits a watermark to the checkpoint reader, indicating that all
    /// checkpoints below this watermark can be safely pruned or cleaned up.
    /// The signal is sent over an internal channel to the checkpoint reader
    /// task.
    pub(crate) async fn send_gc_signal(
        &self,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.gc_signal_tx.send(watermark).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send GC operation to checkpoint reader, receiver half closed".into(),
            )
        })
    }

    /// Gracefully shuts down the checkpoint reader task.
    ///
    /// It signals the background checkpoint reader actor to terminate, then
    /// awaits its completion. Any in-progress checkpoint reading or streaming
    /// operations will be stopped as part of the shutdown process.
    pub(crate) async fn shutdown(self) -> IngestionResult<()> {
        _ = self.shutdown_tx.send(());
        self.handle.await.map_err(|err| IngestionError::Shutdown {
            component: "checkpoint reader".into(),
            msg: err.to_string(),
        })
    }
}