iota_data_ingestion_core/
executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::PathBuf, pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_types::{
10    committee::EpochId, full_checkpoint_content::CheckpointData,
11    messages_checkpoint::CheckpointSequenceNumber,
12};
13use prometheus::Registry;
14use tokio::{
15    sync::{mpsc, oneshot},
16    task::JoinHandle,
17};
18use tokio_util::sync::CancellationToken;
19use tracing::info;
20
21use crate::{
22    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
23    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
24    reader::{
25        v1::CheckpointReader as CheckpointReaderV1,
26        v2::{CheckpointReader as CheckpointReaderV2, CheckpointReaderConfig},
27    },
28    worker_pool::{WorkerPool, WorkerPoolStatus},
29};
30
/// Capacity (in checkpoints) of the internal channels between the checkpoint
/// reader, the executor and each worker pool.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;
32
/// Callback function invoked for each incoming checkpoint to determine the
/// shutdown action if it exceeds the ingestion limit.
///
/// Registered via [`IndexerExecutor::shutdown_when`] (or indirectly via
/// [`IndexerExecutor::with_ingestion_limit`]).
type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;
36
/// Determines the shutdown action when a checkpoint reaches the ingestion
/// limit.
///
/// Returned by the [`ShutdownCallback`] for every incoming checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ShutdownAction {
    /// Include the current checkpoint in the ingestion process, then initiate
    /// the graceful shutdown process.
    IncludeAndShutdown,
    /// Exclude the current checkpoint from ingestion and immediately initiate
    /// the graceful shutdown process.
    ExcludeAndShutdown,
    /// Continue processing the current checkpoint without shutting down.
    Continue,
}
50
/// Common policies for upper limit checkpoint ingestion by the framework.
///
/// Once the limit is reached, the framework will start the graceful
/// shutdown process.
///
/// Note: the limit check is strict — shutdown is triggered by the first
/// checkpoint whose sequence number (or epoch) is strictly greater than the
/// configured maximum; that out-of-range checkpoint itself is excluded.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IngestionLimit {
    /// Last checkpoint sequence number to process.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    MaxCheckpoint(CheckpointSequenceNumber),
    /// Last checkpoint to process based on the given epoch.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    EndOfEpoch(EpochId),
}
69
70impl IngestionLimit {
71    /// Evaluates whether the given checkpoint triggers a shutdown action based
72    /// on the ingestion limit.
73    fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
74        match self {
75            IngestionLimit::MaxCheckpoint(max) => {
76                if &checkpoint.checkpoint_summary.sequence_number > max {
77                    return ShutdownAction::ExcludeAndShutdown;
78                }
79                ShutdownAction::Continue
80            }
81            IngestionLimit::EndOfEpoch(max) => {
82                if &checkpoint.checkpoint_summary.epoch > max {
83                    return ShutdownAction::ExcludeAndShutdown;
84                }
85                ShutdownAction::Continue
86            }
87        }
88    }
89}
90
/// Represents a common interface for checkpoint readers.
///
/// It manages the old checkpoint reader implementation for backwards
/// compatibility and the new one.
enum CheckpointReader {
    /// The old checkpoint reader implementation.
    V1 {
        // Stream of checkpoints produced by the spawned reader task.
        checkpoint_recv: mpsc::Receiver<Arc<CheckpointData>>,
        // Notifies the reader task up to which sequence number checkpoints
        // may be garbage-collected.
        gc_sender: mpsc::Sender<CheckpointSequenceNumber>,
        // One-shot signal asking the reader task to exit.
        exit_sender: oneshot::Sender<()>,
        // Join handle of the spawned reader task.
        handle: JoinHandle<IngestionResult<()>>,
    },
    /// The new checkpoint reader implementation.
    V2(CheckpointReaderV2),
}
106
107impl CheckpointReader {
108    /// Gets the next checkpoint from the reader.
109    async fn get_checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
110        match self {
111            Self::V1 {
112                checkpoint_recv, ..
113            } => checkpoint_recv.recv().await,
114            Self::V2(reader) => reader.checkpoint().await,
115        }
116    }
117
118    /// Sends a GC signal to the reader.
119    async fn send_gc_signal(
120        &mut self,
121        seq_number: CheckpointSequenceNumber,
122    ) -> IngestionResult<()> {
123        match self {
124            Self::V1 { gc_sender, .. } => gc_sender.send(seq_number).await.map_err(|_| {
125                IngestionError::Channel(
126                    "unable to send GC operation to checkpoint reader, receiver half closed".into(),
127                )
128            }),
129            Self::V2(reader) => reader.send_gc_signal(seq_number).await,
130        }
131    }
132
133    /// Shuts down the reader.
134    async fn shutdown(self) -> IngestionResult<()> {
135        match self {
136            Self::V1 {
137                exit_sender,
138                handle,
139                ..
140            } => {
141                _ = exit_sender.send(());
142                handle.await.map_err(|err| IngestionError::Shutdown {
143                    component: "checkpoint reader".into(),
144                    msg: err.to_string(),
145                })?
146            }
147            Self::V2(reader) => reader.shutdown().await,
148        }
149    }
150}
151
/// The Executor of the main ingestion pipeline process.
///
/// This struct orchestrates the execution of multiple worker pools, handling
/// checkpoint distribution, progress tracking, and shutdown. It utilizes
/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
/// for monitoring the indexing process.
///
/// # Example
/// ```rust,no_run
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{
///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
///     Worker, WorkerPool,
/// };
/// use iota_types::full_checkpoint_content::CheckpointData;
/// use prometheus::Registry;
/// use tokio_util::sync::CancellationToken;
/// use std::{path::PathBuf, sync::Arc};
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // custom processing logic.
///         println!(
///             "Processing Local checkpoint: {}",
///             checkpoint.checkpoint_summary.to_string()
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let concurrency = 5;
///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
///     let mut executor = IndexerExecutor::new(
///         progress_store,
///         1, // number of registered WorkerPools.
///         DataIngestionMetrics::new(&Registry::new()),
///         CancellationToken::new(),
///     );
///     // register a worker pool with 5 workers to process checkpoints in parallel
///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
///     // register the worker pool to the executor.
///     executor.register(worker_pool).await.unwrap();
///     // run the ingestion pipeline.
///     executor
///         .run(
///             PathBuf::from("./chk".to_string()), // path to a local directory where checkpoints are stored.
///             None,                     // optional remote store url.
///             vec![],                   // optional remote store access options.
///             ReaderOptions::default(), // reader options (e.g. batch size, timeout).
///         )
///         .await
///         .unwrap();
/// }
/// ```
pub struct IndexerExecutor<P> {
    // Worker pool futures collected by `register`; drained and spawned by the
    // executor loop.
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    // One checkpoint sender per registered pool; every checkpoint is
    // broadcast to all of them.
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    // Persists per-task watermarks and tracks the global minimum.
    progress_store: ProgressStoreWrapper<P>,
    // Cloned into each worker pool so it can report progress and shutdown.
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    // Receives progress and shutdown messages from worker pools.
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    // Metrics describing the ingestion progress.
    metrics: DataIngestionMetrics,
    // Cancellation token used to initiate the graceful shutdown process.
    token: CancellationToken,
    // Optional predicate deciding when ingestion should stop.
    shutdown_callback: Option<ShutdownCallback>,
}
227
228impl<P: ProgressStore> IndexerExecutor<P> {
229    pub fn new(
230        progress_store: P,
231        number_of_jobs: usize,
232        metrics: DataIngestionMetrics,
233        token: CancellationToken,
234    ) -> Self {
235        let (pool_status_sender, pool_status_receiver) =
236            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
237        Self {
238            pools: vec![],
239            pool_senders: vec![],
240            progress_store: ProgressStoreWrapper::new(progress_store),
241            pool_status_sender,
242            pool_status_receiver,
243            metrics,
244            token,
245            shutdown_callback: None,
246        }
247    }
248
249    /// Registers new worker pool in executor.
250    pub async fn register<W: Worker + 'static>(
251        &mut self,
252        pool: WorkerPool<W>,
253    ) -> IngestionResult<()> {
254        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
255        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
256        self.pools.push(Box::pin(pool.run(
257            checkpoint_number,
258            receiver,
259            self.pool_status_sender.clone(),
260            self.token.child_token(),
261        )));
262        self.pool_senders.push(sender);
263        Ok(())
264    }
265
    /// Registers a predicate callback that determines when the ingestion
    /// process should stop.
    ///
    /// This function `f` will be called for every **incoming checkpoint**
    /// before it’s sent to the worker pool.
    ///
    /// Based on the returned [`ShutdownAction`] the executor will evaluate
    /// whether to continue or stop the ingestion process by initiating the
    /// graceful shutdown process.
    ///
    /// Once a shutdown action is triggered, the executor will stop sending new
    /// checkpoints and will wait for all previously sent checkpoints to be
    /// processed by workers before initiating graceful shutdown process.
    ///
    /// Note:
    ///
    /// Only one callback can be active at a time. Calling this method after
    /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
    /// earlier predicate, and vice versa. They are not cumulative.
    pub fn shutdown_when<F>(&mut self, f: F)
    where
        F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
    {
        self.shutdown_callback = Some(Box::new(f));
    }
291
    /// Adds an upper‑limit policy that determines when the ingestion
    /// process should stop.
    ///
    /// This is a convenience method, it internally uses
    /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
    /// derived from the provided [`IngestionLimit`].
    ///
    /// Note:
    ///
    /// Calling this method after [`shutdown_when`](Self::shutdown_when)
    /// replaces the earlier predicate, and vice versa. They are not cumulative.
    pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
        // `IngestionLimit` is `Copy`, so the closure captures it by value.
        self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
    }
306
    /// Persists the given `watermark` for `task_name` in the progress store.
    pub async fn update_watermark(
        &mut self,
        task_name: String,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.progress_store.save(task_name, watermark).await
    }
    /// Loads the last persisted watermark for `task_name` from the progress
    /// store.
    pub async fn read_watermark(
        &mut self,
        task_name: String,
    ) -> IngestionResult<CheckpointSequenceNumber> {
        self.progress_store.load(task_name).await
    }
320
321    /// Main executor loop.
322    ///
323    /// # Error
324    ///
325    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
326    /// registered.
327    pub async fn run(
328        mut self,
329        path: PathBuf,
330        remote_store_url: Option<String>,
331        remote_store_options: Vec<(String, String)>,
332        reader_options: ReaderOptions,
333    ) -> IngestionResult<ExecutorProgress> {
334        let reader_checkpoint_number = self.progress_store.min_watermark()?;
335        let (checkpoint_reader, checkpoint_recv, gc_sender, exit_sender) =
336            CheckpointReaderV1::initialize(
337                path,
338                reader_checkpoint_number,
339                remote_store_url,
340                remote_store_options,
341                reader_options,
342            );
343
344        let handle = spawn_monitored_task!(checkpoint_reader.run());
345
346        self.run_executor_loop(
347            reader_checkpoint_number,
348            CheckpointReader::V1 {
349                checkpoint_recv,
350                gc_sender,
351                exit_sender,
352                handle,
353            },
354        )
355        .await
356    }
357
358    /// Alternative main executor loop. Uses the new iteration of the
359    /// `CheckpointReader` supporting syncing checkpoints from hybrid historical
360    /// store.
361    ///
362    /// # Error
363    ///
364    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
365    /// registered.
366    pub async fn run_with_config(
367        mut self,
368        config: CheckpointReaderConfig,
369    ) -> IngestionResult<ExecutorProgress> {
370        let reader_checkpoint_number = self.progress_store.min_watermark()?;
371        let checkpoint_reader = CheckpointReaderV2::new(reader_checkpoint_number, config).await?;
372
373        self.run_executor_loop(
374            reader_checkpoint_number,
375            CheckpointReader::V2(checkpoint_reader),
376        )
377        .await
378    }
379
    /// Common execution logic shared by [`run`](Self::run) and
    /// [`run_with_config`](Self::run_with_config).
    ///
    /// Spawns every registered worker pool, then drives a select loop that:
    /// - persists watermarks reported by worker pools and forwards GC signals
    ///   to the checkpoint reader,
    /// - forwards newly read checkpoints to all worker pools (unless an
    ///   ingestion limit was hit or cancellation was requested),
    /// - exits once every worker pool has acknowledged shutdown.
    async fn run_executor_loop(
        &mut self,
        mut reader_checkpoint_number: u64,
        mut checkpoint_reader: CheckpointReader,
    ) -> IngestionResult<ExecutorProgress> {
        // Spawn each registered pool as its own monitored task.
        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        let mut worker_pools_shutdown_signals = vec![];
        // Set to the last checkpoint sequence number to process once the
        // shutdown callback fires; `None` while ingestion is unbounded.
        let mut checkpoint_limit_reached = None;

        loop {
            // the min watermark represents the lowest watermark that
            // has been processed by any worker pool. This guarantees that
            // all worker pools have processed the checkpoint before the
            // shutdown process starts.
            if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
                self.progress_store
                    .min_watermark()
                    .map(|watermark| watermark > ch_seq_num)
                    .unwrap_or_default()
            }) {
                info!(
                    "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
                );
                self.token.cancel();
            }

            tokio::select! {
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            // Persist the per-task watermark before advancing GC.
                            self.progress_store.save(task_name.clone(), watermark).await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            let seq_number = self.progress_store.min_watermark()?;
                            // Only signal GC when the global minimum advanced, so
                            // the reader never discards unprocessed checkpoints.
                            if seq_number > reader_checkpoint_number {
                                checkpoint_reader.send_gc_signal(seq_number).await?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics.data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            // Track pools that completed their graceful shutdown.
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                // Stop pulling new checkpoints once cancellation was requested.
                Some(checkpoint) = checkpoint_reader.get_checkpoint(), if !self.token.is_cancelled() => {
                    // once upper limit reached skip sending new checkpoints to workers.
                    if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
                        continue;
                    }

                    // Broadcast the checkpoint to every registered worker pool.
                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
                            )
                        })?;
                    }
                }
            }

            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                // Shutdown worker pools
                for worker_pool in worker_pools {
                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
                        component: "worker pool".into(),
                        msg: err.to_string(),
                    })?;
                }
                // Shutdown checkpoint reader
                checkpoint_reader.shutdown().await?;
                break;
            }
        }

        Ok(self.progress_store.stats())
    }
463
464    /// Check if the current ingestion limit has been reached.
465    ///
466    /// Returns `true` if the ingestion limit has been reached.
467    /// If no ingestion limit is present or it has not been reached yet, the
468    /// function returns `false`.
469    fn should_shutdown(
470        &mut self,
471        checkpoint: &CheckpointData,
472        checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
473    ) -> bool {
474        if checkpoint_limit_reached.is_some() {
475            return true;
476        }
477
478        let Some(shutdown_action) = self
479            .shutdown_callback
480            .as_ref()
481            .map(|matches| matches(checkpoint))
482        else {
483            return false;
484        };
485
486        match shutdown_action {
487            ShutdownAction::IncludeAndShutdown => {
488                checkpoint_limit_reached
489                    .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
490                false
491            }
492            ShutdownAction::ExcludeAndShutdown => {
493                checkpoint_limit_reached.get_or_insert(
494                    checkpoint
495                        .checkpoint_summary
496                        .sequence_number
497                        .saturating_sub(1),
498                );
499                true
500            }
501            ShutdownAction::Continue => false,
502        }
503    }
504}
505
/// Sets up a single workflow for data ingestion.
///
/// This function initializes an [`IndexerExecutor`] with a single worker pool,
/// using a [`ShimProgressStore`] initialized with the provided
/// `initial_checkpoint_number`. It then returns a future that runs the executor
/// and a [`CancellationToken`] for graceful shutdown.
///
/// # Docs
/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
///
/// # Example
/// ```rust,no_run
/// use std::sync::Arc;
///
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
/// use iota_types::full_checkpoint_content::CheckpointData;
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // custom processing logic.
///         println!(
///             "Processing checkpoint: {}",
///             checkpoint.checkpoint_summary.to_string()
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (executor, _) = setup_single_workflow(
///         CustomWorker,
///         "http://127.0.0.1:50051".to_string(), // fullnode gRPC API
///         0,                                    // initial checkpoint number.
///         5,                                    // concurrency.
///         None,                                 // extra reader options.
///     )
///     .await
///     .unwrap();
///     executor.await.unwrap();
/// }
/// ```
pub async fn setup_single_workflow<W: Worker + 'static>(
    worker: W,
    remote_store_url: String,
    initial_checkpoint_number: CheckpointSequenceNumber,
    concurrency: usize,
    reader_options: Option<ReaderOptions>,
) -> IngestionResult<(
    impl Future<Output = IngestionResult<ExecutorProgress>>,
    CancellationToken,
)> {
    let metrics = DataIngestionMetrics::new(&Registry::new());
    // In-memory progress store: watermarks are not persisted across runs.
    let progress_store = ShimProgressStore(initial_checkpoint_number);
    let token = CancellationToken::new();
    // A single worker pool is registered, hence `number_of_jobs = 1`.
    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
    let worker_pool = WorkerPool::new(
        worker,
        "workflow".to_string(),
        concurrency,
        Default::default(),
    );
    executor.register(worker_pool).await?;
    Ok((
        executor.run(
            // NOTE(review): `keep()` persists the temp dir so it outlives this
            // function while the returned future runs; it is apparently never
            // cleaned up afterwards — confirm this leak is acceptable.
            tempfile::tempdir()?.keep(),
            Some(remote_store_url),
            vec![],
            reader_options.unwrap_or_default(),
        ),
        token,
    ))
}