iota_data_ingestion_core/
executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::PathBuf, pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_rest_api::CheckpointData;
10use iota_types::{committee::EpochId, messages_checkpoint::CheckpointSequenceNumber};
11use prometheus::Registry;
12use tokio::{
13    sync::{mpsc, oneshot},
14    task::JoinHandle,
15};
16use tokio_util::sync::CancellationToken;
17use tracing::info;
18
19use crate::{
20    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
21    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
22    reader::{
23        v1::CheckpointReader as CheckpointReaderV1,
24        v2::{CheckpointReader as CheckpointReaderV2, CheckpointReaderConfig},
25    },
26    worker_pool::{WorkerPool, WorkerPoolStatus},
27};
28
/// Capacity of the bounded channel that carries checkpoints from the executor
/// to a single worker pool. The pool status channel is sized as a multiple of
/// this value.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10_000;
30
31/// Callback function invoked for each incoming checkpoint to determine the
32/// shutdown action if it exceeds the ingestion limit.
33type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;
34
/// Outcome of evaluating the shutdown predicate against a checkpoint.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ShutdownAction {
    /// The current checkpoint is still ingested; afterwards the graceful
    /// shutdown process is initiated.
    IncludeAndShutdown,
    /// The current checkpoint is not ingested and the graceful shutdown
    /// process is initiated right away.
    ExcludeAndShutdown,
    /// The current checkpoint is ingested and no shutdown is requested.
    Continue,
}
48
49/// Common policies for upper limit checkpoint ingestion by the framework.
50///
51/// Once the limit is reached, the framework will start the graceful
52/// shutdown process.
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
54#[non_exhaustive]
55pub enum IngestionLimit {
56    /// Last checkpoint sequence number to process.
57    ///
58    /// After processing this checkpoint the framework will start the graceful
59    /// shutdown process.
60    MaxCheckpoint(CheckpointSequenceNumber),
61    /// Last checkpoint to process based on the given epoch.
62    ///
63    /// After processing this checkpoint the framework will start the graceful
64    /// shutdown process.
65    EndOfEpoch(EpochId),
66}
67
68impl IngestionLimit {
69    /// Evaluates whether the given checkpoint triggers a shutdown action based
70    /// on the ingestion limit.
71    fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
72        match self {
73            IngestionLimit::MaxCheckpoint(max) => {
74                if &checkpoint.checkpoint_summary.sequence_number > max {
75                    return ShutdownAction::ExcludeAndShutdown;
76                }
77                ShutdownAction::Continue
78            }
79            IngestionLimit::EndOfEpoch(max) => {
80                if &checkpoint.checkpoint_summary.epoch > max {
81                    return ShutdownAction::ExcludeAndShutdown;
82                }
83                ShutdownAction::Continue
84            }
85        }
86    }
87}
88
89/// Represents a common interface for checkpoint readers.
90///
91/// It manages the old checkpoint reader implementation for backwards
92/// compatibility and the new one.
93enum CheckpointReader {
94    /// The old checkpoint reader implementation.
95    V1 {
96        checkpoint_recv: mpsc::Receiver<Arc<CheckpointData>>,
97        gc_sender: mpsc::Sender<CheckpointSequenceNumber>,
98        exit_sender: oneshot::Sender<()>,
99        handle: JoinHandle<IngestionResult<()>>,
100    },
101    /// The new checkpoint reader implementation.
102    V2(CheckpointReaderV2),
103}
104
105impl CheckpointReader {
106    /// Gets the next checkpoint from the reader.
107    async fn get_checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
108        match self {
109            Self::V1 {
110                checkpoint_recv, ..
111            } => checkpoint_recv.recv().await,
112            Self::V2(reader) => reader.checkpoint().await,
113        }
114    }
115
116    /// Sends a GC signal to the reader.
117    async fn send_gc_signal(
118        &mut self,
119        seq_number: CheckpointSequenceNumber,
120    ) -> IngestionResult<()> {
121        match self {
122            Self::V1 { gc_sender, .. } => gc_sender.send(seq_number).await.map_err(|_| {
123                IngestionError::Channel(
124                    "unable to send GC operation to checkpoint reader, receiver half closed".into(),
125                )
126            }),
127            Self::V2(reader) => reader.send_gc_signal(seq_number).await,
128        }
129    }
130
131    /// Shuts down the reader.
132    async fn shutdown(self) -> IngestionResult<()> {
133        match self {
134            Self::V1 {
135                exit_sender,
136                handle,
137                ..
138            } => {
139                _ = exit_sender.send(());
140                handle.await.map_err(|err| IngestionError::Shutdown {
141                    component: "checkpoint reader".into(),
142                    msg: err.to_string(),
143                })?
144            }
145            Self::V2(reader) => reader.shutdown().await,
146        }
147    }
148}
149
150/// The Executor of the main ingestion pipeline process.
151///
152/// This struct orchestrates the execution of multiple worker pools, handling
153/// checkpoint distribution, progress tracking, and shutdown. It utilizes
154/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
155/// for monitoring the indexing process.
156///
157/// # Example
158/// ```rust,no_run
159/// use async_trait::async_trait;
160/// use iota_data_ingestion_core::{
161///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
162///     Worker, WorkerPool,
163/// };
164/// use iota_types::full_checkpoint_content::CheckpointData;
165/// use prometheus::Registry;
166/// use tokio_util::sync::CancellationToken;
167/// use std::{path::PathBuf, sync::Arc};
168///
169/// struct CustomWorker;
170///
171/// #[async_trait]
172/// impl Worker for CustomWorker {
173///     type Message = ();
174///     type Error = IngestionError;
175///
176///     async fn process_checkpoint(
177///         &self,
178///         checkpoint: Arc<CheckpointData>,
179///     ) -> Result<Self::Message, Self::Error> {
180///         // custom processing logic.
181///         println!(
182///             "Processing Local checkpoint: {}",
183///             checkpoint.checkpoint_summary.to_string()
184///         );
185///         Ok(())
186///     }
187/// }
188///
189/// #[tokio::main]
190/// async fn main() {
191///     let concurrency = 5;
192///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
193///     let mut executor = IndexerExecutor::new(
194///         progress_store,
195///         1, // number of registered WorkerPools.
196///         DataIngestionMetrics::new(&Registry::new()),
197///         CancellationToken::new(),
198///     );
199///     // register a worker pool with 5 workers to process checkpoints in parallel
200///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
201///     // register the worker pool to the executor.
202///     executor.register(worker_pool).await.unwrap();
203///     // run the ingestion pipeline.
204///     executor
205///         .run(
206///             PathBuf::from("./chk".to_string()), // path to a local directory where checkpoints are stored.
207///             None,
208///             vec![],                   // optional remote store access options.
209///             ReaderOptions::default(), // remote_read_batch_size.
210///         )
211///         .await
212///         .unwrap();
213/// }
214/// ```
215pub struct IndexerExecutor<P> {
216    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
217    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
218    progress_store: ProgressStoreWrapper<P>,
219    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
220    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
221    metrics: DataIngestionMetrics,
222    token: CancellationToken,
223    shutdown_callback: Option<ShutdownCallback>,
224}
225
226impl<P: ProgressStore> IndexerExecutor<P> {
227    pub fn new(
228        progress_store: P,
229        number_of_jobs: usize,
230        metrics: DataIngestionMetrics,
231        token: CancellationToken,
232    ) -> Self {
233        let (pool_status_sender, pool_status_receiver) =
234            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
235        Self {
236            pools: vec![],
237            pool_senders: vec![],
238            progress_store: ProgressStoreWrapper::new(progress_store),
239            pool_status_sender,
240            pool_status_receiver,
241            metrics,
242            token,
243            shutdown_callback: None,
244        }
245    }
246
247    /// Registers new worker pool in executor.
248    pub async fn register<W: Worker + 'static>(
249        &mut self,
250        pool: WorkerPool<W>,
251    ) -> IngestionResult<()> {
252        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
253        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
254        self.pools.push(Box::pin(pool.run(
255            checkpoint_number,
256            receiver,
257            self.pool_status_sender.clone(),
258            self.token.child_token(),
259        )));
260        self.pool_senders.push(sender);
261        Ok(())
262    }
263
264    /// Registers a predicate callback that determines when the ingestion
265    /// process should stop.
266    ///
267    /// This function `f` will be called for every **incoming checkpoint**
268    /// before it’s sent to the worker pool.
269    ///
270    /// Based on the returned [`ShutdownAction`] the executor will evaluate
271    /// whether to continue or stop the ingestion process by initiating the
272    /// graceful shutdown process.
273    ///
274    /// Once a shutdown action is triggered, the executor will stop sending new
275    /// checkpoints and will wait for all previously sent checkpoints to be
276    /// processed by workers before initiating graceful shutdown process.
277    ///
278    /// Note:
279    ///
280    /// Calling this method after
281    /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
282    /// earlier predicate, and vice versa. They are not cumulative.
283    pub fn shutdown_when<F>(&mut self, f: F)
284    where
285        F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
286    {
287        self.shutdown_callback = Some(Box::new(f));
288    }
289
290    /// Adds an upper‑limit policy that determines when the ingestion
291    /// process should stop.
292    ///
293    /// This is a convenience method, it internally uses
294    /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
295    /// derived from the provided [`IngestionLimit`].
296    ///
297    /// Note:
298    ///
299    /// Calling this method after [`shutdown_when`](Self::shutdown_when)
300    /// replaces the earlier predicate, and vice versa. They are not cumulative.
301    pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
302        self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
303    }
304
305    pub async fn update_watermark(
306        &mut self,
307        task_name: String,
308        watermark: CheckpointSequenceNumber,
309    ) -> IngestionResult<()> {
310        self.progress_store.save(task_name, watermark).await
311    }
312    pub async fn read_watermark(
313        &mut self,
314        task_name: String,
315    ) -> IngestionResult<CheckpointSequenceNumber> {
316        self.progress_store.load(task_name).await
317    }
318
319    /// Main executor loop.
320    ///
321    /// # Error
322    ///
323    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
324    /// registered.
325    pub async fn run(
326        mut self,
327        path: PathBuf,
328        remote_store_url: Option<String>,
329        remote_store_options: Vec<(String, String)>,
330        reader_options: ReaderOptions,
331    ) -> IngestionResult<ExecutorProgress> {
332        let reader_checkpoint_number = self.progress_store.min_watermark()?;
333        let (checkpoint_reader, checkpoint_recv, gc_sender, exit_sender) =
334            CheckpointReaderV1::initialize(
335                path,
336                reader_checkpoint_number,
337                remote_store_url,
338                remote_store_options,
339                reader_options,
340            );
341
342        let handle = spawn_monitored_task!(checkpoint_reader.run());
343
344        self.run_executor_loop(
345            reader_checkpoint_number,
346            CheckpointReader::V1 {
347                checkpoint_recv,
348                gc_sender,
349                exit_sender,
350                handle,
351            },
352        )
353        .await
354    }
355
356    /// Alternative main executor loop. Uses the new iteration of the
357    /// `CheckpointReader` supporting syncing checkppints from hybrid historical
358    /// store.
359    ///
360    /// # Error
361    ///
362    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
363    /// registered.
364    pub async fn run_with_config(
365        mut self,
366        config: CheckpointReaderConfig,
367    ) -> IngestionResult<ExecutorProgress> {
368        let reader_checkpoint_number = self.progress_store.min_watermark()?;
369        let checkpoint_reader = CheckpointReaderV2::new(reader_checkpoint_number, config).await?;
370
371        self.run_executor_loop(
372            reader_checkpoint_number,
373            CheckpointReader::V2(checkpoint_reader),
374        )
375        .await
376    }
377
378    /// Common execution logic
379    async fn run_executor_loop(
380        &mut self,
381        mut reader_checkpoint_number: u64,
382        mut checkpoint_reader: CheckpointReader,
383    ) -> IngestionResult<ExecutorProgress> {
384        let worker_pools = std::mem::take(&mut self.pools)
385            .into_iter()
386            .map(|pool| spawn_monitored_task!(pool))
387            .collect::<Vec<JoinHandle<()>>>();
388
389        let mut worker_pools_shutdown_signals = vec![];
390        let mut checkpoint_limit_reached = None;
391
392        loop {
393            // the min watermark represents the lowest watermark that
394            // has been processed by any worker pool. This guarantees that
395            // all worker pools have processed the checkpoint before the
396            // shutdown process starts.
397            if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
398                self.progress_store
399                    .min_watermark()
400                    .map(|watermark| watermark > ch_seq_num)
401                    .unwrap_or_default()
402            }) {
403                info!(
404                    "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
405                );
406                self.token.cancel();
407            }
408
409            tokio::select! {
410                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
411                    match worker_pool_progress_msg {
412                        WorkerPoolStatus::Running((task_name, watermark)) => {
413                            self.progress_store.save(task_name.clone(), watermark).await
414                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
415                            let seq_number = self.progress_store.min_watermark()?;
416                            if seq_number > reader_checkpoint_number {
417                                checkpoint_reader.send_gc_signal(seq_number).await?;
418                                reader_checkpoint_number = seq_number;
419                            }
420                            self.metrics.data_ingestion_checkpoint
421                                .with_label_values(&[&task_name])
422                                .set(watermark as i64);
423                        }
424                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
425                            worker_pools_shutdown_signals.push(worker_pool_name);
426                        }
427                    }
428                }
429                Some(checkpoint) = checkpoint_reader.get_checkpoint(), if !self.token.is_cancelled() => {
430                    // once upper limit reached skip sending new checkpoints to workers.
431                    if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
432                        continue;
433                    }
434
435                    for sender in &self.pool_senders {
436                        sender.send(checkpoint.clone()).await.map_err(|_| {
437                            IngestionError::Channel(
438                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
439                            )
440                        })?;
441                    }
442                }
443            }
444
445            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
446                // Shutdown worker pools
447                for worker_pool in worker_pools {
448                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
449                        component: "worker pool".into(),
450                        msg: err.to_string(),
451                    })?;
452                }
453                // Shutdown checkpoint reader
454                checkpoint_reader.shutdown().await?;
455                break;
456            }
457        }
458
459        Ok(self.progress_store.stats())
460    }
461
462    /// Check if the current ingestion limit has been reached.
463    ///
464    /// Returns `true` if the ingestion limit has been reached.
465    /// If no ingestion limit is present or it has not been reached yet, the
466    /// function returns `false`.
467    fn should_shutdown(
468        &mut self,
469        checkpoint: &CheckpointData,
470        checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
471    ) -> bool {
472        if checkpoint_limit_reached.is_some() {
473            return true;
474        }
475
476        let Some(shutdown_action) = self
477            .shutdown_callback
478            .as_ref()
479            .map(|matches| matches(checkpoint))
480        else {
481            return false;
482        };
483
484        match shutdown_action {
485            ShutdownAction::IncludeAndShutdown => {
486                checkpoint_limit_reached
487                    .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
488                false
489            }
490            ShutdownAction::ExcludeAndShutdown => {
491                checkpoint_limit_reached.get_or_insert(
492                    checkpoint
493                        .checkpoint_summary
494                        .sequence_number
495                        .saturating_sub(1),
496                );
497                true
498            }
499            ShutdownAction::Continue => false,
500        }
501    }
502}
503
504/// Sets up a single workflow for data ingestion.
505///
506/// This function initializes an [`IndexerExecutor`] with a single worker pool,
507/// using a [`ShimProgressStore`] initialized with the provided
508/// `initial_checkpoint_number`. It then returns a future that runs the executor
509/// and a [`CancellationToken`] for graceful shutdown.
510///
511/// # Docs
512/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
513///
514/// # Example
515/// ```rust,no_run
516/// use std::sync::Arc;
517///
518/// use async_trait::async_trait;
519/// use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
520/// use iota_types::full_checkpoint_content::CheckpointData;
521///
522/// struct CustomWorker;
523///
524/// #[async_trait]
525/// impl Worker for CustomWorker {
526///     type Message = ();
527///     type Error = IngestionError;
528///
529///     async fn process_checkpoint(
530///         &self,
531///         checkpoint: Arc<CheckpointData>,
532///     ) -> Result<Self::Message, Self::Error> {
533///         // custom processing logic.
534///         println!(
535///             "Processing checkpoint: {}",
536///             checkpoint.checkpoint_summary.to_string()
537///         );
538///         Ok(())
539///     }
540/// }
541///
542/// #[tokio::main]
543/// async fn main() {
544///     let (executor, _) = setup_single_workflow(
545///         CustomWorker,
546///         "http://127.0.0.1:9000/api/v1".to_string(), // fullnode REST API
547///         0,                                          // initial checkpoint number.
548///         5,                                          // concurrency.
549///         None,                                       // extra reader options.
550///     )
551///     .await
552///     .unwrap();
553///     executor.await.unwrap();
554/// }
555/// ```
556pub async fn setup_single_workflow<W: Worker + 'static>(
557    worker: W,
558    remote_store_url: String,
559    initial_checkpoint_number: CheckpointSequenceNumber,
560    concurrency: usize,
561    reader_options: Option<ReaderOptions>,
562) -> IngestionResult<(
563    impl Future<Output = IngestionResult<ExecutorProgress>>,
564    CancellationToken,
565)> {
566    let metrics = DataIngestionMetrics::new(&Registry::new());
567    let progress_store = ShimProgressStore(initial_checkpoint_number);
568    let token = CancellationToken::new();
569    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
570    let worker_pool = WorkerPool::new(
571        worker,
572        "workflow".to_string(),
573        concurrency,
574        Default::default(),
575    );
576    executor.register(worker_pool).await?;
577    Ok((
578        executor.run(
579            tempfile::tempdir()?.keep(),
580            Some(remote_store_url),
581            vec![],
582            reader_options.unwrap_or_default(),
583        ),
584        token,
585    ))
586}