iota_data_ingestion_core/executor.rs
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{path::PathBuf, pin::Pin, sync::Arc};

use futures::Future;
use iota_metrics::spawn_monitored_task;
use iota_rest_api::CheckpointData;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use prometheus::Registry;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinHandle,
};
use tokio_util::sync::CancellationToken;

use crate::{
    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
    reader::CheckpointReader,
    worker_pool::{WorkerPool, WorkerPoolStatus},
};

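/// Upper bound on the number of checkpoints that can be in flight at any time;
/// used to size the executor's internal channels.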
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;

/// The executor of the main ingestion pipeline.
///
/// This struct orchestrates the execution of multiple worker pools, handling
/// checkpoint distribution, progress tracking, and graceful shutdown. It uses a
/// [`ProgressStore`] to persist checkpoint progress and provides metrics for
/// monitoring the indexing process.
///
/// # Example
/// ```rust,no_run
/// use std::{path::PathBuf, sync::Arc};
///
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{
///     DataIngestionMetrics, FileProgressStore, IndexerExecutor, IngestionError, ReaderOptions,
///     Worker, WorkerPool,
/// };
/// use iota_types::full_checkpoint_content::CheckpointData;
/// use prometheus::Registry;
/// use tokio_util::sync::CancellationToken;
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // Custom processing logic.
///         println!(
///             "Processing local checkpoint: {}",
///             checkpoint.checkpoint_summary
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let concurrency = 5;
///     let progress_store = FileProgressStore::new("progress.json").await.unwrap();
///     let mut executor = IndexerExecutor::new(
///         progress_store,
///         1, // number of registered worker pools.
///         DataIngestionMetrics::new(&Registry::new()),
///         CancellationToken::new(),
///     );
///     // Register a worker pool with 5 workers to process checkpoints in parallel.
///     let worker_pool = WorkerPool::new(
///         CustomWorker,
///         "local_reader".to_string(),
///         concurrency,
///         Default::default(),
///     );
///     executor.register(worker_pool).await.unwrap();
///     // Run the ingestion pipeline.
///     executor
///         .run(
///             PathBuf::from("./chk"), // local directory where checkpoints are stored.
///             None,                   // optional remote store URL.
///             vec![],                 // optional remote store access options.
///             ReaderOptions::default(),
///         )
///         .await
///         .unwrap();
/// }
/// ```
pub struct IndexerExecutor<P> {
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    progress_store: ProgressStoreWrapper<P>,
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    metrics: DataIngestionMetrics,
    token: CancellationToken,
}

impl<P: ProgressStore> IndexerExecutor<P> {
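    /// Creates a new executor backed by the given progress store, sized for
    /// `number_of_jobs` worker pools.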
    pub fn new(
        progress_store: P,
        number_of_jobs: usize,
        metrics: DataIngestionMetrics,
        token: CancellationToken,
    ) -> Self {
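        // Give every pool enough status-channel capacity to report progress
        // for each in-flight checkpoint without blocking.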
        let (pool_status_sender, pool_status_receiver) =
            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
        Self {
            pools: vec![],
            pool_senders: vec![],
            progress_store: ProgressStoreWrapper::new(progress_store),
            pool_status_sender,
            pool_status_receiver,
            metrics,
            token,
        }
    }

    /// Registers a new worker pool with the executor, resuming from the pool's
    /// last saved watermark in the progress store.
    pub async fn register<W: Worker + 'static>(
        &mut self,
        pool: WorkerPool<W>,
    ) -> IngestionResult<()> {
        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        self.pools.push(Box::pin(pool.run(
            checkpoint_number,
            receiver,
            self.pool_status_sender.clone(),
            self.token.child_token(),
        )));
        self.pool_senders.push(sender);
        Ok(())
    }

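    /// Saves the watermark (last processed checkpoint sequence number) for the
    /// given task in the progress store.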
    pub async fn update_watermark(
        &mut self,
        task_name: String,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.progress_store.save(task_name, watermark).await
    }

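    /// Loads the watermark for the given task from the progress store.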
    pub async fn read_watermark(
        &mut self,
        task_name: String,
    ) -> IngestionResult<CheckpointSequenceNumber> {
        self.progress_store.load(task_name).await
    }

    /// Main executor loop.
    ///
    /// Spawns the checkpoint reader and all registered worker pools, then
    /// distributes checkpoints to the pools and persists their progress until
    /// shutdown.
    ///
    /// # Errors
    ///
    /// Returns an [`IngestionError::EmptyWorkerPool`] if no worker pool was
    /// registered.
    pub async fn run(
        mut self,
        path: PathBuf,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        reader_options: ReaderOptions,
    ) -> IngestionResult<ExecutorProgress> {
        let mut reader_checkpoint_number = self.progress_store.min_watermark()?;
        let (checkpoint_reader, mut checkpoint_recv, gc_sender, exit_sender) =
            CheckpointReader::initialize(
                path,
                reader_checkpoint_number,
                remote_store_url,
                remote_store_options,
                reader_options,
            );

        let checkpoint_reader_handle = spawn_monitored_task!(checkpoint_reader.run());

        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        let mut worker_pools_shutdown_signals = vec![];

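        // Event loop: route pool progress reports into the progress store and
        // fan newly read checkpoints out to every registered pool.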
        loop {
            tokio::select! {
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            self.progress_store
                                .save(task_name.clone(), watermark)
                                .await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            let seq_number = self.progress_store.min_watermark()?;
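                            // Every pool has progressed past `seq_number`, so
                            // the reader may garbage-collect older checkpoints.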
                            if seq_number > reader_checkpoint_number {
                                gc_sender.send(seq_number).await.map_err(|_| {
                                    IngestionError::Channel(
                                        "unable to send GC operation to checkpoint reader, receiver half closed"
                                            .to_owned(),
                                    )
                                })?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics
                                .data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            // Track worker pools that have initiated shutdown.
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                // Only process new checkpoints while the system is running (token not
                // cancelled). The guard stops new work from being accepted during
                // shutdown, while the other branches keep draining in-flight work.
                Some(checkpoint) = checkpoint_recv.recv(), if !self.token.is_cancelled() => {
                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed"
                                    .to_owned(),
                            )
                        })?;
                    }
                }
            }

            // Once all worker pools have signaled completion, start the graceful
            // shutdown process.
            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                break components_graceful_shutdown(
                    worker_pools,
                    exit_sender,
                    checkpoint_reader_handle,
                )
                .await?;
            }
        }

        Ok(self.progress_store.stats())
    }
}

/// Performs the graceful shutdown of the remaining components.
///
/// - Awaits all worker pool handles.
/// - Sends the shutdown signal to the checkpoint reader actor.
/// - Awaits the checkpoint reader handle.
async fn components_graceful_shutdown(
    worker_pools: Vec<JoinHandle<()>>,
    exit_sender: oneshot::Sender<()>,
    checkpoint_reader_handle: JoinHandle<IngestionResult<()>>,
) -> IngestionResult<()> {
    for worker_pool in worker_pools {
        worker_pool.await.map_err(|err| IngestionError::Shutdown {
            component: "Worker Pool".into(),
            msg: err.to_string(),
        })?;
    }
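    // Ignore the send result: if the reader already exited on its own, the
    // receiving half is gone.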
    _ = exit_sender.send(());
    checkpoint_reader_handle
        .await
        .map_err(|err| IngestionError::Shutdown {
            component: "Checkpoint Reader".into(),
            msg: err.to_string(),
        })??;
    Ok(())
}

/// Sets up a single workflow for data ingestion.
///
/// This function initializes an [`IndexerExecutor`] with a single worker pool,
/// using a [`ShimProgressStore`] seeded with the provided
/// `initial_checkpoint_number`. It returns a future that runs the executor and
/// a [`CancellationToken`] for graceful shutdown.
///
/// # Docs
/// For more info please check the [custom indexer docs](https://docs.iota.org/developer/advanced/custom-indexer).
///
/// # Example
/// ```rust,no_run
/// use std::sync::Arc;
///
/// use async_trait::async_trait;
/// use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
/// use iota_types::full_checkpoint_content::CheckpointData;
///
/// struct CustomWorker;
///
/// #[async_trait]
/// impl Worker for CustomWorker {
///     type Message = ();
///     type Error = IngestionError;
///
///     async fn process_checkpoint(
///         &self,
///         checkpoint: Arc<CheckpointData>,
///     ) -> Result<Self::Message, Self::Error> {
///         // Custom processing logic.
///         println!(
///             "Processing checkpoint: {}",
///             checkpoint.checkpoint_summary
///         );
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (executor, _) = setup_single_workflow(
///         CustomWorker,
///         "http://127.0.0.1:9000/api/v1".to_string(), // fullnode REST API.
///         0,    // initial checkpoint number.
///         5,    // concurrency.
///         None, // extra reader options.
///     )
///     .await
///     .unwrap();
///     executor.await.unwrap();
/// }
/// ```
pub async fn setup_single_workflow<W: Worker + 'static>(
    worker: W,
    remote_store_url: String,
    initial_checkpoint_number: CheckpointSequenceNumber,
    concurrency: usize,
    reader_options: Option<ReaderOptions>,
) -> IngestionResult<(
    impl Future<Output = IngestionResult<ExecutorProgress>>,
    CancellationToken,
)> {
    let metrics = DataIngestionMetrics::new(&Registry::new());
    let progress_store = ShimProgressStore(initial_checkpoint_number);
    let token = CancellationToken::new();
    let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
    let worker_pool = WorkerPool::new(
        worker,
        "workflow".to_string(),
        concurrency,
        Default::default(),
    );
    executor.register(worker_pool).await?;
    Ok((
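        // A temporary directory serves as the local checkpoint path, since
        // ingestion happens from the remote store.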
        executor.run(
            tempfile::tempdir()?.into_path(),
            Some(remote_store_url),
            vec![],
            reader_options.unwrap_or_default(),
        ),
        token,
    ))
}