// iota_data_ingestion_core/executor.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::PathBuf, pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_types::{
10 committee::EpochId, full_checkpoint_content::CheckpointData,
11 messages_checkpoint::CheckpointSequenceNumber,
12};
13use prometheus::Registry;
14use tokio::{
15 sync::{mpsc, oneshot},
16 task::JoinHandle,
17};
18use tokio_util::sync::CancellationToken;
19use tracing::info;
20
21use crate::{
22 DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
23 progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
24 reader::{
25 v1::CheckpointReader as CheckpointReaderV1,
26 v2::{CheckpointReader as CheckpointReaderV2, CheckpointReaderConfig},
27 },
28 worker_pool::{WorkerPool, WorkerPoolStatus},
29};
30
31pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;
32
/// Callback function invoked for each incoming checkpoint to determine the
/// shutdown action if it exceeds the ingestion limit.
///
/// Registered via [`IndexerExecutor::shutdown_when`]. The `Send` bound is
/// required because the executor is driven inside a spawned task.
type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;
36
/// Determines the shutdown action when a checkpoint reaches the ingestion
/// limit.
///
/// Returned by a [`ShutdownCallback`] for every incoming checkpoint before it
/// is forwarded to the worker pools.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ShutdownAction {
    /// Include the current checkpoint in the ingestion process, then initiate
    /// the graceful shutdown process.
    IncludeAndShutdown,
    /// Exclude the current checkpoint from ingestion and immediately initiate
    /// the graceful shutdown process.
    ExcludeAndShutdown,
    /// Continue processing the current checkpoint without shutting down.
    Continue,
}
50
/// Common policies for upper limit checkpoint ingestion by the framework.
///
/// Once the limit is reached, the framework will start the graceful
/// shutdown process.
///
/// Marked `#[non_exhaustive]` so new limit policies can be added in later
/// versions without a breaking change for downstream matchers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IngestionLimit {
    /// Last checkpoint sequence number to process.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    MaxCheckpoint(CheckpointSequenceNumber),
    /// Last checkpoint to process based on the given epoch.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    EndOfEpoch(EpochId),
}
69
70impl IngestionLimit {
71 /// Evaluates whether the given checkpoint triggers a shutdown action based
72 /// on the ingestion limit.
73 fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
74 match self {
75 IngestionLimit::MaxCheckpoint(max) => {
76 if &checkpoint.checkpoint_summary.sequence_number > max {
77 return ShutdownAction::ExcludeAndShutdown;
78 }
79 ShutdownAction::Continue
80 }
81 IngestionLimit::EndOfEpoch(max) => {
82 if &checkpoint.checkpoint_summary.epoch > max {
83 return ShutdownAction::ExcludeAndShutdown;
84 }
85 ShutdownAction::Continue
86 }
87 }
88 }
89}
90
/// Represents a common interface for checkpoint readers.
///
/// It manages the old checkpoint reader implementation for backwards
/// compatibility and the new one.
enum CheckpointReader {
    /// The old checkpoint reader implementation.
    ///
    /// The reader itself runs in a separately spawned task; this variant only
    /// holds the channel endpoints used to communicate with it.
    V1 {
        /// Receives checkpoints produced by the reader task.
        checkpoint_recv: mpsc::Receiver<Arc<CheckpointData>>,
        /// Notifies the reader task up to which sequence number checkpoints
        /// may be garbage-collected.
        gc_sender: mpsc::Sender<CheckpointSequenceNumber>,
        /// One-shot signal asking the reader task to exit.
        exit_sender: oneshot::Sender<()>,
        /// Join handle of the spawned reader task, awaited on shutdown.
        handle: JoinHandle<IngestionResult<()>>,
    },
    /// The new checkpoint reader implementation.
    V2(CheckpointReaderV2),
}
106
107impl CheckpointReader {
108 /// Gets the next checkpoint from the reader.
109 async fn get_checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
110 match self {
111 Self::V1 {
112 checkpoint_recv, ..
113 } => checkpoint_recv.recv().await,
114 Self::V2(reader) => reader.checkpoint().await,
115 }
116 }
117
118 /// Sends a GC signal to the reader.
119 async fn send_gc_signal(
120 &mut self,
121 seq_number: CheckpointSequenceNumber,
122 ) -> IngestionResult<()> {
123 match self {
124 Self::V1 { gc_sender, .. } => gc_sender.send(seq_number).await.map_err(|_| {
125 IngestionError::Channel(
126 "unable to send GC operation to checkpoint reader, receiver half closed".into(),
127 )
128 }),
129 Self::V2(reader) => reader.send_gc_signal(seq_number).await,
130 }
131 }
132
133 /// Shuts down the reader.
134 async fn shutdown(self) -> IngestionResult<()> {
135 match self {
136 Self::V1 {
137 exit_sender,
138 handle,
139 ..
140 } => {
141 _ = exit_sender.send(());
142 handle.await.map_err(|err| IngestionError::Shutdown {
143 component: "checkpoint reader".into(),
144 msg: err.to_string(),
145 })?
146 }
147 Self::V2(reader) => reader.shutdown().await,
148 }
149 }
150}
151
152/// The Executor of the main ingestion pipeline process.
153///
154/// This struct orchestrates the execution of multiple worker pools, handling
155/// checkpoint distribution, progress tracking, and shutdown. It utilizes
156/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
157/// for monitoring the indexing process.
158///
159/// # Example
160/// ```rust,no_run
161/// use async_trait::async_trait;
162/// use iota_data_ingestion_core::{
163/// DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
164/// Worker, WorkerPool,
165/// };
166/// use iota_types::full_checkpoint_content::CheckpointData;
167/// use prometheus::Registry;
168/// use tokio_util::sync::CancellationToken;
169/// use std::{path::PathBuf, sync::Arc};
170///
171/// struct CustomWorker;
172///
173/// #[async_trait]
174/// impl Worker for CustomWorker {
175/// type Message = ();
176/// type Error = IngestionError;
177///
178/// async fn process_checkpoint(
179/// &self,
180/// checkpoint: Arc<CheckpointData>,
181/// ) -> Result<Self::Message, Self::Error> {
182/// // custom processing logic.
183/// println!(
184/// "Processing Local checkpoint: {}",
185/// checkpoint.checkpoint_summary.to_string()
186/// );
187/// Ok(())
188/// }
189/// }
190///
191/// #[tokio::main]
192/// async fn main() {
193/// let concurrency = 5;
194/// let progress_store = FileProgressStore::new("progress.json").await.unwrap();
195/// let mut executor = IndexerExecutor::new(
196/// progress_store,
197/// 1, // number of registered WorkerPools.
198/// DataIngestionMetrics::new(&Registry::new()),
199/// CancellationToken::new(),
200/// );
201/// // register a worker pool with 5 workers to process checkpoints in parallel
202/// let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
203/// // register the worker pool to the executor.
204/// executor.register(worker_pool).await.unwrap();
205/// // run the ingestion pipeline.
206/// executor
207/// .run(
208/// PathBuf::from("./chk".to_string()), // path to a local directory where checkpoints are stored.
209/// None,
210/// vec![], // optional remote store access options.
211/// ReaderOptions::default(), // remote_read_batch_size.
212/// )
213/// .await
214/// .unwrap();
215/// }
216/// ```
pub struct IndexerExecutor<P> {
    /// Futures of the registered worker pools; drained and spawned as tasks
    /// when the executor loop starts.
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// One checkpoint channel sender per registered worker pool.
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    /// Tracks per-task checkpoint watermarks.
    progress_store: ProgressStoreWrapper<P>,
    /// Cloned into every worker pool so it can report progress and shutdown.
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    /// Receives status updates from all worker pools.
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    /// Prometheus metrics for the ingestion pipeline.
    metrics: DataIngestionMetrics,
    /// Cancellation token propagated (as child tokens) to worker pools for
    /// graceful shutdown.
    token: CancellationToken,
    /// Optional predicate deciding, per checkpoint, when ingestion must stop.
    shutdown_callback: Option<ShutdownCallback>,
}
227
impl<P: ProgressStore> IndexerExecutor<P> {
    /// Creates a new executor.
    ///
    /// `number_of_jobs` is the number of worker pools that will be registered;
    /// it sizes the shared status channel so every pool can buffer up to
    /// [`MAX_CHECKPOINTS_IN_PROGRESS`] status messages without blocking.
    pub fn new(
        progress_store: P,
        number_of_jobs: usize,
        metrics: DataIngestionMetrics,
        token: CancellationToken,
    ) -> Self {
        let (pool_status_sender, pool_status_receiver) =
            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
        Self {
            pools: vec![],
            pool_senders: vec![],
            progress_store: ProgressStoreWrapper::new(progress_store),
            pool_status_sender,
            pool_status_receiver,
            metrics,
            token,
            shutdown_callback: None,
        }
    }

    /// Registers new worker pool in executor.
    ///
    /// Loads the pool's last saved watermark from the progress store so the
    /// pool resumes where it left off. The pool future is only stored here; it
    /// is spawned later by the executor loop.
    pub async fn register<W: Worker + 'static>(
        &mut self,
        pool: WorkerPool<W>,
    ) -> IngestionResult<()> {
        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        self.pools.push(Box::pin(pool.run(
            checkpoint_number,
            receiver,
            self.pool_status_sender.clone(),
            self.token.child_token(),
        )));
        self.pool_senders.push(sender);
        Ok(())
    }

    /// Registers a predicate callback that determines when the ingestion
    /// process should stop.
    ///
    /// This function `f` will be called for every **incoming checkpoint**
    /// before it’s sent to the worker pool.
    ///
    /// Based on the returned [`ShutdownAction`] the executor will evaluate
    /// whether to continue or stop the ingestion process by initiating the
    /// graceful shutdown process.
    ///
    /// Once a shutdown action is triggered, the executor will stop sending new
    /// checkpoints and will wait for all previously sent checkpoints to be
    /// processed by workers before initiating graceful shutdown process.
    ///
    /// Note:
    ///
    /// Calling this method after
    /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
    /// earlier predicate, and vice versa. They are not cumulative.
    pub fn shutdown_when<F>(&mut self, f: F)
    where
        F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
    {
        self.shutdown_callback = Some(Box::new(f));
    }

    /// Adds an upper‑limit policy that determines when the ingestion
    /// process should stop.
    ///
    /// This is a convenience method, it internally uses
    /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
    /// derived from the provided [`IngestionLimit`].
    ///
    /// Note:
    ///
    /// Calling this method after [`shutdown_when`](Self::shutdown_when)
    /// replaces the earlier predicate, and vice versa. They are not cumulative.
    pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
        self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
    }

    /// Persists the watermark for the given task in the progress store.
    pub async fn update_watermark(
        &mut self,
        task_name: String,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.progress_store.save(task_name, watermark).await
    }

    /// Reads the stored watermark for the given task from the progress store.
    pub async fn read_watermark(
        &mut self,
        task_name: String,
    ) -> IngestionResult<CheckpointSequenceNumber> {
        self.progress_store.load(task_name).await
    }

    /// Main executor loop.
    ///
    /// # Error
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
    pub async fn run(
        mut self,
        path: PathBuf,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        reader_options: ReaderOptions,
    ) -> IngestionResult<ExecutorProgress> {
        // Start reading from the lowest watermark across all registered pools
        // so no pool misses a checkpoint.
        let reader_checkpoint_number = self.progress_store.min_watermark()?;
        let (checkpoint_reader, checkpoint_recv, gc_sender, exit_sender) =
            CheckpointReaderV1::initialize(
                path,
                reader_checkpoint_number,
                remote_store_url,
                remote_store_options,
                reader_options,
            );

        // The V1 reader runs in its own task; we keep the handle for shutdown.
        let handle = spawn_monitored_task!(checkpoint_reader.run());

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V1 {
                checkpoint_recv,
                gc_sender,
                exit_sender,
                handle,
            },
        )
        .await
    }

    /// Alternative main executor loop. Uses the new iteration of the
    /// `CheckpointReader` supporting syncing checkpoints from hybrid historical
    /// store.
    ///
    /// # Error
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
    pub async fn run_with_config(
        mut self,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<ExecutorProgress> {
        let reader_checkpoint_number = self.progress_store.min_watermark()?;
        let checkpoint_reader = CheckpointReaderV2::new(reader_checkpoint_number, config).await?;

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V2(checkpoint_reader),
        )
        .await
    }

    /// Common execution logic
    ///
    /// Spawns the registered worker pools, then multiplexes between pool
    /// status updates and new checkpoints until every pool has signalled
    /// shutdown. Progress is persisted per pool; the reader is GC'd up to the
    /// minimum watermark across pools.
    async fn run_executor_loop(
        &mut self,
        mut reader_checkpoint_number: u64,
        mut checkpoint_reader: CheckpointReader,
    ) -> IngestionResult<ExecutorProgress> {
        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        let mut worker_pools_shutdown_signals = vec![];
        // Sequence number of the last checkpoint to include, set once the
        // shutdown callback fires (see `should_shutdown`).
        let mut checkpoint_limit_reached = None;

        loop {
            // the min watermark represents the lowest watermark that
            // has been processed by any worker pool. This guarantees that
            // all worker pools have processed the checkpoint before the
            // shutdown process starts.
            // NOTE(review): this compares with `>`; whether the stored
            // watermark means "last processed" or "next to process" decides if
            // `>=` would be correct — confirm against ProgressStore semantics.
            if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
                self.progress_store
                    .min_watermark()
                    .map(|watermark| watermark > ch_seq_num)
                    .unwrap_or_default()
            }) {
                info!(
                    "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
                );
                self.token.cancel();
            }

            tokio::select! {
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            self.progress_store.save(task_name.clone(), watermark).await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            // Only GC the reader when the slowest pool advanced.
                            let seq_number = self.progress_store.min_watermark()?;
                            if seq_number > reader_checkpoint_number {
                                checkpoint_reader.send_gc_signal(seq_number).await?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics.data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                // Stop pulling new checkpoints once cancellation started; the
                // guard disables this branch so in-flight work can drain.
                Some(checkpoint) = checkpoint_reader.get_checkpoint(), if !self.token.is_cancelled() => {
                    // once upper limit reached skip sending new checkpoints to workers.
                    if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
                        continue;
                    }

                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
                            )
                        })?;
                    }
                }
            }

            // All pools reported shutdown: join them, then stop the reader.
            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                // Shutdown worker pools
                for worker_pool in worker_pools {
                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
                        component: "worker pool".into(),
                        msg: err.to_string(),
                    })?;
                }
                // Shutdown checkpoint reader
                checkpoint_reader.shutdown().await?;
                break;
            }
        }

        Ok(self.progress_store.stats())
    }

    /// Check if the current ingestion limit has been reached.
    ///
    /// Returns `true` if the ingestion limit has been reached.
    /// If no ingestion limit is present or it has not been reached yet, the
    /// function returns `false`.
    fn should_shutdown(
        &mut self,
        checkpoint: &CheckpointData,
        checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
    ) -> bool {
        // Once a limit is recorded, every subsequent checkpoint is skipped.
        if checkpoint_limit_reached.is_some() {
            return true;
        }

        // No callback registered means no limit: keep ingesting.
        let Some(shutdown_action) = self
            .shutdown_callback
            .as_ref()
            .map(|matches| matches(checkpoint))
        else {
            return false;
        };

        match shutdown_action {
            ShutdownAction::IncludeAndShutdown => {
                // Record this checkpoint as the last one to include, but still
                // forward it to the pools (return false).
                checkpoint_limit_reached
                    .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
                false
            }
            ShutdownAction::ExcludeAndShutdown => {
                // The previous checkpoint is the last included one; saturating
                // to guard against checkpoint 0.
                checkpoint_limit_reached.get_or_insert(
                    checkpoint
                        .checkpoint_summary
                        .sequence_number
                        .saturating_sub(1),
                );
                true
            }
            ShutdownAction::Continue => false,
        }
    }
}
505
506/// Sets up a single workflow for data ingestion.
507///
508/// This function initializes an [`IndexerExecutor`] with a single worker pool,
509/// using a [`ShimProgressStore`] initialized with the provided
510/// `initial_checkpoint_number`. It then returns a future that runs the executor
511/// and a [`CancellationToken`] for graceful shutdown.
512///
513/// # Docs
514/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
515///
516/// # Example
517/// ```rust,no_run
518/// use std::sync::Arc;
519///
520/// use async_trait::async_trait;
521/// use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
522/// use iota_types::full_checkpoint_content::CheckpointData;
523///
524/// struct CustomWorker;
525///
526/// #[async_trait]
527/// impl Worker for CustomWorker {
528/// type Message = ();
529/// type Error = IngestionError;
530///
531/// async fn process_checkpoint(
532/// &self,
533/// checkpoint: Arc<CheckpointData>,
534/// ) -> Result<Self::Message, Self::Error> {
535/// // custom processing logic.
536/// println!(
537/// "Processing checkpoint: {}",
538/// checkpoint.checkpoint_summary.to_string()
539/// );
540/// Ok(())
541/// }
542/// }
543///
544/// #[tokio::main]
545/// async fn main() {
546/// let (executor, _) = setup_single_workflow(
547/// CustomWorker,
548/// "http://127.0.0.1:50051".to_string(), // fullnode gRPC API
549/// 0, // initial checkpoint number.
550/// 5, // concurrency.
551/// None, // extra reader options.
552/// )
553/// .await
554/// .unwrap();
555/// executor.await.unwrap();
556/// }
557/// ```
558pub async fn setup_single_workflow<W: Worker + 'static>(
559 worker: W,
560 remote_store_url: String,
561 initial_checkpoint_number: CheckpointSequenceNumber,
562 concurrency: usize,
563 reader_options: Option<ReaderOptions>,
564) -> IngestionResult<(
565 impl Future<Output = IngestionResult<ExecutorProgress>>,
566 CancellationToken,
567)> {
568 let metrics = DataIngestionMetrics::new(&Registry::new());
569 let progress_store = ShimProgressStore(initial_checkpoint_number);
570 let token = CancellationToken::new();
571 let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
572 let worker_pool = WorkerPool::new(
573 worker,
574 "workflow".to_string(),
575 concurrency,
576 Default::default(),
577 );
578 executor.register(worker_pool).await?;
579 Ok((
580 executor.run(
581 tempfile::tempdir()?.keep(),
582 Some(remote_store_url),
583 vec![],
584 reader_options.unwrap_or_default(),
585 ),
586 token,
587 ))
588}