iota_data_ingestion_core/executor.rs
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{path::PathBuf, pin::Pin, sync::Arc};

use futures::Future;
use iota_metrics::spawn_monitored_task;
use iota_rest_api::CheckpointData;
use iota_types::{committee::EpochId, messages_checkpoint::CheckpointSequenceNumber};
use prometheus::Registry;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
    reader::{
        v1::CheckpointReader as CheckpointReaderV1,
        v2::{CheckpointReader as CheckpointReaderV2, CheckpointReaderConfig},
    },
    worker_pool::{WorkerPool, WorkerPoolStatus},
};

/// Maximum number of checkpoints that may be in flight at once; used as the
/// capacity of the executor's internal channels.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;

/// Callback invoked for each incoming checkpoint to determine which
/// [`ShutdownAction`] to take, e.g. once an ingestion limit is exceeded.
type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;

/// Determines the shutdown action when a checkpoint reaches the ingestion
/// limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ShutdownAction {
    /// Include the current checkpoint in the ingestion process, then initiate
    /// the graceful shutdown process.
    IncludeAndShutdown,
    /// Exclude the current checkpoint from ingestion and immediately initiate
    /// the graceful shutdown process.
    ExcludeAndShutdown,
    /// Continue processing the current checkpoint without shutting down.
    Continue,
}

/// Common policies that place an upper limit on checkpoint ingestion by the
/// framework.
///
/// Once the limit is reached, the framework will start the graceful
/// shutdown process.
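///
/// # Example
///
/// A minimal sketch (assuming an already constructed `executor:
/// IndexerExecutor<_>`; see [`IndexerExecutor`] for the full setup):
/// ```rust,ignore
/// // Process checkpoints up to and including sequence number 10_000,
/// // then shut down gracefully.
/// executor.with_ingestion_limit(IngestionLimit::MaxCheckpoint(10_000));
///
/// // Or: process all checkpoints of epoch 42 (and earlier), shutting down
/// // once the first checkpoint of a later epoch is observed.
/// executor.with_ingestion_limit(IngestionLimit::EndOfEpoch(42));
/// ```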
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IngestionLimit {
    /// Last checkpoint sequence number to process.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    MaxCheckpoint(CheckpointSequenceNumber),
    /// Last checkpoint to process based on the given epoch.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    EndOfEpoch(EpochId),
}

impl IngestionLimit {
    /// Evaluates whether the given checkpoint triggers a shutdown action based
    /// on the ingestion limit.
    fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
        match self {
            IngestionLimit::MaxCheckpoint(max) => {
                if &checkpoint.checkpoint_summary.sequence_number > max {
                    return ShutdownAction::ExcludeAndShutdown;
                }
                ShutdownAction::Continue
            }
            IngestionLimit::EndOfEpoch(max) => {
                if &checkpoint.checkpoint_summary.epoch > max {
                    return ShutdownAction::ExcludeAndShutdown;
                }
                ShutdownAction::Continue
            }
        }
    }
}

/// Represents a common interface for checkpoint readers.
///
/// It wraps both the old checkpoint reader implementation (kept for backwards
/// compatibility) and the new one.
enum CheckpointReader {
    /// The old checkpoint reader implementation.
    V1 {
        checkpoint_recv: mpsc::Receiver<Arc<CheckpointData>>,
        gc_sender: mpsc::Sender<CheckpointSequenceNumber>,
        exit_sender: oneshot::Sender<()>,
        handle: JoinHandle<IngestionResult<()>>,
    },
    /// The new checkpoint reader implementation.
    V2(CheckpointReaderV2),
}

impl CheckpointReader {
    /// Gets the next checkpoint from the reader.
    async fn get_checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        match self {
            Self::V1 {
                checkpoint_recv, ..
            } => checkpoint_recv.recv().await,
            Self::V2(reader) => reader.checkpoint().await,
        }
    }

    /// Sends a GC signal to the reader.
    async fn send_gc_signal(
        &mut self,
        seq_number: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        match self {
            Self::V1 { gc_sender, .. } => gc_sender.send(seq_number).await.map_err(|_| {
                IngestionError::Channel(
                    "unable to send GC operation to checkpoint reader, receiver half closed".into(),
                )
            }),
            Self::V2(reader) => reader.send_gc_signal(seq_number).await,
        }
    }

    /// Shuts down the reader.
    async fn shutdown(self) -> IngestionResult<()> {
        match self {
            Self::V1 {
                exit_sender,
                handle,
                ..
            } => {
                _ = exit_sender.send(());
                handle.await.map_err(|err| IngestionError::Shutdown {
                    component: "checkpoint reader".into(),
                    msg: err.to_string(),
                })?
            }
            Self::V2(reader) => reader.shutdown().await,
        }
    }
}

/// The Executor of the main ingestion pipeline process.
///
/// This struct orchestrates the execution of multiple worker pools, handling
/// checkpoint distribution, progress tracking, and shutdown. It utilizes
/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
/// for monitoring the indexing process.
///
/// # Example
/// ```rust,no_run
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{
///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
///     Worker, WorkerPool,
/// };
/// use iota_types::full_checkpoint_content::CheckpointData;
/// use prometheus::Registry;
/// use tokio_util::sync::CancellationToken;
/// use std::{path::PathBuf, sync::Arc};
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // custom processing logic.
///         println!(
///             "Processing local checkpoint: {}",
///             checkpoint.checkpoint_summary.to_string()
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let concurrency = 5;
///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
///     let mut executor = IndexerExecutor::new(
///         progress_store,
///         1, // number of registered WorkerPools.
///         DataIngestionMetrics::new(&Registry::new()),
///         CancellationToken::new(),
///     );
///     // create a worker pool with 5 workers to process checkpoints in parallel.
///     let worker_pool = WorkerPool::new(
///         CustomWorker,
///         "local_reader".to_string(),
///         concurrency,
///         Default::default(),
///     );
///     // register the worker pool to the executor.
///     executor.register(worker_pool).await.unwrap();
///     // run the ingestion pipeline.
///     executor
///         .run(
///             PathBuf::from("./chk".to_string()), // path to a local directory where checkpoints are stored.
///             None,                               // optional remote store URL.
///             vec![],                             // optional remote store access options.
///             ReaderOptions::default(),           // checkpoint reader options.
///         )
///         .await
///         .unwrap();
/// }
/// ```
pub struct IndexerExecutor<P> {
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    progress_store: ProgressStoreWrapper<P>,
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    metrics: DataIngestionMetrics,
    token: CancellationToken,
    shutdown_callback: Option<ShutdownCallback>,
}

impl<P: ProgressStore> IndexerExecutor<P> {
    /// Creates a new executor.
    ///
    /// `number_of_jobs` is the number of worker pools that will be registered;
    /// it is used to size the internal pool status channel.
    pub fn new(
        progress_store: P,
        number_of_jobs: usize,
        metrics: DataIngestionMetrics,
        token: CancellationToken,
    ) -> Self {
        let (pool_status_sender, pool_status_receiver) =
            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
        Self {
            pools: vec![],
            pool_senders: vec![],
            progress_store: ProgressStoreWrapper::new(progress_store),
            pool_status_sender,
            pool_status_receiver,
            metrics,
            token,
            shutdown_callback: None,
        }
    }

    /// Registers a new worker pool in the executor.
    pub async fn register<W: Worker + 'static>(
        &mut self,
        pool: WorkerPool<W>,
    ) -> IngestionResult<()> {
        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        self.pools.push(Box::pin(pool.run(
            checkpoint_number,
            receiver,
            self.pool_status_sender.clone(),
            self.token.child_token(),
        )));
        self.pool_senders.push(sender);
        Ok(())
    }

    /// Registers a predicate callback that determines when the ingestion
    /// process should stop.
    ///
    /// The function `f` will be called for every **incoming checkpoint**
    /// before it is sent to the worker pools.
    ///
    /// Based on the returned [`ShutdownAction`] the executor will evaluate
    /// whether to continue or stop the ingestion process by initiating the
    /// graceful shutdown process.
    ///
    /// Once a shutdown action is triggered, the executor will stop sending new
    /// checkpoints and will wait for all previously sent checkpoints to be
    /// processed by the workers before initiating the graceful shutdown
    /// process.
    ///
    /// Note:
    ///
    /// Calling this method after
    /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
    /// earlier predicate, and vice versa. They are not cumulative.
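    ///
    /// # Example
    ///
    /// A minimal sketch (assuming an already constructed `executor:
    /// IndexerExecutor<_>`): include every checkpoint up to the end of epoch
    /// 10, then shut down.
    /// ```rust,ignore
    /// executor.shutdown_when(|checkpoint| {
    ///     if checkpoint.checkpoint_summary.epoch > 10 {
    ///         ShutdownAction::ExcludeAndShutdown
    ///     } else {
    ///         ShutdownAction::Continue
    ///     }
    /// });
    /// ```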
    pub fn shutdown_when<F>(&mut self, f: F)
    where
        F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
    {
        self.shutdown_callback = Some(Box::new(f));
    }

    /// Adds an upper-limit policy that determines when the ingestion
    /// process should stop.
    ///
    /// This is a convenience method; it internally uses
    /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
    /// derived from the provided [`IngestionLimit`].
    ///
    /// Note:
    ///
    /// Calling this method after [`shutdown_when`](Self::shutdown_when)
    /// replaces the earlier predicate, and vice versa. They are not cumulative.
    pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
        self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
    }

    /// Persists the checkpoint watermark for the given task in the progress
    /// store.
    pub async fn update_watermark(
        &mut self,
        task_name: String,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.progress_store.save(task_name, watermark).await
    }

    /// Reads the checkpoint watermark for the given task from the progress
    /// store.
    pub async fn read_watermark(
        &mut self,
        task_name: String,
    ) -> IngestionResult<CheckpointSequenceNumber> {
        self.progress_store.load(task_name).await
    }

    /// Main executor loop.
    ///
    /// # Errors
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
    pub async fn run(
        mut self,
        path: PathBuf,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        reader_options: ReaderOptions,
    ) -> IngestionResult<ExecutorProgress> {
        let reader_checkpoint_number = self.progress_store.min_watermark()?;
        let (checkpoint_reader, checkpoint_recv, gc_sender, exit_sender) =
            CheckpointReaderV1::initialize(
                path,
                reader_checkpoint_number,
                remote_store_url,
                remote_store_options,
                reader_options,
            );

        let handle = spawn_monitored_task!(checkpoint_reader.run());

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V1 {
                checkpoint_recv,
                gc_sender,
                exit_sender,
                handle,
            },
        )
        .await
    }

    /// Alternative main executor loop. Uses the new iteration of the
    /// `CheckpointReader`, which supports syncing checkpoints from a hybrid
    /// historical store.
    ///
    /// # Errors
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
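    ///
    /// # Example
    ///
    /// A minimal sketch (assuming an already constructed `executor:
    /// IndexerExecutor<_>`; the construction of [`CheckpointReaderConfig`]
    /// depends on the configured storage backends and is elided here):
    /// ```rust,ignore
    /// let config: CheckpointReaderConfig = todo!("configure the checkpoint reader");
    /// let stats = executor.run_with_config(config).await?;
    /// ```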
    pub async fn run_with_config(
        mut self,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<ExecutorProgress> {
        let reader_checkpoint_number = self.progress_store.min_watermark()?;
        let checkpoint_reader = CheckpointReaderV2::new(reader_checkpoint_number, config).await?;

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V2(checkpoint_reader),
        )
        .await
    }

    /// Common execution logic shared by [`run`](Self::run) and
    /// [`run_with_config`](Self::run_with_config).
    async fn run_executor_loop(
        &mut self,
        mut reader_checkpoint_number: u64,
        mut checkpoint_reader: CheckpointReader,
    ) -> IngestionResult<ExecutorProgress> {
        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        let mut worker_pools_shutdown_signals = vec![];
        let mut checkpoint_limit_reached = None;

        loop {
            // The min watermark represents the lowest watermark that
            // has been processed by any worker pool. This guarantees that
            // all worker pools have processed the checkpoint before the
            // shutdown process starts.
            if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
                self.progress_store
                    .min_watermark()
                    .map(|watermark| watermark > ch_seq_num)
                    .unwrap_or_default()
            }) {
                info!(
                    "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
                );
                self.token.cancel();
            }

            tokio::select! {
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            self.progress_store.save(task_name.clone(), watermark).await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            let seq_number = self.progress_store.min_watermark()?;
                            if seq_number > reader_checkpoint_number {
                                checkpoint_reader.send_gc_signal(seq_number).await?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics.data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                Some(checkpoint) = checkpoint_reader.get_checkpoint(), if !self.token.is_cancelled() => {
                    // Once the upper limit is reached, skip sending new checkpoints to workers.
                    if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
                        continue;
                    }

                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
                            )
                        })?;
                    }
                }
            }

            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                // Shutdown worker pools.
                for worker_pool in worker_pools {
                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
                        component: "worker pool".into(),
                        msg: err.to_string(),
                    })?;
                }
                // Shutdown checkpoint reader.
                checkpoint_reader.shutdown().await?;
                break;
            }
        }

        Ok(self.progress_store.stats())
    }

    /// Checks whether the current checkpoint should be excluded from ingestion
    /// because the configured ingestion limit has been reached.
    ///
    /// Returns `true` if the checkpoint must be skipped: the limit was already
    /// reached earlier, or the callback returned
    /// [`ShutdownAction::ExcludeAndShutdown`].
    /// Returns `false` if no ingestion limit is configured, the limit has not
    /// been reached yet, or the callback returned
    /// [`ShutdownAction::IncludeAndShutdown`] (the current checkpoint is still
    /// ingested before the shutdown starts).
    fn should_shutdown(
        &mut self,
        checkpoint: &CheckpointData,
        checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
    ) -> bool {
        if checkpoint_limit_reached.is_some() {
            return true;
        }

        let Some(shutdown_action) = self
            .shutdown_callback
            .as_ref()
            .map(|matches| matches(checkpoint))
        else {
            return false;
        };

        match shutdown_action {
            ShutdownAction::IncludeAndShutdown => {
                checkpoint_limit_reached
                    .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
                false
            }
            ShutdownAction::ExcludeAndShutdown => {
                checkpoint_limit_reached.get_or_insert(
                    checkpoint
                        .checkpoint_summary
                        .sequence_number
                        .saturating_sub(1),
                );
                true
            }
            ShutdownAction::Continue => false,
        }
    }
}

/// Sets up a single workflow for data ingestion.
///
/// This function initializes an [`IndexerExecutor`] with a single worker pool,
/// using a [`ShimProgressStore`] initialized with the provided
/// `initial_checkpoint_number`. It then returns a future that runs the executor
/// and a [`CancellationToken`] for graceful shutdown.
///
/// # Docs
/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
///
/// # Example
/// ```rust,no_run
/// use std::sync::Arc;
///
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
/// use iota_types::full_checkpoint_content::CheckpointData;
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // custom processing logic.
///         println!(
///             "Processing checkpoint: {}",
///             checkpoint.checkpoint_summary.to_string()
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (executor, _) = setup_single_workflow(
///         CustomWorker,
///         "http://127.0.0.1:9000/api/v1".to_string(), // fullnode REST API
///         0,    // initial checkpoint number.
///         5,    // concurrency.
///         None, // extra reader options.
///     )
///     .await
///     .unwrap();
///     executor.await.unwrap();
/// }
/// ```
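///
/// The returned [`CancellationToken`] can be used to stop the ingestion
/// pipeline gracefully. A sketch (wiring the token to `Ctrl-C` is just an
/// illustrative assumption):
/// ```rust,ignore
/// let (executor, token) = setup_single_workflow(/* ... */).await.unwrap();
/// tokio::spawn(async move {
///     tokio::signal::ctrl_c().await.unwrap();
///     token.cancel();
/// });
/// executor.await.unwrap();
/// ```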
pub async fn setup_single_workflow<W: Worker + 'static>(
    worker: W,
    remote_store_url: String,
    initial_checkpoint_number: CheckpointSequenceNumber,
    concurrency: usize,
    reader_options: Option<ReaderOptions>,
) -> IngestionResult<(
    impl Future<Output = IngestionResult<ExecutorProgress>>,
    CancellationToken,
)> {
    let metrics = DataIngestionMetrics::new(&Registry::new());
    let progress_store = ShimProgressStore(initial_checkpoint_number);
    let token = CancellationToken::new();
    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
    let worker_pool = WorkerPool::new(
        worker,
        "workflow".to_string(),
        concurrency,
        Default::default(),
    );
    executor.register(worker_pool).await?;
    Ok((
        executor.run(
            tempfile::tempdir()?.keep(),
            Some(remote_store_url),
            vec![],
            reader_options.unwrap_or_default(),
        ),
        token,
    ))
}