// iota_data_ingestion_core/executor.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5use std::{path::PathBuf, pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_rest_api::CheckpointData;
10use iota_types::messages_checkpoint::CheckpointSequenceNumber;
11use prometheus::Registry;
12use tokio::{
13    sync::{mpsc, oneshot},
14    task::JoinHandle,
15};
16use tokio_util::sync::CancellationToken;
17
18use crate::{
19    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
20    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
21    reader::{
22        v1::CheckpointReader as CheckpointReaderV1,
23        v2::{CheckpointReader as CheckpointReaderV2, CheckpointReaderConfig},
24    },
25    worker_pool::{WorkerPool, WorkerPoolStatus},
26};
27
28pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;
29
/// Internal abstraction over the two checkpoint-reader generations, letting
/// the executor loop drive either one through a single interface.
enum CheckpointReader {
    /// First-generation reader: a spawned task communicated with via channels.
    V1 {
        // Receives checkpoints produced by the spawned reader task.
        checkpoint_recv: mpsc::Receiver<Arc<CheckpointData>>,
        // Notifies the reader task that checkpoints up to a sequence number
        // are fully processed and may be garbage-collected.
        gc_sender: mpsc::Sender<CheckpointSequenceNumber>,
        // One-shot signal asking the reader task to exit.
        exit_sender: oneshot::Sender<()>,
        // Join handle of the spawned reader task, awaited on shutdown.
        handle: JoinHandle<IngestionResult<()>>,
    },
    /// Second-generation reader exposing checkpoint/GC/shutdown methods
    /// directly.
    V2(CheckpointReaderV2),
}
39
impl CheckpointReader {
    /// Yields the next checkpoint, or `None` once the reader stops producing
    /// checkpoints (for V1, when the sending half of the channel is dropped).
    async fn get_checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        match self {
            Self::V1 {
                checkpoint_recv, ..
            } => checkpoint_recv.recv().await,
            Self::V2(reader) => reader.checkpoint().await,
        }
    }

    /// Informs the reader that all checkpoints up to and including
    /// `seq_number` have been processed, so locally staged data for them can
    /// be cleaned up.
    ///
    /// # Errors
    ///
    /// For V1, returns [`IngestionError::Channel`] if the reader task has
    /// already dropped the receiving half; V2 propagates the reader's own
    /// error.
    async fn send_gc_signal(
        &mut self,
        seq_number: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        match self {
            Self::V1 { gc_sender, .. } => gc_sender.send(seq_number).await.map_err(|_| {
                IngestionError::Channel(
                    "unable to send GC operation to checkpoint reader, receiver half closed".into(),
                )
            }),
            Self::V2(reader) => reader.send_gc_signal(seq_number).await,
        }
    }

    /// Signals the reader to stop and waits for its background work to
    /// finish, consuming `self`.
    async fn shutdown(self) -> IngestionResult<()> {
        match self {
            Self::V1 {
                exit_sender,
                handle,
                ..
            } => {
                // The send result is deliberately ignored: the reader task may
                // have already exited and dropped the receiving half.
                _ = exit_sender.send(());
                // `handle.await` yields Result<IngestionResult<()>, JoinError>:
                // map a join failure (panic/cancellation) to a Shutdown error,
                // then `?` unwraps to the task's own IngestionResult.
                handle.await.map_err(|err| IngestionError::Shutdown {
                    component: "Checkpoint Reader".into(),
                    msg: err.to_string(),
                })?
            }
            Self::V2(reader) => reader.shutdown().await,
        }
    }
}
81
/// The Executor of the main ingestion pipeline process.
///
/// This struct orchestrates the execution of multiple worker pools, handling
/// checkpoint distribution, progress tracking, and shutdown. It utilizes
/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
/// for monitoring the indexing process.
///
/// # Example
/// ```rust,no_run
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{
///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
///     Worker, WorkerPool,
/// };
/// use iota_types::full_checkpoint_content::CheckpointData;
/// use prometheus::Registry;
/// use tokio_util::sync::CancellationToken;
/// use std::{path::PathBuf, sync::Arc};
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // custom processing logic.
///         println!(
///             "Processing Local checkpoint: {}",
///             checkpoint.checkpoint_summary.to_string()
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let concurrency = 5;
///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
///     let mut executor = IndexerExecutor::new(
///         progress_store,
///         1, // number of registered WorkerPools.
///         DataIngestionMetrics::new(&Registry::new()),
///         CancellationToken::new(),
///     );
///     // register a worker pool with 5 workers to process checkpoints in parallel
///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
///     // register the worker pool to the executor.
///     executor.register(worker_pool).await.unwrap();
///     // run the ingestion pipeline.
///     executor
///         .run(
///             PathBuf::from("./chk".to_string()), // path to a local directory where checkpoints are stored.
///             None,
///             vec![],                   // optional remote store access options.
///             ReaderOptions::default(), // remote_read_batch_size.
///         )
///         .await
///         .unwrap();
/// }
/// ```
pub struct IndexerExecutor<P> {
    // Pending worker-pool futures; drained and spawned by the executor loop.
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    // One sender per registered pool, used to fan each checkpoint out to all
    // pools.
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    // Persists per-task watermarks and tracks the minimum across tasks.
    progress_store: ProgressStoreWrapper<P>,
    // Cloned into each worker pool so pools can report progress/shutdown.
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    // Receives progress and shutdown notifications from the worker pools.
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    // Per-task watermark gauges exposed for monitoring.
    metrics: DataIngestionMetrics,
    // Cancellation root; each pool gets a child token.
    token: CancellationToken,
}
156
impl<P: ProgressStore> IndexerExecutor<P> {
    /// Creates a new executor.
    ///
    /// `number_of_jobs` is the expected number of worker pools to be
    /// registered; it sizes the pool-status channel so each pool can have up
    /// to [`MAX_CHECKPOINTS_IN_PROGRESS`] status messages in flight.
    pub fn new(
        progress_store: P,
        number_of_jobs: usize,
        metrics: DataIngestionMetrics,
        token: CancellationToken,
    ) -> Self {
        let (pool_status_sender, pool_status_receiver) =
            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
        Self {
            pools: vec![],
            pool_senders: vec![],
            progress_store: ProgressStoreWrapper::new(progress_store),
            pool_status_sender,
            pool_status_receiver,
            metrics,
            token,
        }
    }

    /// Registers new worker pool in executor.
    ///
    /// Loads the pool's starting checkpoint from the progress store and wires
    /// the pool up with a checkpoint channel, the shared status sender, and a
    /// child cancellation token. The pool future itself is only spawned when
    /// the executor runs.
    pub async fn register<W: Worker + 'static>(
        &mut self,
        pool: WorkerPool<W>,
    ) -> IngestionResult<()> {
        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        self.pools.push(Box::pin(pool.run(
            checkpoint_number,
            receiver,
            self.pool_status_sender.clone(),
            self.token.child_token(),
        )));
        self.pool_senders.push(sender);
        Ok(())
    }

    /// Persists `watermark` as the progress of the given task.
    pub async fn update_watermark(
        &mut self,
        task_name: String,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.progress_store.save(task_name, watermark).await
    }

    /// Reads the persisted watermark of the given task.
    pub async fn read_watermark(
        &mut self,
        task_name: String,
    ) -> IngestionResult<CheckpointSequenceNumber> {
        self.progress_store.load(task_name).await
    }

    /// Main executor loop.
    ///
    /// # Error
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
    pub async fn run(
        mut self,
        path: PathBuf,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        reader_options: ReaderOptions,
    ) -> IngestionResult<ExecutorProgress> {
        // Start reading from the smallest watermark across all registered
        // tasks so no pool misses a checkpoint.
        let reader_checkpoint_number = self.progress_store.min_watermark()?;
        let (checkpoint_reader, checkpoint_recv, gc_sender, exit_sender) =
            CheckpointReaderV1::initialize(
                path,
                reader_checkpoint_number,
                remote_store_url,
                remote_store_options,
                reader_options,
            );

        let handle = spawn_monitored_task!(checkpoint_reader.run());

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V1 {
                checkpoint_recv,
                gc_sender,
                exit_sender,
                handle,
            },
        )
        .await
    }

    /// Alternative main executor loop. Uses the new iteration of the
    /// `CheckpointReader` supporting syncing checkpoints from hybrid historical
    /// store.
    ///
    /// # Error
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
    pub async fn run_with_config(
        mut self,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<ExecutorProgress> {
        let reader_checkpoint_number = self.progress_store.min_watermark()?;

        let checkpoint_reader = CheckpointReaderV2::new(reader_checkpoint_number, config).await?;

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V2(checkpoint_reader),
        )
        .await
    }

    /// Common execution logic: spawns the registered worker pools, fans
    /// checkpoints out to them, persists their progress, forwards GC signals
    /// to the reader, and coordinates shutdown once every pool has stopped.
    async fn run_executor_loop(
        &mut self,
        mut reader_checkpoint_number: u64,
        mut checkpoint_reader: CheckpointReader,
    ) -> IngestionResult<ExecutorProgress> {
        // Spawn every registered pool as a monitored task.
        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        let mut worker_pools_shutdown_signals = vec![];

        loop {
            tokio::select! {
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            self.progress_store.save(task_name.clone(), watermark).await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            // Only GC up to the minimum watermark across all
                            // tasks, and only when it actually advanced.
                            let seq_number = self.progress_store.min_watermark()?;
                            if seq_number > reader_checkpoint_number {
                                checkpoint_reader.send_gc_signal(seq_number).await?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics.data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                // Stop pulling new checkpoints once cancellation is requested;
                // status messages above are still drained so shutdown completes.
                Some(checkpoint) = checkpoint_reader.get_checkpoint(), if !self.token.is_cancelled() => {
                    // Fan the checkpoint out to every registered pool.
                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
                            )
                        })?;
                    }
                }
            }

            // Exit only after every registered pool has reported shutdown.
            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                // Shutdown worker pools
                for worker_pool in worker_pools {
                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
                        component: "Worker Pool".into(),
                        msg: err.to_string(),
                    })?;
                }
                // Shutdown checkpoint reader
                checkpoint_reader.shutdown().await?;
                break;
            }
        }

        Ok(self.progress_store.stats())
    }
}
330
331/// Sets up a single workflow for data ingestion.
332///
333/// This function initializes an [`IndexerExecutor`] with a single worker pool,
334/// using a [`ShimProgressStore`] initialized with the provided
335/// `initial_checkpoint_number`. It then returns a future that runs the executor
336/// and a [`CancellationToken`] for graceful shutdown.
337///
338/// # Docs
339/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
340///
341/// # Example
342/// ```rust,no_run
343/// use std::sync::Arc;
344///
345/// use async_trait::async_trait;
346/// use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
347/// use iota_types::full_checkpoint_content::CheckpointData;
348///
349/// struct CustomWorker;
350///
351/// #[async_trait]
352/// impl Worker for CustomWorker {
353///     type Message = ();
354///     type Error = IngestionError;
355///
356///     async fn process_checkpoint(
357///         &self,
358///         checkpoint: Arc<CheckpointData>,
359///     ) -> Result<Self::Message, Self::Error> {
360///         // custom processing logic.
361///         println!(
362///             "Processing checkpoint: {}",
363///             checkpoint.checkpoint_summary.to_string()
364///         );
365///         Ok(())
366///     }
367/// }
368///
369/// #[tokio::main]
370/// async fn main() {
371///     let (executor, _) = setup_single_workflow(
372///         CustomWorker,
373///         "http://127.0.0.1:9000/api/v1".to_string(), // fullnode REST API
374///         0,                                          // initial checkpoint number.
375///         5,                                          // concurrency.
376///         None,                                       // extra reader options.
377///     )
378///     .await
379///     .unwrap();
380///     executor.await.unwrap();
381/// }
382/// ```
383pub async fn setup_single_workflow<W: Worker + 'static>(
384    worker: W,
385    remote_store_url: String,
386    initial_checkpoint_number: CheckpointSequenceNumber,
387    concurrency: usize,
388    reader_options: Option<ReaderOptions>,
389) -> IngestionResult<(
390    impl Future<Output = IngestionResult<ExecutorProgress>>,
391    CancellationToken,
392)> {
393    let metrics = DataIngestionMetrics::new(&Registry::new());
394    let progress_store = ShimProgressStore(initial_checkpoint_number);
395    let token = CancellationToken::new();
396    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
397    let worker_pool = WorkerPool::new(
398        worker,
399        "workflow".to_string(),
400        concurrency,
401        Default::default(),
402    );
403    executor.register(worker_pool).await?;
404    Ok((
405        executor.run(
406            tempfile::tempdir()?.into_path(),
407            Some(remote_store_url),
408            vec![],
409            reader_options.unwrap_or_default(),
410        ),
411        token,
412    ))
413}