Skip to main content

iota_data_ingestion_core/
executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_types::{
10    committee::EpochId, full_checkpoint_content::CheckpointData,
11    messages_checkpoint::CheckpointSequenceNumber,
12};
13use prometheus::Registry;
14use tokio::{sync::mpsc, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17
18use crate::{
19    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
20    config::CheckpointReaderConfigExt,
21    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
22    reader::v2::{CheckpointReader, CheckpointReaderConfig, RemoteUrl},
23    worker_pool::{WorkerPool, WorkerPoolStatus},
24};
25
/// Capacity of the checkpoint channel created for each registered worker
/// pool; multiplied by the number of jobs it also sizes the executor's shared
/// worker-pool status channel.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10_000;
27
28/// Callback function invoked for each incoming checkpoint to determine the
29/// shutdown action if it exceeds the ingestion limit.
30type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;
31
/// Outcome of a shutdown predicate for a single checkpoint: whether that
/// checkpoint is still ingested and whether the graceful shutdown begins.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ShutdownAction {
    /// Ingest this checkpoint as usual, then begin the graceful shutdown.
    IncludeAndShutdown,
    /// Do not ingest this checkpoint; begin the graceful shutdown right away.
    ExcludeAndShutdown,
    /// Ingest this checkpoint and keep running.
    Continue,
}
45
46/// Common policies for upper limit checkpoint ingestion by the framework.
47///
48/// Once the limit is reached, the framework will start the graceful
49/// shutdown process.
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
51#[non_exhaustive]
52pub enum IngestionLimit {
53    /// Last checkpoint sequence number to process.
54    ///
55    /// After processing this checkpoint the framework will start the graceful
56    /// shutdown process.
57    MaxCheckpoint(CheckpointSequenceNumber),
58    /// Last checkpoint to process based on the given epoch.
59    ///
60    /// After processing this checkpoint the framework will start the graceful
61    /// shutdown process.
62    EndOfEpoch(EpochId),
63}
64
65impl IngestionLimit {
66    /// Evaluates whether the given checkpoint triggers a shutdown action based
67    /// on the ingestion limit.
68    fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
69        match self {
70            IngestionLimit::MaxCheckpoint(max) => {
71                if &checkpoint.checkpoint_summary.sequence_number > max {
72                    return ShutdownAction::ExcludeAndShutdown;
73                }
74                ShutdownAction::Continue
75            }
76            IngestionLimit::EndOfEpoch(max) => {
77                if &checkpoint.checkpoint_summary.epoch > max {
78                    return ShutdownAction::ExcludeAndShutdown;
79                }
80                ShutdownAction::Continue
81            }
82        }
83    }
84}
85
86/// The Executor of the main ingestion pipeline process.
87///
88/// This struct orchestrates the execution of multiple worker pools, handling
89/// checkpoint distribution, progress tracking, and shutdown. It utilizes
90/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
91/// for monitoring the indexing process.
92///
93/// # Example
94/// ```rust,no_run
95/// use async_trait::async_trait;
96/// use iota_data_ingestion_core::{
97///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
98///     Worker, WorkerPool,
99///     reader::v2::CheckpointReaderConfig
100/// };
101/// use iota_types::full_checkpoint_content::CheckpointData;
102/// use prometheus::Registry;
103/// use tokio_util::sync::CancellationToken;
104/// use std::{path::PathBuf, sync::Arc};
105///
106/// struct CustomWorker;
107///
108/// #[async_trait]
109/// impl Worker for CustomWorker {
110///     type Message = ();
111///     type Error = IngestionError;
112///
113///     async fn process_checkpoint(
114///         &self,
115///         checkpoint: Arc<CheckpointData>,
116///     ) -> Result<Self::Message, Self::Error> {
117///         // custom processing logic.
118///         println!(
119///             "Processing Local checkpoint: {}",
120///             checkpoint.checkpoint_summary.to_string()
121///         );
122///         Ok(())
123///     }
124/// }
125///
126/// #[tokio::main]
127/// async fn main() {
128///     let concurrency = 5;
129///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
130///     let mut executor = IndexerExecutor::new(
131///         progress_store,
132///         1, // number of registered WorkerPools.
133///         DataIngestionMetrics::new(&Registry::new()),
134///         CancellationToken::new(),
135///     );
136///     // register a worker pool with 5 workers to process checkpoints in parallel
137///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
138///     // register the worker pool to the executor.
139///     executor.register(worker_pool).await.unwrap();
140///     // run the ingestion pipeline.
141///     executor
142///         .run_with_config(
143///             CheckpointReaderConfig {
144///                 reader_options: ReaderOptions::default(),
145///                 ingestion_path: Some(PathBuf::from("./chk".to_string())), // path to a local directory where checkpoints are stored.
146///                 remote_store_url: None,
147///             }
148///         )
149///         .await
150///         .unwrap();
151/// }
152/// ```
153pub struct IndexerExecutor<P> {
154    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
155    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
156    progress_store: ProgressStoreWrapper<P>,
157    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
158    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
159    metrics: DataIngestionMetrics,
160    token: CancellationToken,
161    shutdown_callback: Option<ShutdownCallback>,
162}
163
164impl<P: ProgressStore> IndexerExecutor<P> {
165    pub fn new(
166        progress_store: P,
167        number_of_jobs: usize,
168        metrics: DataIngestionMetrics,
169        token: CancellationToken,
170    ) -> Self {
171        let (pool_status_sender, pool_status_receiver) =
172            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
173        Self {
174            pools: vec![],
175            pool_senders: vec![],
176            progress_store: ProgressStoreWrapper::new(progress_store),
177            pool_status_sender,
178            pool_status_receiver,
179            metrics,
180            token,
181            shutdown_callback: None,
182        }
183    }
184
185    /// Registers new worker pool in executor.
186    pub async fn register<W: Worker + 'static>(
187        &mut self,
188        pool: WorkerPool<W>,
189    ) -> IngestionResult<()> {
190        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
191        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
192        self.pools.push(Box::pin(pool.run(
193            checkpoint_number,
194            receiver,
195            self.pool_status_sender.clone(),
196            self.token.child_token(),
197        )));
198        self.pool_senders.push(sender);
199        Ok(())
200    }
201
202    /// Registers a predicate callback that determines when the ingestion
203    /// process should stop.
204    ///
205    /// This function `f` will be called for every **incoming checkpoint**
206    /// before it’s sent to the worker pool.
207    ///
208    /// Based on the returned [`ShutdownAction`] the executor will evaluate
209    /// whether to continue or stop the ingestion process by initiating the
210    /// graceful shutdown process.
211    ///
212    /// Once a shutdown action is triggered, the executor will stop sending new
213    /// checkpoints and will wait for all previously sent checkpoints to be
214    /// processed by workers before initiating graceful shutdown process.
215    ///
216    /// Note:
217    ///
218    /// Calling this method after
219    /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
220    /// earlier predicate, and vice versa. They are not cumulative.
221    pub fn shutdown_when<F>(&mut self, f: F)
222    where
223        F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
224    {
225        self.shutdown_callback = Some(Box::new(f));
226    }
227
228    /// Adds an upper‑limit policy that determines when the ingestion
229    /// process should stop.
230    ///
231    /// This is a convenience method, it internally uses
232    /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
233    /// derived from the provided [`IngestionLimit`].
234    ///
235    /// Note:
236    ///
237    /// Calling this method after [`shutdown_when`](Self::shutdown_when)
238    /// replaces the earlier predicate, and vice versa. They are not cumulative.
239    pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
240        self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
241    }
242
243    pub async fn update_watermark(
244        &mut self,
245        task_name: String,
246        watermark: CheckpointSequenceNumber,
247    ) -> IngestionResult<()> {
248        self.progress_store.save(task_name, watermark).await
249    }
250
251    pub async fn read_watermark(
252        &mut self,
253        task_name: String,
254    ) -> IngestionResult<CheckpointSequenceNumber> {
255        self.progress_store.load(task_name).await
256    }
257
258    /// Main executor loop.
259    ///
260    /// # Error
261    ///
262    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
263    /// registered.
264    pub async fn run_with_config(
265        mut self,
266        config: impl Into<CheckpointReaderConfigExt>,
267    ) -> IngestionResult<ExecutorProgress> {
268        let reader_checkpoint_number = self.progress_store.min_watermark()?;
269        let checkpoint_reader =
270            CheckpointReader::new(reader_checkpoint_number, config.into()).await?;
271
272        self.run_executor_loop(reader_checkpoint_number, checkpoint_reader)
273            .await
274    }
275
276    /// Common execution logic
277    async fn run_executor_loop(
278        &mut self,
279        mut reader_checkpoint_number: u64,
280        mut checkpoint_reader: CheckpointReader,
281    ) -> IngestionResult<ExecutorProgress> {
282        let worker_pools = std::mem::take(&mut self.pools)
283            .into_iter()
284            .map(|pool| spawn_monitored_task!(pool))
285            .collect::<Vec<JoinHandle<()>>>();
286
287        let mut worker_pools_shutdown_signals = vec![];
288        let mut checkpoint_limit_reached = None;
289
290        loop {
291            // the min watermark represents the lowest watermark that
292            // has been processed by any worker pool. This guarantees that
293            // all worker pools have processed the checkpoint before the
294            // shutdown process starts.
295            if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
296                self.progress_store
297                    .min_watermark()
298                    .map(|watermark| watermark > ch_seq_num)
299                    .unwrap_or_default()
300            }) {
301                info!(
302                    "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
303                );
304                self.token.cancel();
305            }
306
307            tokio::select! {
308                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
309                    match worker_pool_progress_msg {
310                        WorkerPoolStatus::Running((task_name, watermark)) => {
311                            self.progress_store.save(task_name.clone(), watermark).await
312                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
313                            let seq_number = self.progress_store.min_watermark()?;
314                            if seq_number > reader_checkpoint_number {
315                                checkpoint_reader.send_gc_signal(seq_number).await?;
316                                reader_checkpoint_number = seq_number;
317                            }
318                            self.metrics.data_ingestion_checkpoint
319                                .with_label_values(&[&task_name])
320                                .set(watermark as i64);
321                        }
322                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
323                            worker_pools_shutdown_signals.push(worker_pool_name);
324                        }
325                    }
326                }
327                Some(checkpoint) = checkpoint_reader.checkpoint(), if !self.token.is_cancelled() => {
328                    // once upper limit reached skip sending new checkpoints to workers.
329                    if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
330                        continue;
331                    }
332
333                    for sender in &self.pool_senders {
334                        sender.send(checkpoint.clone()).await.map_err(|_| {
335                            IngestionError::Channel(
336                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
337                            )
338                        })?;
339                    }
340                }
341            }
342
343            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
344                // Shutdown worker pools
345                for worker_pool in worker_pools {
346                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
347                        component: "worker pool".into(),
348                        msg: err.to_string(),
349                    })?;
350                }
351                // Shutdown checkpoint reader
352                checkpoint_reader.shutdown().await?;
353                break;
354            }
355        }
356
357        Ok(self.progress_store.stats())
358    }
359
360    /// Check if the current ingestion limit has been reached.
361    ///
362    /// Returns `true` if the ingestion limit has been reached.
363    /// If no ingestion limit is present or it has not been reached yet, the
364    /// function returns `false`.
365    fn should_shutdown(
366        &mut self,
367        checkpoint: &CheckpointData,
368        checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
369    ) -> bool {
370        if checkpoint_limit_reached.is_some() {
371            return true;
372        }
373
374        let Some(shutdown_action) = self
375            .shutdown_callback
376            .as_ref()
377            .map(|matches| matches(checkpoint))
378        else {
379            return false;
380        };
381
382        match shutdown_action {
383            ShutdownAction::IncludeAndShutdown => {
384                checkpoint_limit_reached
385                    .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
386                false
387            }
388            ShutdownAction::ExcludeAndShutdown => {
389                checkpoint_limit_reached.get_or_insert(
390                    checkpoint
391                        .checkpoint_summary
392                        .sequence_number
393                        .saturating_sub(1),
394                );
395                true
396            }
397            ShutdownAction::Continue => false,
398        }
399    }
400}
401
402/// Sets up a single workflow for data ingestion.
403///
404/// This function initializes an [`IndexerExecutor`] with a single worker pool,
405/// using a [`ShimProgressStore`] initialized with the provided
406/// `initial_checkpoint_number`. It then returns a future that runs the executor
407/// and a [`CancellationToken`] for graceful shutdown.
408///
409/// # Docs
410/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
411///
412/// # Example
413/// ```rust,no_run
414/// use std::sync::Arc;
415///
416/// use async_trait::async_trait;
417/// use iota_data_ingestion_core::{
418///     IngestionError, Worker, reader::v2::RemoteUrl, setup_single_workflow,
419/// };
420/// use iota_types::full_checkpoint_content::CheckpointData;
421///
422/// struct CustomWorker;
423///
424/// #[async_trait]
425/// impl Worker for CustomWorker {
426///     type Message = ();
427///     type Error = IngestionError;
428///
429///     async fn process_checkpoint(
430///         &self,
431///         checkpoint: Arc<CheckpointData>,
432///     ) -> Result<Self::Message, Self::Error> {
433///         // custom processing logic.
434///         println!(
435///             "Processing checkpoint: {}",
436///             checkpoint.checkpoint_summary.to_string()
437///         );
438///         Ok(())
439///     }
440/// }
441///
442/// #[tokio::main]
443/// async fn main() {
444///     let (executor, _) = setup_single_workflow(
445///         CustomWorker,
446///         RemoteUrl::Fullnode("http://127.0.0.1:50051".into()), // fullnode gRPC API.
447///         0,                                                    // initial checkpoint number.
448///         5,                                                    // concurrency.
449///         None,                                                 // extra reader options.
450///     )
451///     .await
452///     .unwrap();
453///     executor.await.unwrap();
454/// }
455/// ```
456pub async fn setup_single_workflow<W: Worker + 'static>(
457    worker: W,
458    remote_store_url: RemoteUrl,
459    initial_checkpoint_number: CheckpointSequenceNumber,
460    concurrency: usize,
461    reader_options: Option<ReaderOptions>,
462) -> IngestionResult<(
463    impl Future<Output = IngestionResult<ExecutorProgress>>,
464    CancellationToken,
465)> {
466    let metrics = DataIngestionMetrics::new(&Registry::new());
467    let progress_store = ShimProgressStore(initial_checkpoint_number);
468    let token = CancellationToken::new();
469    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
470    let worker_pool = WorkerPool::new(
471        worker,
472        "workflow".to_string(),
473        concurrency,
474        Default::default(),
475    );
476    executor.register(worker_pool).await?;
477    Ok((
478        executor.run_with_config(CheckpointReaderConfig {
479            reader_options: reader_options.unwrap_or_default(),
480            ingestion_path: None,
481            remote_store_url: Some(remote_store_url),
482        }),
483        token,
484    ))
485}