// iota_data_ingestion_core/executor.rs
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5use std::{pin::Pin, sync::Arc};
6
7use futures::Future;
8use iota_metrics::spawn_monitored_task;
9use iota_types::{
10 committee::EpochId, full_checkpoint_content::CheckpointData,
11 messages_checkpoint::CheckpointSequenceNumber,
12};
13use prometheus::Registry;
14use tokio::{sync::mpsc, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17
18use crate::{
19 DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
20 progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
21 reader::v2::{CheckpointReader, CheckpointReaderConfig, RemoteUrl},
22 worker_pool::{WorkerPool, WorkerPoolStatus},
23};
24
/// Upper bound on the number of checkpoints that may be buffered (in flight)
/// between the checkpoint reader and each worker pool; used to size the
/// per-pool checkpoint channels and the shared pool-status channel.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;

/// Callback function invoked for each incoming checkpoint to determine the
/// shutdown action if it exceeds the ingestion limit.
type ShutdownCallback = Box<dyn Fn(&CheckpointData) -> ShutdownAction + Send>;
30
/// Determines the shutdown action when a checkpoint reaches the ingestion
/// limit.
///
/// Returned by the [`ShutdownCallback`] registered via
/// `IndexerExecutor::shutdown_when` for every incoming checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ShutdownAction {
    /// Include the current checkpoint in the ingestion process, then initiate
    /// the graceful shutdown process.
    IncludeAndShutdown,
    /// Exclude the current checkpoint from ingestion and immediately initiate
    /// the graceful shutdown process.
    ExcludeAndShutdown,
    /// Continue processing the current checkpoint without shutting down.
    Continue,
}
44
/// Common policies for upper limit checkpoint ingestion by the framework.
///
/// Once the limit is reached, the framework will start the graceful
/// shutdown process.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IngestionLimit {
    /// Last checkpoint sequence number to process.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    MaxCheckpoint(CheckpointSequenceNumber),
    /// Last checkpoint to process based on the given epoch.
    ///
    /// After processing this checkpoint the framework will start the graceful
    /// shutdown process.
    EndOfEpoch(EpochId),
}
63
64impl IngestionLimit {
65 /// Evaluates whether the given checkpoint triggers a shutdown action based
66 /// on the ingestion limit.
67 fn matches(&self, checkpoint: &CheckpointData) -> ShutdownAction {
68 match self {
69 IngestionLimit::MaxCheckpoint(max) => {
70 if &checkpoint.checkpoint_summary.sequence_number > max {
71 return ShutdownAction::ExcludeAndShutdown;
72 }
73 ShutdownAction::Continue
74 }
75 IngestionLimit::EndOfEpoch(max) => {
76 if &checkpoint.checkpoint_summary.epoch > max {
77 return ShutdownAction::ExcludeAndShutdown;
78 }
79 ShutdownAction::Continue
80 }
81 }
82 }
83}
84
/// The Executor of the main ingestion pipeline process.
///
/// This struct orchestrates the execution of multiple worker pools, handling
/// checkpoint distribution, progress tracking, and shutdown. It utilizes
/// [`ProgressStore`] for persisting checkpoint progress and provides metrics
/// for monitoring the indexing process.
///
/// # Example
/// ```rust,no_run
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{
///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
///     Worker, WorkerPool,
///     reader::v2::CheckpointReaderConfig
/// };
/// use iota_types::full_checkpoint_content::CheckpointData;
/// use prometheus::Registry;
/// use tokio_util::sync::CancellationToken;
/// use std::{path::PathBuf, sync::Arc};
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // custom processing logic.
///         println!(
///             "Processing Local checkpoint: {}",
///             checkpoint.checkpoint_summary.to_string()
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let concurrency = 5;
///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
///     let mut executor = IndexerExecutor::new(
///         progress_store,
///         1, // number of registered WorkerPools.
///         DataIngestionMetrics::new(&Registry::new()),
///         CancellationToken::new(),
///     );
///     // register a worker pool with 5 workers to process checkpoints in parallel
///     let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency, Default::default());
///     // register the worker pool to the executor.
///     executor.register(worker_pool).await.unwrap();
///     // run the ingestion pipeline.
///     executor
///         .run_with_config(
///             CheckpointReaderConfig {
///                 reader_options: ReaderOptions::default(),
///                 ingestion_path: Some(PathBuf::from("./chk".to_string())), // path to a local directory where checkpoints are stored.
///                 remote_store_url: None,
///             }
///         )
///         .await
///         .unwrap();
/// }
/// ```
pub struct IndexerExecutor<P> {
    /// Worker-pool futures collected by [`Self::register`]; they are taken
    /// out of the executor and spawned as tasks when the run loop starts.
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// One checkpoint sender per registered pool; every incoming checkpoint
    /// is fanned out to all of them.
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    /// Persists per-task watermarks and exposes the minimum watermark across
    /// all registered tasks.
    progress_store: ProgressStoreWrapper<P>,
    /// Cloned into each worker pool on registration so pools can report
    /// progress and shutdown events back to the executor.
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    /// Receives [`WorkerPoolStatus`] messages from all worker pools.
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    /// Metrics for monitoring the ingestion process (per-task checkpoint
    /// gauge).
    metrics: DataIngestionMetrics,
    /// Cancellation token used to initiate / observe the graceful shutdown
    /// process; child tokens are handed to each worker pool.
    token: CancellationToken,
    /// Optional predicate deciding when ingestion should stop (see
    /// [`Self::shutdown_when`] and [`Self::with_ingestion_limit`]).
    shutdown_callback: Option<ShutdownCallback>,
}
162
impl<P: ProgressStore> IndexerExecutor<P> {
    /// Creates a new executor.
    ///
    /// `number_of_jobs` is the expected number of worker pools to be
    /// registered; it sizes the shared pool-status channel so each pool can
    /// buffer up to [`MAX_CHECKPOINTS_IN_PROGRESS`] status messages.
    ///
    /// NOTE(review): `number_of_jobs == 0` produces a zero-capacity channel,
    /// which `tokio::sync::mpsc::channel` panics on — callers presumably
    /// always pass at least 1; confirm before relying on this.
    pub fn new(
        progress_store: P,
        number_of_jobs: usize,
        metrics: DataIngestionMetrics,
        token: CancellationToken,
    ) -> Self {
        let (pool_status_sender, pool_status_receiver) =
            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
        Self {
            pools: vec![],
            pool_senders: vec![],
            progress_store: ProgressStoreWrapper::new(progress_store),
            pool_status_sender,
            pool_status_receiver,
            metrics,
            token,
            shutdown_callback: None,
        }
    }

    /// Registers new worker pool in executor.
    ///
    /// Loads the pool's saved watermark from the progress store, wires up a
    /// dedicated checkpoint channel for the pool, and stores the pool's
    /// future for later spawning by the run loop.
    pub async fn register<W: Worker + 'static>(
        &mut self,
        pool: WorkerPool<W>,
    ) -> IngestionResult<()> {
        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        // Each pool gets a child token so cancelling the executor's token
        // propagates to every pool.
        self.pools.push(Box::pin(pool.run(
            checkpoint_number,
            receiver,
            self.pool_status_sender.clone(),
            self.token.child_token(),
        )));
        self.pool_senders.push(sender);
        Ok(())
    }

    /// Registers a predicate callback that determines when the ingestion
    /// process should stop.
    ///
    /// This function `f` will be called for every **incoming checkpoint**
    /// before it’s sent to the worker pool.
    ///
    /// Based on the returned [`ShutdownAction`] the executor will evaluate
    /// whether to continue or stop the ingestion process by initiating the
    /// graceful shutdown process.
    ///
    /// Once a shutdown action is triggered, the executor will stop sending new
    /// checkpoints and will wait for all previously sent checkpoints to be
    /// processed by workers before initiating graceful shutdown process.
    ///
    /// Note:
    ///
    /// Calling this method after
    /// [`with_ingestion_limit`](Self::with_ingestion_limit) replaces the
    /// earlier predicate, and vice versa. They are not cumulative.
    pub fn shutdown_when<F>(&mut self, f: F)
    where
        F: Fn(&CheckpointData) -> ShutdownAction + Send + 'static,
    {
        self.shutdown_callback = Some(Box::new(f));
    }

    /// Adds an upper‑limit policy that determines when the ingestion
    /// process should stop.
    ///
    /// This is a convenience method, it internally uses
    /// [`shutdown_when`](Self::shutdown_when) by registering a predicate
    /// derived from the provided [`IngestionLimit`].
    ///
    /// Note:
    ///
    /// Calling this method after [`shutdown_when`](Self::shutdown_when)
    /// replaces the earlier predicate, and vice versa. They are not cumulative.
    pub fn with_ingestion_limit(&mut self, limit: IngestionLimit) {
        self.shutdown_when(move |checkpoint| limit.matches(checkpoint));
    }

    /// Persists `watermark` for `task_name` in the progress store.
    pub async fn update_watermark(
        &mut self,
        task_name: String,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.progress_store.save(task_name, watermark).await
    }

    /// Loads the current watermark for `task_name` from the progress store.
    pub async fn read_watermark(
        &mut self,
        task_name: String,
    ) -> IngestionResult<CheckpointSequenceNumber> {
        self.progress_store.load(task_name).await
    }

    /// Main executor loop.
    ///
    /// Starts the checkpoint reader at the minimum watermark across all
    /// registered tasks, then drives the shared executor loop.
    ///
    /// # Error
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
    pub async fn run_with_config(
        mut self,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<ExecutorProgress> {
        let reader_checkpoint_number = self.progress_store.min_watermark()?;
        let checkpoint_reader = CheckpointReader::new(reader_checkpoint_number, config).await?;

        self.run_executor_loop(reader_checkpoint_number, checkpoint_reader)
            .await
    }

    /// Common execution logic: spawns the registered worker pools, fans
    /// incoming checkpoints out to them, persists their progress, signals
    /// garbage collection to the reader, and coordinates graceful shutdown.
    async fn run_executor_loop(
        &mut self,
        mut reader_checkpoint_number: u64,
        mut checkpoint_reader: CheckpointReader,
    ) -> IngestionResult<ExecutorProgress> {
        // Spawn every registered pool as a monitored task; `self.pools` is
        // left empty afterwards.
        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        // Names of pools that reported shutdown; the loop exits once all
        // pools have done so.
        let mut worker_pools_shutdown_signals = vec![];
        // Sequence number of the last checkpoint to process once an
        // ingestion-limit predicate fires (None while no limit was hit).
        let mut checkpoint_limit_reached = None;

        loop {
            // the min watermark represents the lowest watermark that
            // has been processed by any worker pool. This guarantees that
            // all worker pools have processed the checkpoint before the
            // shutdown process starts.
            if checkpoint_limit_reached.is_some_and(|ch_seq_num| {
                self.progress_store
                    .min_watermark()
                    .map(|watermark| watermark > ch_seq_num)
                    .unwrap_or_default()
            }) {
                info!(
                    "checkpoint upper limit reached: last checkpoint was processed, shutdown process started"
                );
                self.token.cancel();
            }

            tokio::select! {
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            self.progress_store.save(task_name.clone(), watermark).await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            // When the minimum watermark advances, tell the
                            // reader it may garbage-collect checkpoints below
                            // the new minimum.
                            let seq_number = self.progress_store.min_watermark()?;
                            if seq_number > reader_checkpoint_number {
                                checkpoint_reader.send_gc_signal(seq_number).await?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics.data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                // New checkpoints are only accepted while the token has not
                // been cancelled.
                Some(checkpoint) = checkpoint_reader.checkpoint(), if !self.token.is_cancelled() => {
                    // once upper limit reached skip sending new checkpoints to workers.
                    if self.should_shutdown(&checkpoint, &mut checkpoint_limit_reached) {
                        continue;
                    }

                    // Fan the checkpoint out to every registered pool.
                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
                            )
                        })?;
                    }
                }
            }

            // Exit only after every pool has reported shutdown, then join the
            // pool tasks and stop the reader.
            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                // Shutdown worker pools
                for worker_pool in worker_pools {
                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
                        component: "worker pool".into(),
                        msg: err.to_string(),
                    })?;
                }
                // Shutdown checkpoint reader
                checkpoint_reader.shutdown().await?;
                break;
            }
        }

        Ok(self.progress_store.stats())
    }

    /// Check if the current ingestion limit has been reached.
    ///
    /// Returns `true` if the ingestion limit has been reached.
    /// If no ingestion limit is present or it has not been reached yet, the
    /// function returns `false`.
    fn should_shutdown(
        &mut self,
        checkpoint: &CheckpointData,
        checkpoint_limit_reached: &mut Option<CheckpointSequenceNumber>,
    ) -> bool {
        // A limit already recorded by a previous checkpoint: keep skipping.
        if checkpoint_limit_reached.is_some() {
            return true;
        }

        // No callback registered means no limit: always continue.
        let Some(shutdown_action) = self
            .shutdown_callback
            .as_ref()
            .map(|matches| matches(checkpoint))
        else {
            return false;
        };

        match shutdown_action {
            ShutdownAction::IncludeAndShutdown => {
                // Record this checkpoint as the last one to process, but
                // still forward it to the workers (return false).
                checkpoint_limit_reached
                    .get_or_insert(checkpoint.checkpoint_summary.sequence_number);
                false
            }
            ShutdownAction::ExcludeAndShutdown => {
                // The previous checkpoint is the last to process; skip this
                // one (return true). `saturating_sub` guards checkpoint 0.
                checkpoint_limit_reached.get_or_insert(
                    checkpoint
                        .checkpoint_summary
                        .sequence_number
                        .saturating_sub(1),
                );
                true
            }
            ShutdownAction::Continue => false,
        }
    }
}
398
399/// Sets up a single workflow for data ingestion.
400///
401/// This function initializes an [`IndexerExecutor`] with a single worker pool,
402/// using a [`ShimProgressStore`] initialized with the provided
403/// `initial_checkpoint_number`. It then returns a future that runs the executor
404/// and a [`CancellationToken`] for graceful shutdown.
405///
406/// # Docs
407/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
408///
409/// # Example
410/// ```rust,no_run
411/// use std::sync::Arc;
412///
413/// use async_trait::async_trait;
414/// use iota_data_ingestion_core::{
415/// IngestionError, Worker, reader::v2::RemoteUrl, setup_single_workflow,
416/// };
417/// use iota_types::full_checkpoint_content::CheckpointData;
418///
419/// struct CustomWorker;
420///
421/// #[async_trait]
422/// impl Worker for CustomWorker {
423/// type Message = ();
424/// type Error = IngestionError;
425///
426/// async fn process_checkpoint(
427/// &self,
428/// checkpoint: Arc<CheckpointData>,
429/// ) -> Result<Self::Message, Self::Error> {
430/// // custom processing logic.
431/// println!(
432/// "Processing checkpoint: {}",
433/// checkpoint.checkpoint_summary.to_string()
434/// );
435/// Ok(())
436/// }
437/// }
438///
439/// #[tokio::main]
440/// async fn main() {
441/// let (executor, _) = setup_single_workflow(
442/// CustomWorker,
443/// RemoteUrl::Fullnode("http://127.0.0.1:50051".into()), // fullnode gRPC API.
444/// 0, // initial checkpoint number.
445/// 5, // concurrency.
446/// None, // extra reader options.
447/// )
448/// .await
449/// .unwrap();
450/// executor.await.unwrap();
451/// }
452/// ```
453pub async fn setup_single_workflow<W: Worker + 'static>(
454 worker: W,
455 remote_store_url: RemoteUrl,
456 initial_checkpoint_number: CheckpointSequenceNumber,
457 concurrency: usize,
458 reader_options: Option<ReaderOptions>,
459) -> IngestionResult<(
460 impl Future<Output = IngestionResult<ExecutorProgress>>,
461 CancellationToken,
462)> {
463 let metrics = DataIngestionMetrics::new(&Registry::new());
464 let progress_store = ShimProgressStore(initial_checkpoint_number);
465 let token = CancellationToken::new();
466 let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
467 let worker_pool = WorkerPool::new(
468 worker,
469 "workflow".to_string(),
470 concurrency,
471 Default::default(),
472 );
473 executor.register(worker_pool).await?;
474 Ok((
475 executor.run_with_config(CheckpointReaderConfig {
476 reader_options: reader_options.unwrap_or_default(),
477 ingestion_path: None,
478 remote_store_url: Some(remote_store_url),
479 }),
480 token,
481 ))
482}