iota_data_ingestion_core/
executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::PathBuf, pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_rest_api::CheckpointData;
10use iota_types::messages_checkpoint::CheckpointSequenceNumber;
11use prometheus::Registry;
12use tokio::{
13    sync::{mpsc, oneshot},
14    task::JoinHandle,
15};
16use tokio_util::sync::CancellationToken;
17
18use crate::{
19    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
20    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
21    reader::CheckpointReader,
22    worker_pool::{WorkerPool, WorkerPoolStatus},
23};
24
/// Capacity of the checkpoint channel created for each registered worker
/// pool; the worker-pool status channel is sized as a multiple of it.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10_000;
26
27/// The Executor of the main ingestion pipeline process.
28///
29/// This struct orchestrates the execution of multiple worker pools, handling
30/// checkpoint distribution, progress tracking, and shutdown. It utilizes
31/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
32/// for monitoring the indexing process.
33///
34/// # Example
35/// ```rust,no_run
36/// use async_trait::async_trait;
37/// use iota_data_ingestion_core::{
38///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
39///     Worker, WorkerPool,
40/// };
41/// use iota_types::full_checkpoint_content::CheckpointData;
42/// use prometheus::Registry;
43/// use tokio_util::sync::CancellationToken;
44/// use std::{path::PathBuf, sync::Arc};
45///
46/// struct CustomWorker;
47///
48/// #[async_trait]
49/// impl Worker for CustomWorker {
50///     type Message = ();
51///     type Error = IngestionError;
52///
53///     async fn process_checkpoint(
54///         &self,
55///         checkpoint: Arc<CheckpointData>,
56///     ) -> Result<Self::Message, Self::Error> {
57///         // custom processing logic.
58///         println!(
59///             "Processing Local checkpoint: {}",
60///             checkpoint.checkpoint_summary.to_string()
61///         );
62///         Ok(())
63///     }
64/// }
65///
66/// #[tokio::main]
67/// async fn main() {
68///     let concurrency = 5;
69///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
70///     let mut executor = IndexerExecutor::new(
71///         progress_store,
72///         1, // number of registered WorkerPools.
73///         DataIngestionMetrics::new(&Registry::new()),
74///         CancellationToken::new(),
75///     );
76///     // register a worker pool with 5 workers to process checkpoints in parallel
77///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
78///     // register the worker pool to the executor.
79///     executor.register(worker_pool).await.unwrap();
80///     // run the ingestion pipeline.
81///     executor
82///         .run(
83///             PathBuf::from("./chk".to_string()), // path to a local directory where checkpoints are stored.
84///             None,
85///             vec![],                   // optional remote store access options.
86///             ReaderOptions::default(), // remote_read_batch_size.
87///         )
88///         .await
89///         .unwrap();
90/// }
91/// ```
92pub struct IndexerExecutor<P> {
93    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
94    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
95    progress_store: ProgressStoreWrapper<P>,
96    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
97    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
98    metrics: DataIngestionMetrics,
99    token: CancellationToken,
100}
101
102impl<P: ProgressStore> IndexerExecutor<P> {
103    pub fn new(
104        progress_store: P,
105        number_of_jobs: usize,
106        metrics: DataIngestionMetrics,
107        token: CancellationToken,
108    ) -> Self {
109        let (pool_status_sender, pool_status_receiver) =
110            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
111        Self {
112            pools: vec![],
113            pool_senders: vec![],
114            progress_store: ProgressStoreWrapper::new(progress_store),
115            pool_status_sender,
116            pool_status_receiver,
117            metrics,
118            token,
119        }
120    }
121
122    /// Registers new worker pool in executor.
123    pub async fn register<W: Worker + 'static>(
124        &mut self,
125        pool: WorkerPool<W>,
126    ) -> IngestionResult<()> {
127        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
128        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
129        self.pools.push(Box::pin(pool.run(
130            checkpoint_number,
131            receiver,
132            self.pool_status_sender.clone(),
133            self.token.child_token(),
134        )));
135        self.pool_senders.push(sender);
136        Ok(())
137    }
138
139    pub async fn update_watermark(
140        &mut self,
141        task_name: String,
142        watermark: CheckpointSequenceNumber,
143    ) -> IngestionResult<()> {
144        self.progress_store.save(task_name, watermark).await
145    }
146    pub async fn read_watermark(
147        &mut self,
148        task_name: String,
149    ) -> IngestionResult<CheckpointSequenceNumber> {
150        self.progress_store.load(task_name).await
151    }
152
153    /// Main executor loop.
154    ///
155    /// # Error
156    ///
157    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
158    /// registered.
159    pub async fn run(
160        mut self,
161        path: PathBuf,
162        remote_store_url: Option<String>,
163        remote_store_options: Vec<(String, String)>,
164        reader_options: ReaderOptions,
165    ) -> IngestionResult<ExecutorProgress> {
166        let mut reader_checkpoint_number = self.progress_store.min_watermark()?;
167        let (checkpoint_reader, mut checkpoint_recv, gc_sender, exit_sender) =
168            CheckpointReader::initialize(
169                path,
170                reader_checkpoint_number,
171                remote_store_url,
172                remote_store_options,
173                reader_options,
174            );
175
176        let checkpoint_reader_handle = spawn_monitored_task!(checkpoint_reader.run());
177
178        let worker_pools = std::mem::take(&mut self.pools)
179            .into_iter()
180            .map(|pool| spawn_monitored_task!(pool))
181            .collect::<Vec<JoinHandle<()>>>();
182
183        let mut worker_pools_shutdown_signals = vec![];
184
185        loop {
186            tokio::select! {
187                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
188                    match worker_pool_progress_msg {
189                        WorkerPoolStatus::Running((task_name, watermark)) => {
190                            self.progress_store.save(task_name.clone(), watermark).await.map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
191                            let seq_number = self.progress_store.min_watermark()?;
192                            if seq_number > reader_checkpoint_number {
193                                gc_sender.send(seq_number).await.map_err(|_| {
194                                    IngestionError::Channel(
195                                        "unable to send GC operation to checkpoint reader, receiver half closed"
196                                            .to_owned(),
197                                    )
198                                })?;
199                                reader_checkpoint_number = seq_number;
200                            }
201                            self.metrics.data_ingestion_checkpoint.with_label_values(&[&task_name]).set(watermark as i64);
202                        }
203                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
204                            // Track worker pools that have initiated shutdown.
205                            worker_pools_shutdown_signals.push(worker_pool_name);
206                        }
207                    }
208                }
209                // Only process new checkpoints while system is running (token not cancelled).
210                // The guard prevents accepting new work during shutdown while allowing existing work to complete for other branches.
211                Some(checkpoint) = checkpoint_recv.recv(), if !self.token.is_cancelled() => {
212                    for sender in &self.pool_senders {
213                        sender.send(checkpoint.clone()).await.map_err(|_| {
214                            IngestionError::Channel(
215                                "unable to send new checkpoint to worker pool, receiver half closed"
216                                    .to_owned(),
217                            )
218                        })?;
219                    }
220                }
221            }
222
223            // Once all workers pools have signaled completion, start the graceful shutdown
224            // process.
225            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
226                break components_graceful_shutdown(
227                    worker_pools,
228                    exit_sender,
229                    checkpoint_reader_handle,
230                )
231                .await?;
232            }
233        }
234
235        Ok(self.progress_store.stats())
236    }
237}
238
239/// Start the graceful shutdown of remaining components.
240///
241/// - Awaits all worker pool handles.
242/// - Send shutdown signal to checkpoint reader actor.
243/// - Await checkpoint reader handle.
244async fn components_graceful_shutdown(
245    worker_pools: Vec<JoinHandle<()>>,
246    exit_sender: oneshot::Sender<()>,
247    checkpoint_reader_handle: JoinHandle<IngestionResult<()>>,
248) -> IngestionResult<()> {
249    for worker_pool in worker_pools {
250        worker_pool.await.map_err(|err| IngestionError::Shutdown {
251            component: "Worker Pool".into(),
252            msg: err.to_string(),
253        })?;
254    }
255    _ = exit_sender.send(());
256    checkpoint_reader_handle
257        .await
258        .map_err(|err| IngestionError::Shutdown {
259            component: "Checkpoint Reader".into(),
260            msg: err.to_string(),
261        })??;
262    Ok(())
263}
264
265/// Sets up a single workflow for data ingestion.
266///
267/// This function initializes an [`IndexerExecutor`] with a single worker pool,
268/// using a [`ShimProgressStore`] initialized with the provided
269/// `initial_checkpoint_number`. It then returns a future that runs the executor
270/// and a [`CancellationToken`] for graceful shutdown.
271///
272/// # Docs
273/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
274///
275/// # Example
276/// ```rust,no_run
277/// use std::sync::Arc;
278///
279/// use async_trait::async_trait;
280/// use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
281/// use iota_types::full_checkpoint_content::CheckpointData;
282///
283/// struct CustomWorker;
284///
285/// #[async_trait]
286/// impl Worker for CustomWorker {
287///     type Message = ();
288///     type Error = IngestionError;
289///
290///     async fn process_checkpoint(
291///         &self,
292///         checkpoint: Arc<CheckpointData>,
293///     ) -> Result<Self::Message, Self::Error> {
294///         // custom processing logic.
295///         println!(
296///             "Processing checkpoint: {}",
297///             checkpoint.checkpoint_summary.to_string()
298///         );
299///         Ok(())
300///     }
301/// }
302///
303/// #[tokio::main]
304/// async fn main() {
305///     let (executor, _) = setup_single_workflow(
306///         CustomWorker,
307///         "http://127.0.0.1:9000/api/v1".to_string(), // fullnode REST API
308///         0,                                          // initial checkpoint number.
309///         5,                                          // concurrency.
310///         None,                                       // extra reader options.
311///     )
312///     .await
313///     .unwrap();
314///     executor.await.unwrap();
315/// }
316/// ```
317pub async fn setup_single_workflow<W: Worker + 'static>(
318    worker: W,
319    remote_store_url: String,
320    initial_checkpoint_number: CheckpointSequenceNumber,
321    concurrency: usize,
322    reader_options: Option<ReaderOptions>,
323) -> IngestionResult<(
324    impl Future<Output = IngestionResult<ExecutorProgress>>,
325    CancellationToken,
326)> {
327    let metrics = DataIngestionMetrics::new(&Registry::new());
328    let progress_store = ShimProgressStore(initial_checkpoint_number);
329    let token = CancellationToken::new();
330    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
331    let worker_pool = WorkerPool::new(
332        worker,
333        "workflow".to_string(),
334        concurrency,
335        Default::default(),
336    );
337    executor.register(worker_pool).await?;
338    Ok((
339        executor.run(
340            tempfile::tempdir()?.into_path(),
341            Some(remote_store_url),
342            vec![],
343            reader_options.unwrap_or_default(),
344        ),
345        token,
346    ))
347}