iota_data_ingestion_core/executor.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_types::{
10 committee::EpochId, full_checkpoint_content::CheckpointData,
11 messages_checkpoint::CheckpointSequenceNumber,
12};
13use prometheus::Registry;
14use tokio::{sync::mpsc, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17
18use crate::{
19 DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
20 config::CheckpointReaderConfigExt,
21 progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
22 reader::v2::{CheckpointReader, CheckpointReaderConfig, RemoteUrl},
23 worker_pool::{WorkerPool, WorkerPoolStatus},
24};
25
/// Upper bound on the number of checkpoints buffered per worker pool.
///
/// It is the capacity of the channel feeding each registered worker pool and,
/// multiplied by the number of jobs, the capacity of the shared worker-pool
/// status channel.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10_000;
27
28/// Callback function invoked for each incoming checkpoint to determine the
29/// shutdown action if it exceeds the ingestion limit.
30type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;
31
/// Outcome of evaluating a checkpoint against the ingestion limit.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ShutdownAction {
    /// The current checkpoint is still ingested; the graceful shutdown starts
    /// once it (and everything sent before it) has been processed.
    IncludeAndShutdown,
    /// The current checkpoint is dropped and no further checkpoints are sent;
    /// the graceful shutdown starts once the previously sent checkpoints have
    /// been processed.
    ExcludeAndShutdown,
    /// Ingest the current checkpoint and keep going.
    Continue,
}
45
46/// Common policies for upper limit checkpoint ingestion by the framework.
47///
48/// Once the limit is reached, the framework will start the graceful
49/// shutdown process.
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
51#[non_exhaustive]
52pub enum IngestionLimit {
53 /// Last checkpoint sequence number to process.
54 ///
55 /// After processing this checkpoint the framework will start the graceful
56 /// shutdown process.
57 MaxCheckpoint(CheckpointSequenceNumber),
58 /// Last checkpoint to process based on the given epoch.
59 ///
60 /// After processing this checkpoint the framework will start the graceful
61 /// shutdown process.
62 EndOfEpoch(EpochId),
63}
64
65impl IngestionLimit {
66 /// Evaluates whether the given checkpoint triggers a shutdown action based
67 /// on the ingestion limit.
68 fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
69 match self {
70 IngestionLimit::MaxCheckpoint(max) => {
71 if &checkpoint.checkpoint_summary.sequence_number > max {
72 return ShutdownAction::ExcludeAndShutdown;
73 }
74 ShutdownAction::Continue
75 }
76 IngestionLimit::EndOfEpoch(max) => {
77 if &checkpoint.checkpoint_summary.epoch > max {
78 return ShutdownAction::ExcludeAndShutdown;
79 }
80 ShutdownAction::Continue
81 }
82 }
83 }
84}
85
/// The Executor of the main ingestion pipeline process.
///
/// This struct orchestrates the execution of multiple worker pools, handling
/// checkpoint distribution, progress tracking, and shutdown. It utilizes
/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
/// for monitoring the indexing process.
///
/// Typical usage: create the executor with [`IndexerExecutor::new`], register
/// one or more [`WorkerPool`]s with [`IndexerExecutor::register`], optionally
/// install a shutdown policy ([`IndexerExecutor::shutdown_when`] or
/// [`IndexerExecutor::with_ingestion_limit`]), then drive it with
/// [`IndexerExecutor::run_with_config`], which consumes the executor.
///
/// # Example
/// ```rust,no_run
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{
///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
///     Worker, WorkerPool,
///     reader::v2::CheckpointReaderConfig
/// };
/// use iota_types::full_checkpoint_content::CheckpointData;
/// use prometheus::Registry;
/// use tokio_util::sync::CancellationToken;
/// use std::{path::PathBuf, sync::Arc};
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // custom processing logic.
///         println!(
///             "Processing Local checkpoint: {}",
///             checkpoint.checkpoint_summary.to_string()
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let concurrency = 5;
///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
///     let mut executor = IndexerExecutor::new(
///         progress_store,
///         1, // number of registered WorkerPools.
///         DataIngestionMetrics::new(&Registry::new()),
///         CancellationToken::new(),
///     );
///     // register a worker pool with 5 workers to process checkpoints in parallel
///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
///     // register the worker pool to the executor.
///     executor.register(worker_pool).await.unwrap();
///     // run the ingestion pipeline.
///     executor
///         .run_with_config(
///             CheckpointReaderConfig {
///                 reader_options: ReaderOptions::default(),
///                 ingestion_path: Some(PathBuf::from("./chk".to_string())), // path to a local directory where checkpoints are stored.
///                 remote_store_url: None,
///             }
///         )
///         .await
///         .unwrap();
/// }
/// ```
pub struct IndexerExecutor<P> {
    /// Futures of the registered worker pools; they are moved out and spawned
    /// as tasks when the executor loop starts.
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// One sender per registered worker pool; every ingested checkpoint is
    /// sent to each of them.
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    /// Stores the watermark reported by each worker pool, keyed by task name.
    progress_store: ProgressStoreWrapper<P>,
    /// Sending half of the worker-pool status channel; a clone is handed to
    /// every registered pool.
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    /// Receives progress (`Running`) and `Shutdown` statuses from all pools.
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    /// Ingestion metrics; the executor updates the per-task
    /// `data_ingestion_checkpoint` gauge.
    metrics: DataIngestionMetrics,
    /// Cancelled by the executor to start the graceful shutdown; each worker
    /// pool receives a child of this token.
    token: CancellationToken,
    /// Optional predicate evaluated on every incoming checkpoint to decide
    /// when ingestion should stop.
    shutdown_callback: Option<ShutdownCallback>,
}
163
164impl<P: ProgressStore> IndexerExecutor<P> {
165 pub fn new(
166 progress_store: P,
167 number_of_jobs: usize,
168 metrics: DataIngestionMetrics,
169 token: CancellationToken,
170 ) -> Self {
171 let (pool_status_sender, pool_status_receiver) =
172 mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
173 Self {
174 pools: vec![],
175 pool_senders: vec![],
176 progress_store: ProgressStoreWrapper::new(progress_store),
177 pool_status_sender,
178 pool_status_receiver,
179 metrics,
180 token,
181 shutdown_callback: None,
182 }
183 }
184
185 /// Registers new worker pool in executor.
186 pub async fn register<W: Worker + 'static>(
187 &mut self,
188 pool: WorkerPool<W>,
189 ) -> IngestionResult<()> {
190 let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
191 let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
192 self.pools.push(Box::pin(pool.run(
193 checkpoint_number,
194 receiver,
195 self.pool_status_sender.clone(),
196 self.token.child_token(),
197 )));
198 self.pool_senders.push(sender);
199 Ok(())
200 }
201
202 /// Registers a predicate callback that determines when the ingestion
203 /// process should stop.
204 ///
205 /// This function `f` will be called for every **incoming checkpoint**
206 /// before it’s sent to the worker pool.
207 ///
208 /// Based on the returned [`ShutdownAction`] the executor will evaluate
209 /// whether to continue or stop the ingestion process by initiating the
210 /// graceful shutdown process.
211 ///
212 /// Once a shutdown action is triggered, the executor will stop sending new
213 /// checkpoints and will wait for all previously sent checkpoints to be
214 /// processed by workers before initiating graceful shutdown process.
215 ///
216 /// Note:
217 ///
218 /// Calling this method after
219 /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
220 /// earlier predicate, and vice versa. They are not cumulative.
221 pub fn shutdown_when<F>(&mut self, f: F)
222 where
223 F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
224 {
225 self.shutdown_callback = Some(Box::new(f));
226 }
227
228 /// Adds an upper‑limit policy that determines when the ingestion
229 /// process should stop.
230 ///
231 /// This is a convenience method, it internally uses
232 /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
233 /// derived from the provided [`IngestionLimit`].
234 ///
235 /// Note:
236 ///
237 /// Calling this method after [`shutdown_when`](Self::shutdown_when)
238 /// replaces the earlier predicate, and vice versa. They are not cumulative.
239 pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
240 self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
241 }
242
243 pub async fn update_watermark(
244 &mut self,
245 task_name: String,
246 watermark: CheckpointSequenceNumber,
247 ) -> IngestionResult<()> {
248 self.progress_store.save(task_name, watermark).await
249 }
250
251 pub async fn read_watermark(
252 &mut self,
253 task_name: String,
254 ) -> IngestionResult<CheckpointSequenceNumber> {
255 self.progress_store.load(task_name).await
256 }
257
258 /// Main executor loop.
259 ///
260 /// # Error
261 ///
262 /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
263 /// registered.
264 pub async fn run_with_config(
265 mut self,
266 config: impl Into<CheckpointReaderConfigExt>,
267 ) -> IngestionResult<ExecutorProgress> {
268 let reader_checkpoint_number = self.progress_store.min_watermark()?;
269 let checkpoint_reader =
270 CheckpointReader::new(reader_checkpoint_number, config.into()).await?;
271
272 self.run_executor_loop(reader_checkpoint_number, checkpoint_reader)
273 .await
274 }
275
276 /// Common execution logic
277 async fn run_executor_loop(
278 &mut self,
279 mut reader_checkpoint_number: u64,
280 mut checkpoint_reader: CheckpointReader,
281 ) -> IngestionResult<ExecutorProgress> {
282 let worker_pools = std::mem::take(&mut self.pools)
283 .into_iter()
284 .map(|pool| spawn_monitored_task!(pool))
285 .collect::<Vec<JoinHandle<()>>>();
286
287 let mut worker_pools_shutdown_signals = vec![];
288 let mut checkpoint_limit_reached = None;
289
290 loop {
291 // the min watermark represents the lowest watermark that
292 // has been processed by any worker pool. This guarantees that
293 // all worker pools have processed the checkpoint before the
294 // shutdown process starts.
295 if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
296 self.progress_store
297 .min_watermark()
298 .map(|watermark| watermark > ch_seq_num)
299 .unwrap_or_default()
300 }) {
301 info!(
302 "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
303 );
304 self.token.cancel();
305 }
306
307 tokio::select! {
308 Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
309 match worker_pool_progress_msg {
310 WorkerPoolStatus::Running((task_name, watermark)) => {
311 self.progress_store.save(task_name.clone(), watermark).await
312 .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
313 let seq_number = self.progress_store.min_watermark()?;
314 if seq_number > reader_checkpoint_number {
315 checkpoint_reader.send_gc_signal(seq_number).await?;
316 reader_checkpoint_number = seq_number;
317 }
318 self.metrics.data_ingestion_checkpoint
319 .with_label_values(&[&task_name])
320 .set(watermark as i64);
321 }
322 WorkerPoolStatus::Shutdown(worker_pool_name) => {
323 worker_pools_shutdown_signals.push(worker_pool_name);
324 }
325 }
326 }
327 Some(checkpoint) = checkpoint_reader.checkpoint(), if !self.token.is_cancelled() => {
328 // once upper limit reached skip sending new checkpoints to workers.
329 if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
330 continue;
331 }
332
333 for sender in &self.pool_senders {
334 sender.send(checkpoint.clone()).await.map_err(|_| {
335 IngestionError::Channel(
336 "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
337 )
338 })?;
339 }
340 }
341 }
342
343 if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
344 // Shutdown worker pools
345 for worker_pool in worker_pools {
346 worker_pool.await.map_err(|err| IngestionError::Shutdown {
347 component: "worker pool".into(),
348 msg: err.to_string(),
349 })?;
350 }
351 // Shutdown checkpoint reader
352 checkpoint_reader.shutdown().await?;
353 break;
354 }
355 }
356
357 Ok(self.progress_store.stats())
358 }
359
360 /// Check if the current ingestion limit has been reached.
361 ///
362 /// Returns `true` if the ingestion limit has been reached.
363 /// If no ingestion limit is present or it has not been reached yet, the
364 /// function returns `false`.
365 fn should_shutdown(
366 &mut self,
367 checkpoint: &CheckpointData,
368 checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
369 ) -> bool {
370 if checkpoint_limit_reached.is_some() {
371 return true;
372 }
373
374 let Some(shutdown_action) = self
375 .shutdown_callback
376 .as_ref()
377 .map(|matches| matches(checkpoint))
378 else {
379 return false;
380 };
381
382 match shutdown_action {
383 ShutdownAction::IncludeAndShutdown => {
384 checkpoint_limit_reached
385 .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
386 false
387 }
388 ShutdownAction::ExcludeAndShutdown => {
389 checkpoint_limit_reached.get_or_insert(
390 checkpoint
391 .checkpoint_summary
392 .sequence_number
393 .saturating_sub(1),
394 );
395 true
396 }
397 ShutdownAction::Continue => false,
398 }
399 }
400}
401
402/// Sets up a single workflow for data ingestion.
403///
404/// This function initializes an [`IndexerExecutor`] with a single worker pool,
405/// using a [`ShimProgressStore`] initialized with the provided
406/// `initial_checkpoint_number`. It then returns a future that runs the executor
407/// and a [`CancellationToken`] for graceful shutdown.
408///
409/// # Docs
410/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
411///
412/// # Example
413/// ```rust,no_run
414/// use std::sync::Arc;
415///
416/// use async_trait::async_trait;
417/// use iota_data_ingestion_core::{
418/// IngestionError, Worker, reader::v2::RemoteUrl, setup_single_workflow,
419/// };
420/// use iota_types::full_checkpoint_content::CheckpointData;
421///
422/// struct CustomWorker;
423///
424/// #[async_trait]
425/// impl Worker for CustomWorker {
426/// type Message = ();
427/// type Error = IngestionError;
428///
429/// async fn process_checkpoint(
430/// &self,
431/// checkpoint: Arc<CheckpointData>,
432/// ) -> Result<Self::Message, Self::Error> {
433/// // custom processing logic.
434/// println!(
435/// "Processing checkpoint: {}",
436/// checkpoint.checkpoint_summary.to_string()
437/// );
438/// Ok(())
439/// }
440/// }
441///
442/// #[tokio::main]
443/// async fn main() {
444/// let (executor, _) = setup_single_workflow(
445/// CustomWorker,
446/// RemoteUrl::Fullnode("http://127.0.0.1:50051".into()), // fullnode gRPC API.
447/// 0, // initial checkpoint number.
448/// 5, // concurrency.
449/// None, // extra reader options.
450/// )
451/// .await
452/// .unwrap();
453/// executor.await.unwrap();
454/// }
455/// ```
456pub async fn setup_single_workflow<W: Worker + 'static>(
457 worker: W,
458 remote_store_url: RemoteUrl,
459 initial_checkpoint_number: CheckpointSequenceNumber,
460 concurrency: usize,
461 reader_options: Option<ReaderOptions>,
462) -> IngestionResult<(
463 impl Future<Output = IngestionResult<ExecutorProgress>>,
464 CancellationToken,
465)> {
466 let metrics = DataIngestionMetrics::new(&Registry::new());
467 let progress_store = ShimProgressStore(initial_checkpoint_number);
468 let token = CancellationToken::new();
469 let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
470 let worker_pool = WorkerPool::new(
471 worker,
472 "workflow".to_string(),
473 concurrency,
474 Default::default(),
475 );
476 executor.register(worker_pool).await?;
477 Ok((
478 executor.run_with_config(CheckpointReaderConfig {
479 reader_options: reader_options.unwrap_or_default(),
480 ingestion_path: None,
481 remote_store_url: Some(remote_store_url),
482 }),
483 token,
484 ))
485}