iota_data_ingestion_core/reader/
v2.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    num::NonZeroUsize,
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::Duration,
9};
10
11use backoff::backoff::Backoff;
12use futures::StreamExt;
13use iota_config::{
14    node::ArchiveReaderConfig,
15    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
16};
17use iota_metrics::spawn_monitored_task;
18use iota_rest_api::CheckpointData;
19use iota_types::messages_checkpoint::CheckpointSequenceNumber;
20use object_store::ObjectStore;
21use serde::{Deserialize, Serialize};
22use tap::Pipe;
23use tokio::{
24    sync::mpsc::{self},
25    task::JoinHandle,
26    time::timeout,
27};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, error, info};
30
31#[cfg(not(target_os = "macos"))]
32use crate::reader::fetch::init_watcher;
33use crate::{
34    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
35    history::reader::HistoricalReader,
36    reader::{
37        fetch::{LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store},
38        v1::{DataLimiter, ReaderOptions},
39    },
40};
41
/// Available sources for checkpoint streams supported by the ingestion
/// framework.
///
/// This enum represents the different types of remote sources from which
/// checkpoint data can be fetched. Each variant corresponds to a supported
/// backend or combination of backends for checkpoint retrieval.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// The URL to the Fullnode server that exposes
    /// checkpoint data.
    ///
    /// # Example
    /// ```text
    /// "http://127.0.0.1:9000/api/v1"
    /// ```
    Fullnode(String),
    /// A hybrid source combining historical object store and optional live
    /// object store.
    ///
    /// The historical store is consulted first; when the requested checkpoint
    /// is not yet archived there, the reader falls back to the live store if
    /// one is configured.
    HybridHistoricalStore {
        /// The URL path to the historical object store that contains `*.chk`,
        /// `*.sum` & `MANIFEST` files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/historical"
        /// ```
        historical_url: String,
        /// The URL path to the live object store that contains `*.chk`
        /// checkpoint files.
        ///
        /// # Example
        /// ```text
        /// "https://checkpoints.mainnet.iota.cafe/ingestion/live"
        /// ```
        live_url: Option<String>,
    },
}
79
/// Represents a remote backend for checkpoint data retrieval.
///
/// This enum encapsulates the supported remote storage mechanisms that can be
/// used by the ingestion framework to fetch checkpoint data. Each variant
/// corresponds to a different type of remote source.
enum RemoteStore {
    /// REST client talking to a Fullnode's checkpoint API.
    Fullnode(iota_rest_api::Client),
    /// Historical archive reader, optionally paired with a live object store
    /// used as fallback for checkpoints not yet archived.
    HybridHistoricalStore {
        // Reads `*.chk`/`*.sum`/`MANIFEST` files from the archive.
        historical: HistoricalReader,
        // Live store serving recently produced `*.chk` files, if configured.
        live: Option<Box<dyn ObjectStore>>,
    },
}
92
93impl RemoteStore {
94    async fn new(
95        remote_url: RemoteUrl,
96        batch_size: usize,
97        timeout_secs: u64,
98    ) -> IngestionResult<Self> {
99        let store = match remote_url {
100            RemoteUrl::Fullnode(url) => RemoteStore::Fullnode(iota_rest_api::Client::new(url)),
101            RemoteUrl::HybridHistoricalStore {
102                historical_url,
103                live_url,
104            } => {
105                let config = ArchiveReaderConfig {
106                    download_concurrency: NonZeroUsize::new(batch_size)
107                        .expect("batch size must be greater than zero"),
108                    remote_store_config: ObjectStoreConfig {
109                        object_store: Some(ObjectStoreType::S3),
110                        object_store_connection_limit: 20,
111                        aws_endpoint: Some(historical_url),
112                        aws_virtual_hosted_style_request: true,
113                        no_sign_request: true,
114                        ..Default::default()
115                    },
116                    use_for_pruning_watermark: false,
117                };
118                let historical = HistoricalReader::new(config)
119                    .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;
120
121                let live = live_url
122                    .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
123                    .transpose()?;
124
125                RemoteStore::HybridHistoricalStore { historical, live }
126            }
127        };
128        Ok(store)
129    }
130}
131
/// Configuration options to control the behavior of a checkpoint
/// reader.
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Config the checkpoint reader behavior for downloading new checkpoints.
    pub reader_options: ReaderOptions,
    /// Local path for checkpoint ingestion. If not provided, checkpoints will
    /// be ingested from a temporary directory.
    pub ingestion_path: Option<PathBuf>,
    /// Remote source for checkpoint data stream. When `None`, only local
    /// files under `ingestion_path` are consumed.
    pub remote_store_url: Option<RemoteUrl>,
}
144
/// Internal actor responsible for reading and streaming checkpoints.
///
/// `CheckpointReaderActor` is the core background task that manages the logic
/// for fetching, batching, and streaming checkpoint data from local or remote
/// sources. It handles checkpoint discovery, garbage collection signals, and
/// coordinates with remote fetchers as needed.
///
/// This struct is intended to be run as an asynchronous task and is not
/// typically interacted with directly. Instead, users should use
/// [`CheckpointReader`], which provides a safe and ergonomic API for
/// interacting with the running actor, such as receiving checkpoints, sending
/// GC signals, or triggering shutdown.
///
/// # Responsibilities
/// - Periodically scans for new checkpoints from configured sources.
/// - Streams checkpoints to consumers via channels.
/// - Handles garbage collection signals to prune processed checkpoints.
/// - Coordinates with remote fetchers for batch downloads and retries.
///
/// # Usage
/// Users should not construct or manage `CheckpointReaderActor` directly.
/// Instead, use [`CheckpointReader::new`] to spawn the actor and obtain a
/// handle for interaction.
struct CheckpointReaderActor {
    /// Filesystem path to the local checkpoint directory.
    path: PathBuf,
    /// Start fetch from the current checkpoint sequence.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Keeps track of the last processed checkpoint sequence number, used to
    /// delete checkpoint files from ingestion path.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Channel for sending checkpoints to WorkerPools.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Sends a garbage collection (GC) signal to prune checkpoint files below
    /// the specified watermark.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Remote checkpoint reader for fetching checkpoints from the network.
    remote_store: Option<Arc<RemoteStore>>,
    /// Shutdown signal for the actor.
    token: CancellationToken,
    /// Configures the behavior of the checkpoint reader.
    reader_options: ReaderOptions,
    /// Limit the amount of downloaded checkpoints held in memory to avoid OOM.
    data_limiter: DataLimiter,
}
190
191impl LocalRead for CheckpointReaderActor {
192    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
193        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
194            || self.data_limiter.exceeds()
195    }
196
197    fn path(&self) -> &Path {
198        &self.path
199    }
200
201    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
202        self.current_checkpoint_number
203    }
204
205    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
206        self.last_pruned_watermark = watermark;
207    }
208}
209
210impl CheckpointReaderActor {
211    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
212        self.remote_store.is_some()
213            && (checkpoints.is_empty()
214                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
215    }
216
217    /// Fetch checkpoints from the historical object store and stream them to a
218    /// channel.
219    async fn fetch_historical(
220        &mut self,
221        historical_reader: &HistoricalReader,
222    ) -> IngestionResult<()> {
223        // Only sync the manifest when needed to avoid unnecessary network calls.
224        // If the requested checkpoint is beyond what's currently available in our
225        // cached manifest, we need to refresh it to check for newer checkpoints.
226        if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
227            timeout(
228                Duration::from_secs(self.reader_options.timeout_secs),
229                historical_reader.sync_manifest_once(),
230            )
231            .await
232            .map_err(|_| {
233                IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
234            })??;
235
236            // Verify the requested checkpoint is now available after the manifest refresh.
237            // If it's still not available, the checkpoint hasn't been published yet.
238            if self.current_checkpoint_number
239                > historical_reader.latest_available_checkpoint().await?
240            {
241                return Err(IngestionError::CheckpointNotAvailableYet);
242            }
243        }
244
245        let manifest = historical_reader.get_manifest().await;
246
247        let files = historical_reader.verify_and_get_manifest_files(manifest)?;
248
249        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
250            s.checkpoint_seq_range.start
251        }) {
252            Ok(index) => index,
253            Err(index) => index - 1,
254        };
255
256        for metadata in files
257            .into_iter()
258            .enumerate()
259            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
260        {
261            let checkpoints = timeout(
262                Duration::from_secs(self.reader_options.timeout_secs),
263                historical_reader.iter_for_file(metadata.file_path()),
264            )
265            .await
266            .map_err(|_| {
267                IngestionError::HistoryRead(format!(
268                    "reading checkpoint {} exceeded the timeout",
269                    metadata.file_path()
270                ))
271            })??
272            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
273            .collect::<Vec<CheckpointData>>();
274
275            for checkpoint in checkpoints {
276                let size = bcs::serialized_size(&checkpoint)?;
277                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
278                    .await?;
279            }
280        }
281
282        Ok(())
283    }
284
    /// Fetches remote checkpoints from the remote store and streams them to the
    /// channel.
    ///
    /// For every successfully fetched checkpoint, this function updates the
    /// current checkpoint number and the data limiter. If an error occurs while
    /// fetching a checkpoint, the function returns immediately with that error.
    ///
    /// For the hybrid store, the historical archive is tried first; the live
    /// object store is used only when the archive reports
    /// [`IngestionError::CheckpointNotAvailableYet`] and a live store is
    /// configured. Cancellation of `self.token` ends the fetch loops cleanly
    /// with `Ok(())`.
    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
        // No remote store configured: nothing to fetch.
        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
            return Ok(());
        };
        let batch_size = self.reader_options.batch_size;
        match remote_store.as_ref() {
            RemoteStore::Fullnode(client) => {
                // Open-ended stream of checkpoint fetches, at most
                // `batch_size` requests in flight at once.
                let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                    .map(|checkpoint_number| fetch_from_full_node(client, checkpoint_number))
                    .pipe(futures::stream::iter)
                    .buffered(batch_size);

                // `run_until_cancelled` yields `None` on shutdown, which ends
                // the loop without an error.
                while let Some(checkpoint_result) = self
                    .token
                    .run_until_cancelled(checkpoint_stream.next())
                    .await
                    .flatten()
                {
                    let (checkpoint, size) = checkpoint_result?;
                    self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                        .await?;
                }
            }
            RemoteStore::HybridHistoricalStore { historical, live } => {
                // Token is cloned because `fetch_historical` borrows `self`
                // mutably for the duration of the call.
                if let Some(Err(err)) = self
                    .token
                    .clone()
                    .run_until_cancelled(self.fetch_historical(historical))
                    .await
                {
                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
                        // Fall back to the live store; if none is configured,
                        // propagate the original "not available yet" error.
                        let live = live.as_ref().ok_or(err)?;
                        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                            .map(|checkpoint_number| {
                                fetch_from_object_store(live, checkpoint_number)
                            })
                            .pipe(futures::stream::iter)
                            .buffered(batch_size);

                        while let Some(checkpoint_result) = self
                            .token
                            .run_until_cancelled(checkpoint_stream.next())
                            .await
                            .flatten()
                        {
                            let (checkpoint, size) = checkpoint_result?;
                            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                                .await?;
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        };
        Ok(())
    }
348
349    /// Fetches and sends checkpoints to the channel with retry logic.
350    ///
351    /// Uses an exponential backoff strategy to retry failed requests.
352    async fn fetch_and_send_to_channel_with_retry(&mut self) {
353        let mut backoff = backoff::ExponentialBackoff::default();
354        backoff.max_elapsed_time = Some(Duration::from_secs(60));
355        backoff.initial_interval = Duration::from_millis(100);
356        backoff.current_interval = backoff.initial_interval;
357        backoff.multiplier = 1.0;
358
359        loop {
360            match self.fetch_and_send_to_channel().await {
361                Ok(_) => break,
362                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
363                Err(IngestionError::CheckpointNotAvailableYet) => {
364                    break info!("historical reader does not have the requested checkpoint yet");
365                }
366                Err(err) => match backoff.next_backoff() {
367                    Some(duration) => {
368                        if !err.to_string().to_lowercase().contains("not found") {
369                            debug!(
370                                "remote reader retry in {} ms. Error is {err:?}",
371                                duration.as_millis(),
372                            );
373                        }
374                        if self
375                            .token
376                            .run_until_cancelled(tokio::time::sleep(duration))
377                            .await
378                            .is_none()
379                        {
380                            break;
381                        }
382                    }
383                    None => {
384                        break error!("remote reader transient error {err:?}");
385                    }
386                },
387            }
388        }
389    }
390
391    /// Attempts to send a checkpoint from remote source to the channel if
392    /// capacity allows.
393    ///
394    /// If the checkpoint's sequence number would exceed the allowed capacity,
395    /// returns `IngestionError::MaxCheckpointsCapacityReached` and does not
396    /// send. Otherwise, adds the checkpoint to the data limiter and sends
397    /// it to the channel.
398    async fn send_remote_checkpoint_with_capacity_check(
399        &mut self,
400        checkpoint: Arc<CheckpointData>,
401        size: usize,
402    ) -> IngestionResult<()> {
403        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
404            return Err(IngestionError::MaxCheckpointsCapacityReached);
405        }
406        self.data_limiter.add(&checkpoint, size);
407        self.send_checkpoint_to_channel(checkpoint).await
408    }
409
410    /// Sends a batch of local checkpoints to the channel in order.
411    ///
412    /// Each checkpoint is sent sequentially until a gap is detected (i.e., a
413    /// checkpoint with a sequence number greater than the current
414    /// checkpoint number). If a gap is found, the function breaks early. If
415    /// sending fails, returns the error immediately.
416    async fn send_local_checkpoints_to_channel(
417        &mut self,
418        checkpoints: Vec<Arc<CheckpointData>>,
419    ) -> IngestionResult<()> {
420        for checkpoint in checkpoints {
421            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
422                break;
423            }
424            self.send_checkpoint_to_channel(checkpoint).await?;
425        }
426        Ok(())
427    }
428
429    /// Sends a single checkpoint to the channel and advances the current
430    /// checkpoint number.
431    ///
432    /// Asserts that the checkpoint's sequence number matches the expected
433    /// current number. Increments the current checkpoint number after
434    /// sending.
435    async fn send_checkpoint_to_channel(
436        &mut self,
437        checkpoint: Arc<CheckpointData>,
438    ) -> IngestionResult<()> {
439        assert_eq!(
440            checkpoint.checkpoint_summary.sequence_number,
441            self.current_checkpoint_number
442        );
443        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
444            IngestionError::Channel(
445                "unable to send checkpoint to executor, receiver half closed".to_owned(),
446            )
447        })?;
448        self.current_checkpoint_number += 1;
449        Ok(())
450    }
451
452    /// Sync from either local or remote source new checkpoints to be processed
453    /// by the executor.
454    async fn sync(&mut self) -> IngestionResult<()> {
455        let mut remote_source = ReadSource::Local;
456        let checkpoints = self.read_local_files_with_retry().await?;
457        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);
458
459        if should_fetch_from_remote {
460            remote_source = ReadSource::Remote;
461            self.fetch_and_send_to_channel_with_retry().await;
462        } else {
463            self.send_local_checkpoints_to_channel(checkpoints).await?;
464        }
465
466        info!(
467            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
468            self.current_checkpoint_number, self.last_pruned_watermark,
469        );
470
471        Ok(())
472    }
473
    /// Run the main loop of the checkpoint reader actor.
    ///
    /// Syncs on filesystem events (where the watcher is available) or on a
    /// fixed tick interval, handles GC signals, and exits when the shutdown
    /// token is cancelled.
    async fn run(mut self) {
        // The sender half is kept alive for the whole loop so the
        // `inotify_rx.recv()` branch below should never resolve to `Ok(None)`.
        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
        std::fs::create_dir_all(self.path()).expect("failed to create a directory");

        // Filesystem watcher wakes the loop as soon as new checkpoint files
        // appear locally; not compiled on macOS, where the loop relies on the
        // tick interval alone.
        #[cfg(not(target_os = "macos"))]
        let _watcher = init_watcher(_inotify_tx, self.path());

        // Prune anything below the starting watermark that may be left over
        // from a previous run.
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("failed to clean the directory");

        loop {
            tokio::select! {
                // Graceful shutdown requested.
                _ = self.token.cancelled() => break,
                // Executor advanced: drop processed checkpoint files.
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("failed to clean the directory");
                }
                // Sync on a filesystem event (`Ok(Some(_))`) or when the tick
                // interval elapses (`Err(_)` from `timeout`).
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv())  => {
                    self.sync().await.expect("failed to read checkpoint files");
                }
            }
        }
    }
499}
500
/// Public API for interacting with the checkpoint reader actor.
///
/// It provides methods to receive streamed checkpoints, send garbage collection
/// signals, and gracefully shut down the background checkpoint reading task.
/// Internally, it communicates with a [`CheckpointReaderActor`], which manages
/// the actual checkpoint fetching and streaming logic.
pub(crate) struct CheckpointReader {
    /// Join handle of the spawned [`CheckpointReaderActor`] task.
    handle: JoinHandle<()>,
    /// Sends pruning watermarks to the actor (see [`Self::send_gc_signal`]).
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    /// Receives checkpoints streamed by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    /// Cancellation token used to stop the actor on [`Self::shutdown`].
    token: CancellationToken,
}
513
514impl CheckpointReader {
515    pub(crate) async fn new(
516        starting_checkpoint_number: CheckpointSequenceNumber,
517        config: CheckpointReaderConfig,
518    ) -> IngestionResult<Self> {
519        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
520        let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
521
522        let remote_store = if let Some(url) = config.remote_store_url {
523            Some(Arc::new(
524                RemoteStore::new(
525                    url,
526                    config.reader_options.batch_size,
527                    config.reader_options.timeout_secs,
528                )
529                .await?,
530            ))
531        } else {
532            None
533        };
534
535        let path = match config.ingestion_path {
536            Some(p) => p,
537            None => tempfile::tempdir()?.keep(),
538        };
539        let token = CancellationToken::new();
540        let reader = CheckpointReaderActor {
541            path,
542            current_checkpoint_number: starting_checkpoint_number,
543            last_pruned_watermark: starting_checkpoint_number,
544            checkpoint_tx,
545            gc_signal_rx,
546            remote_store,
547            token: token.clone(),
548            data_limiter: DataLimiter::new(config.reader_options.data_limit),
549            reader_options: config.reader_options,
550        };
551
552        let handle = spawn_monitored_task!(reader.run());
553
554        Ok(Self {
555            handle,
556            gc_signal_tx,
557            checkpoint_rx,
558            token,
559        })
560    }
561
    /// Read downloaded checkpoints from the queue.
    ///
    /// Returns `None` once the reader actor has stopped and the channel has
    /// been drained.
    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        self.checkpoint_rx.recv().await
    }
566
567    /// Sends a garbage collection (GC) signal to the checkpoint reader.
568    ///
569    /// Transmits a watermark to the checkpoint reader, indicating that all
570    /// checkpoints below this watermark can be safely pruned or cleaned up.
571    /// The signal is sent over an internal channel to the checkpoint reader
572    /// task.
573    pub(crate) async fn send_gc_signal(
574        &self,
575        watermark: CheckpointSequenceNumber,
576    ) -> IngestionResult<()> {
577        self.gc_signal_tx.send(watermark).await.map_err(|_| {
578            IngestionError::Channel(
579                "unable to send GC operation to checkpoint reader, receiver half closed".into(),
580            )
581        })
582    }
583
584    /// Gracefully shuts down the checkpoint reader task.
585    ///
586    /// It signals the background checkpoint reader actor to terminate, then
587    /// awaits its completion. Any in-progress checkpoint reading or streaming
588    /// operations will be stopped as part of the shutdown process.
589    pub(crate) async fn shutdown(self) -> IngestionResult<()> {
590        self.token.cancel();
591        self.handle.await.map_err(|err| IngestionError::Shutdown {
592            component: "checkpoint reader".into(),
593            msg: err.to_string(),
594        })
595    }
596}