iota_data_ingestion_core/
executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_types::{
10    committee::EpochId, full_checkpoint_content::CheckpointData,
11    messages_checkpoint::CheckpointSequenceNumber,
12};
13use prometheus::Registry;
14use tokio::{sync::mpsc, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17
18use crate::{
19    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
20    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
21    reader::v2::{CheckpointReader, CheckpointReaderConfig, RemoteUrl},
22    worker_pool::{WorkerPool, WorkerPoolStatus},
23};
24
/// Maximum number of checkpoints that may be buffered in each channel between
/// the reader, the executor, and the worker pools (used as the mpsc capacity).
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;

/// Callback function invoked for each incoming checkpoint to determine the
/// shutdown action if it exceeds the ingestion limit.
type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;
30
/// Determines the shutdown action when a checkpoint reaches the ingestion
/// limit.
///
/// Returned by the [`ShutdownCallback`] registered via
/// [`IndexerExecutor::shutdown_when`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ShutdownAction {
    /// Include the current checkpoint in the ingestion process, then initiate
    /// the graceful shutdown process.
    IncludeAndShutdown,
    /// Exclude the current checkpoint from ingestion and immediately initiate
    /// the graceful shutdown process.
    ExcludeAndShutdown,
    /// Continue processing the current checkpoint without shutting down.
    Continue,
}
44
/// Common policies for upper limit checkpoint ingestion by the framework.
///
/// Once the limit is reached, the framework will start the graceful
/// shutdown process.
///
/// Register a limit via [`IndexerExecutor::with_ingestion_limit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IngestionLimit {
    /// Last checkpoint sequence number to process.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    MaxCheckpoint(CheckpointSequenceNumber),
    /// Last checkpoint to process based on the given epoch.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    EndOfEpoch(EpochId),
}
63
64impl IngestionLimit {
65    /// Evaluates whether the given checkpoint triggers a shutdown action based
66    /// on the ingestion limit.
67    fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
68        match self {
69            IngestionLimit::MaxCheckpoint(max) => {
70                if &checkpoint.checkpoint_summary.sequence_number > max {
71                    return ShutdownAction::ExcludeAndShutdown;
72                }
73                ShutdownAction::Continue
74            }
75            IngestionLimit::EndOfEpoch(max) => {
76                if &checkpoint.checkpoint_summary.epoch > max {
77                    return ShutdownAction::ExcludeAndShutdown;
78                }
79                ShutdownAction::Continue
80            }
81        }
82    }
83}
84
85/// The Executor of the main ingestion pipeline process.
86///
87/// This struct orchestrates the execution of multiple worker pools, handling
88/// checkpoint distribution, progress tracking, and shutdown. It utilizes
89/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
90/// for monitoring the indexing process.
91///
92/// # Example
93/// ```rust,no_run
94/// use async_trait::async_trait;
95/// use iota_data_ingestion_core::{
96///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
97///     Worker, WorkerPool,
98///     reader::v2::CheckpointReaderConfig
99/// };
100/// use iota_types::full_checkpoint_content::CheckpointData;
101/// use prometheus::Registry;
102/// use tokio_util::sync::CancellationToken;
103/// use std::{path::PathBuf, sync::Arc};
104///
105/// struct CustomWorker;
106///
107/// #[async_trait]
108/// impl Worker for CustomWorker {
109///     type Message = ();
110///     type Error = IngestionError;
111///
112///     async fn process_checkpoint(
113///         &self,
114///         checkpoint: Arc<CheckpointData>,
115///     ) -> Result<Self::Message, Self::Error> {
116///         // custom processing logic.
117///         println!(
118///             "Processing Local checkpoint: {}",
119///             checkpoint.checkpoint_summary.to_string()
120///         );
121///         Ok(())
122///     }
123/// }
124///
125/// #[tokio::main]
126/// async fn main() {
127///     let concurrency = 5;
128///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
129///     let mut executor = IndexerExecutor::new(
130///         progress_store,
131///         1, // number of registered WorkerPools.
132///         DataIngestionMetrics::new(&Registry::new()),
133///         CancellationToken::new(),
134///     );
135///     // register a worker pool with 5 workers to process checkpoints in parallel
136///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
137///     // register the worker pool to the executor.
138///     executor.register(worker_pool).await.unwrap();
139///     // run the ingestion pipeline.
140///     executor
141///         .run_with_config(
142///             CheckpointReaderConfig {
143///                 reader_options: ReaderOptions::default(),
144///                 ingestion_path: Some(PathBuf::from("./chk".to_string())), // path to a local directory where checkpoints are stored.
145///                 remote_store_url: None,
146///             }
147///         )
148///         .await
149///         .unwrap();
150/// }
151/// ```
pub struct IndexerExecutor<P> {
    /// Pending worker-pool futures; drained and spawned as tasks when the
    /// executor loop starts.
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// One checkpoint channel per registered worker pool.
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    /// Persists per-task checkpoint watermarks.
    progress_store: ProgressStoreWrapper<P>,
    /// Cloned into each worker pool so it can report progress and shutdown.
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    /// Receives progress/shutdown messages from the worker pools.
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    /// Metrics for monitoring the ingestion process.
    metrics: DataIngestionMetrics,
    /// Cancellation token driving the graceful shutdown process.
    token: CancellationToken,
    /// Optional predicate deciding when ingestion should stop.
    shutdown_callback: Option<ShutdownCallback>,
}
162
163impl<P: ProgressStore> IndexerExecutor<P> {
164    pub fn new(
165        progress_store: P,
166        number_of_jobs: usize,
167        metrics: DataIngestionMetrics,
168        token: CancellationToken,
169    ) -> Self {
170        let (pool_status_sender, pool_status_receiver) =
171            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
172        Self {
173            pools: vec![],
174            pool_senders: vec![],
175            progress_store: ProgressStoreWrapper::new(progress_store),
176            pool_status_sender,
177            pool_status_receiver,
178            metrics,
179            token,
180            shutdown_callback: None,
181        }
182    }
183
184    /// Registers new worker pool in executor.
185    pub async fn register<W: Worker + 'static>(
186        &mut self,
187        pool: WorkerPool<W>,
188    ) -> IngestionResult<()> {
189        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
190        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
191        self.pools.push(Box::pin(pool.run(
192            checkpoint_number,
193            receiver,
194            self.pool_status_sender.clone(),
195            self.token.child_token(),
196        )));
197        self.pool_senders.push(sender);
198        Ok(())
199    }
200
201    /// Registers a predicate callback that determines when the ingestion
202    /// process should stop.
203    ///
204    /// This function `f` will be called for every **incoming checkpoint**
205    /// before it’s sent to the worker pool.
206    ///
207    /// Based on the returned [`ShutdownAction`] the executor will evaluate
208    /// whether to continue or stop the ingestion process by initiating the
209    /// graceful shutdown process.
210    ///
211    /// Once a shutdown action is triggered, the executor will stop sending new
212    /// checkpoints and will wait for all previously sent checkpoints to be
213    /// processed by workers before initiating graceful shutdown process.
214    ///
215    /// Note:
216    ///
217    /// Calling this method after
218    /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
219    /// earlier predicate, and vice versa. They are not cumulative.
220    pub fn shutdown_when<F>(&mut self, f: F)
221    where
222        F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
223    {
224        self.shutdown_callback = Some(Box::new(f));
225    }
226
227    /// Adds an upper‑limit policy that determines when the ingestion
228    /// process should stop.
229    ///
230    /// This is a convenience method, it internally uses
231    /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
232    /// derived from the provided [`IngestionLimit`].
233    ///
234    /// Note:
235    ///
236    /// Calling this method after [`shutdown_when`](Self::shutdown_when)
237    /// replaces the earlier predicate, and vice versa. They are not cumulative.
238    pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
239        self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
240    }
241
242    pub async fn update_watermark(
243        &mut self,
244        task_name: String,
245        watermark: CheckpointSequenceNumber,
246    ) -> IngestionResult<()> {
247        self.progress_store.save(task_name, watermark).await
248    }
249    pub async fn read_watermark(
250        &mut self,
251        task_name: String,
252    ) -> IngestionResult<CheckpointSequenceNumber> {
253        self.progress_store.load(task_name).await
254    }
255
256    /// Main executor loop.
257    ///
258    /// # Error
259    ///
260    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
261    /// registered.
262    pub async fn run_with_config(
263        mut self,
264        config: CheckpointReaderConfig,
265    ) -> IngestionResult<ExecutorProgress> {
266        let reader_checkpoint_number = self.progress_store.min_watermark()?;
267        let checkpoint_reader = CheckpointReader::new(reader_checkpoint_number, config).await?;
268
269        self.run_executor_loop(reader_checkpoint_number, checkpoint_reader)
270            .await
271    }
272
    /// Common execution logic.
    ///
    /// Spawns every registered worker pool as a monitored task, then drives
    /// the main event loop: fans incoming checkpoints out to all pools,
    /// persists per-pool watermarks, signals the reader to garbage-collect
    /// processed checkpoints, and coordinates the graceful-shutdown handshake.
    /// Returns the final progress stats once all pools have shut down.
    async fn run_executor_loop(
        &mut self,
        mut reader_checkpoint_number: u64,
        mut checkpoint_reader: CheckpointReader,
    ) -> IngestionResult<ExecutorProgress> {
        // Drain `self.pools` and spawn each pool future as its own task.
        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        let mut worker_pools_shutdown_signals = vec![];
        // Set by `should_shutdown` to the sequence number of the last
        // checkpoint that must be ingested before shutdown may begin.
        let mut checkpoint_limit_reached = None;

        loop {
            // the min watermark represents the lowest watermark that
            // has been processed by any worker pool. This guarantees that
            // all worker pools have processed the checkpoint before the
            // shutdown process starts.
            if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
                self.progress_store
                    .min_watermark()
                    .map(|watermark| watermark > ch_seq_num)
                    .unwrap_or_default()
            }) {
                info!(
                    "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
                );
                self.token.cancel();
            }

            tokio::select! {
                // Progress report or shutdown notice from a worker pool.
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            self.progress_store.save(task_name.clone(), watermark).await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            // If the slowest pool advanced, tell the reader it
                            // may discard checkpoints below the new minimum.
                            let seq_number = self.progress_store.min_watermark()?;
                            if seq_number > reader_checkpoint_number {
                                checkpoint_reader.send_gc_signal(seq_number).await?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics.data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                // New checkpoint from the reader; branch disabled once the
                // cancellation token fires so no new work is pulled.
                Some(checkpoint) = checkpoint_reader.checkpoint(), if !self.token.is_cancelled() => {
                    // once upper limit reached skip sending new checkpoints to workers.
                    if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
                        continue;
                    }

                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
                            )
                        })?;
                    }
                }
            }

            // Every pool has acknowledged shutdown: join their tasks, stop the
            // reader, and leave the loop.
            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                // Shutdown worker pools
                for worker_pool in worker_pools {
                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
                        component: "worker pool".into(),
                        msg: err.to_string(),
                    })?;
                }
                // Shutdown checkpoint reader
                checkpoint_reader.shutdown().await?;
                break;
            }
        }

        Ok(self.progress_store.stats())
    }
356
357    /// Check if the current ingestion limit has been reached.
358    ///
359    /// Returns `true` if the ingestion limit has been reached.
360    /// If no ingestion limit is present or it has not been reached yet, the
361    /// function returns `false`.
362    fn should_shutdown(
363        &mut self,
364        checkpoint: &CheckpointData,
365        checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
366    ) -> bool {
367        if checkpoint_limit_reached.is_some() {
368            return true;
369        }
370
371        let Some(shutdown_action) = self
372            .shutdown_callback
373            .as_ref()
374            .map(|matches| matches(checkpoint))
375        else {
376            return false;
377        };
378
379        match shutdown_action {
380            ShutdownAction::IncludeAndShutdown => {
381                checkpoint_limit_reached
382                    .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
383                false
384            }
385            ShutdownAction::ExcludeAndShutdown => {
386                checkpoint_limit_reached.get_or_insert(
387                    checkpoint
388                        .checkpoint_summary
389                        .sequence_number
390                        .saturating_sub(1),
391                );
392                true
393            }
394            ShutdownAction::Continue => false,
395        }
396    }
397}
398
399/// Sets up a single workflow for data ingestion.
400///
401/// This function initializes an [`IndexerExecutor`] with a single worker pool,
402/// using a [`ShimProgressStore`] initialized with the provided
403/// `initial_checkpoint_number`. It then returns a future that runs the executor
404/// and a [`CancellationToken`] for graceful shutdown.
405///
406/// # Docs
407/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
408///
409/// # Example
410/// ```rust,no_run
411/// use std::sync::Arc;
412///
413/// use async_trait::async_trait;
414/// use iota_data_ingestion_core::{
415///     IngestionError, Worker, reader::v2::RemoteUrl, setup_single_workflow,
416/// };
417/// use iota_types::full_checkpoint_content::CheckpointData;
418///
419/// struct CustomWorker;
420///
421/// #[async_trait]
422/// impl Worker for CustomWorker {
423///     type Message = ();
424///     type Error = IngestionError;
425///
426///     async fn process_checkpoint(
427///         &self,
428///         checkpoint: Arc<CheckpointData>,
429///     ) -> Result<Self::Message, Self::Error> {
430///         // custom processing logic.
431///         println!(
432///             "Processing checkpoint: {}",
433///             checkpoint.checkpoint_summary.to_string()
434///         );
435///         Ok(())
436///     }
437/// }
438///
439/// #[tokio::main]
440/// async fn main() {
441///     let (executor, _) = setup_single_workflow(
442///         CustomWorker,
443///         RemoteUrl::Fullnode("http://127.0.0.1:50051".into()), // fullnode gRPC API.
444///         0,                                                    // initial checkpoint number.
445///         5,                                                    // concurrency.
446///         None,                                                 // extra reader options.
447///     )
448///     .await
449///     .unwrap();
450///     executor.await.unwrap();
451/// }
452/// ```
453pub async fn setup_single_workflow<W: Worker + 'static>(
454    worker: W,
455    remote_store_url: RemoteUrl,
456    initial_checkpoint_number: CheckpointSequenceNumber,
457    concurrency: usize,
458    reader_options: Option<ReaderOptions>,
459) -> IngestionResult<(
460    impl Future<Output = IngestionResult<ExecutorProgress>>,
461    CancellationToken,
462)> {
463    let metrics = DataIngestionMetrics::new(&Registry::new());
464    let progress_store = ShimProgressStore(initial_checkpoint_number);
465    let token = CancellationToken::new();
466    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
467    let worker_pool = WorkerPool::new(
468        worker,
469        "workflow".to_string(),
470        concurrency,
471        Default::default(),
472    );
473    executor.register(worker_pool).await?;
474    Ok((
475        executor.run_with_config(CheckpointReaderConfig {
476            reader_options: reader_options.unwrap_or_default(),
477            ingestion_path: None,
478            remote_store_url: Some(remote_store_url),
479        }),
480        token,
481    ))
482}