// iota_data_ingestion_core/executor.rs

use std::{path::PathBuf, pin::Pin, sync::Arc};

use futures::Future;
use iota_metrics::spawn_monitored_task;
use iota_rest_api::CheckpointData;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use prometheus::Registry;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinHandle,
};
use tokio_util::sync::CancellationToken;

use crate::{
    DataIngestionMetrics, IngestionError, IngestionResult, ReaderOptions, Worker,
    progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper, ShimProgressStore},
    reader::{
        v1::CheckpointReader as CheckpointReaderV1,
        v2::{CheckpointReader as CheckpointReaderV2, CheckpointReaderConfig},
    },
    worker_pool::{WorkerPool, WorkerPoolStatus},
};
/// Upper bound on in-flight checkpoints: sizes the per-pool checkpoint
/// channels and (scaled by job count) the shared pool-status channel.
pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;
29
/// Unifies the two checkpoint-reader generations behind one handle so the
/// executor loop can poll, GC, and shut down either implementation.
enum CheckpointReader {
    /// Legacy reader driven by explicit channels plus a spawned task.
    V1 {
        // Stream of checkpoints produced by the reader task.
        checkpoint_recv: mpsc::Receiver<Arc<CheckpointData>>,
        // Tells the reader which sequence numbers are fully processed.
        gc_sender: mpsc::Sender<CheckpointSequenceNumber>,
        // One-shot exit request for the reader task.
        exit_sender: oneshot::Sender<()>,
        // Join handle of the spawned reader task (awaited on shutdown).
        handle: JoinHandle<IngestionResult<()>>,
    },
    /// Newer reader that encapsulates its own channels and task management.
    V2(CheckpointReaderV2),
}
39
40impl CheckpointReader {
41 async fn get_checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
42 match self {
43 Self::V1 {
44 checkpoint_recv, ..
45 } => checkpoint_recv.recv().await,
46 Self::V2(reader) => reader.checkpoint().await,
47 }
48 }
49
50 async fn send_gc_signal(
51 &mut self,
52 seq_number: CheckpointSequenceNumber,
53 ) -> IngestionResult<()> {
54 match self {
55 Self::V1 { gc_sender, .. } => gc_sender.send(seq_number).await.map_err(|_| {
56 IngestionError::Channel(
57 "unable to send GC operation to checkpoint reader, receiver half closed".into(),
58 )
59 }),
60 Self::V2(reader) => reader.send_gc_signal(seq_number).await,
61 }
62 }
63
64 async fn shutdown(self) -> IngestionResult<()> {
65 match self {
66 Self::V1 {
67 exit_sender,
68 handle,
69 ..
70 } => {
71 _ = exit_sender.send(());
72 handle.await.map_err(|err| IngestionError::Shutdown {
73 component: "Checkpoint Reader".into(),
74 msg: err.to_string(),
75 })?
76 }
77 Self::V2(reader) => reader.shutdown().await,
78 }
79 }
80}
81
/// Drives registered worker pools over a stream of checkpoints, persisting
/// per-task progress and garbage-collecting data behind the minimum watermark.
pub struct IndexerExecutor<P> {
    // Worker-pool futures registered via `register`; drained and spawned
    // when the executor loop starts.
    pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
    // One checkpoint sender per registered pool (fan-out).
    pool_senders: Vec<mpsc::Sender<Arc<CheckpointData>>>,
    // Persistent watermark store wrapped with min-watermark bookkeeping.
    progress_store: ProgressStoreWrapper<P>,
    // Cloned into each pool so it can report progress/shutdown.
    pool_status_sender: mpsc::Sender<WorkerPoolStatus>,
    // Receiving half polled by the executor loop.
    pool_status_receiver: mpsc::Receiver<WorkerPoolStatus>,
    metrics: DataIngestionMetrics,
    // Cancellation root; each pool gets a child token.
    token: CancellationToken,
}
156
impl<P: ProgressStore> IndexerExecutor<P> {
    /// Creates an executor backed by `progress_store`.
    ///
    /// `number_of_jobs` scales the capacity of the status channel shared by
    /// all worker pools.
    pub fn new(
        progress_store: P,
        number_of_jobs: usize,
        metrics: DataIngestionMetrics,
        token: CancellationToken,
    ) -> Self {
        let (pool_status_sender, pool_status_receiver) =
            mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
        Self {
            pools: vec![],
            pool_senders: vec![],
            progress_store: ProgressStoreWrapper::new(progress_store),
            pool_status_sender,
            pool_status_receiver,
            metrics,
            token,
        }
    }

    /// Registers a worker pool, resuming it from the watermark stored under
    /// the pool's task name. The pool future is not driven until `run` /
    /// `run_with_config` spawns it.
    pub async fn register<W: Worker + 'static>(
        &mut self,
        pool: WorkerPool<W>,
    ) -> IngestionResult<()> {
        let checkpoint_number = self.progress_store.load(pool.task_name.clone()).await?;
        let (sender, receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        self.pools.push(Box::pin(pool.run(
            checkpoint_number,
            receiver,
            self.pool_status_sender.clone(),
            // Child token: cancelling the executor cancels every pool.
            self.token.child_token(),
        )));
        self.pool_senders.push(sender);
        Ok(())
    }

    /// Persists `watermark` as the progress of `task_name`.
    pub async fn update_watermark(
        &mut self,
        task_name: String,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.progress_store.save(task_name, watermark).await
    }

    /// Loads the stored watermark for `task_name`.
    pub async fn read_watermark(
        &mut self,
        task_name: String,
    ) -> IngestionResult<CheckpointSequenceNumber> {
        self.progress_store.load(task_name).await
    }

    /// Runs the executor with the legacy (V1) checkpoint reader, which
    /// watches `path` and optionally fetches from `remote_store_url`.
    /// Resolves with cumulative progress once all pools have shut down.
    pub async fn run(
        mut self,
        path: PathBuf,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        reader_options: ReaderOptions,
    ) -> IngestionResult<ExecutorProgress> {
        // Start reading from the slowest task's watermark so no pool
        // misses a checkpoint.
        let reader_checkpoint_number = self.progress_store.min_watermark()?;
        let (checkpoint_reader, checkpoint_recv, gc_sender, exit_sender) =
            CheckpointReaderV1::initialize(
                path,
                reader_checkpoint_number,
                remote_store_url,
                remote_store_options,
                reader_options,
            );

        let handle = spawn_monitored_task!(checkpoint_reader.run());

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V1 {
                checkpoint_recv,
                gc_sender,
                exit_sender,
                handle,
            },
        )
        .await
    }

    /// Runs the executor with the V2 checkpoint reader built from `config`.
    pub async fn run_with_config(
        mut self,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<ExecutorProgress> {
        let reader_checkpoint_number = self.progress_store.min_watermark()?;

        let checkpoint_reader = CheckpointReaderV2::new(reader_checkpoint_number, config).await?;

        self.run_executor_loop(
            reader_checkpoint_number,
            CheckpointReader::V2(checkpoint_reader),
        )
        .await
    }

    /// Core event loop: fans incoming checkpoints out to every pool,
    /// persists reported watermarks, advances the reader's GC watermark,
    /// and performs an orderly shutdown once every pool has reported
    /// `Shutdown`.
    async fn run_executor_loop(
        &mut self,
        mut reader_checkpoint_number: u64,
        mut checkpoint_reader: CheckpointReader,
    ) -> IngestionResult<ExecutorProgress> {
        // Spawn the registered pool futures; `take` leaves `self.pools` empty.
        let worker_pools = std::mem::take(&mut self.pools)
            .into_iter()
            .map(|pool| spawn_monitored_task!(pool))
            .collect::<Vec<JoinHandle<()>>>();

        let mut worker_pools_shutdown_signals = vec![];

        loop {
            tokio::select! {
                Some(worker_pool_progress_msg) = self.pool_status_receiver.recv() => {
                    match worker_pool_progress_msg {
                        WorkerPoolStatus::Running((task_name, watermark)) => {
                            self.progress_store.save(task_name.clone(), watermark).await
                                .map_err(|err| IngestionError::ProgressStore(err.to_string()))?;
                            // GC only when the *minimum* watermark across all
                            // tasks advances — slower tasks still need the data.
                            let seq_number = self.progress_store.min_watermark()?;
                            if seq_number > reader_checkpoint_number {
                                checkpoint_reader.send_gc_signal(seq_number).await?;
                                reader_checkpoint_number = seq_number;
                            }
                            self.metrics.data_ingestion_checkpoint
                                .with_label_values(&[&task_name])
                                .set(watermark as i64);
                        }
                        WorkerPoolStatus::Shutdown(worker_pool_name) => {
                            worker_pools_shutdown_signals.push(worker_pool_name);
                        }
                    }
                }
                // Stop pulling new checkpoints once cancellation is requested,
                // but keep draining status messages above so shutdown completes.
                Some(checkpoint) = checkpoint_reader.get_checkpoint(), if !self.token.is_cancelled() => {
                    for sender in &self.pool_senders {
                        sender.send(checkpoint.clone()).await.map_err(|_| {
                            IngestionError::Channel(
                                "unable to send new checkpoint to worker pool, receiver half closed".to_owned(),
                            )
                        })?;
                    }
                }
            }

            // All pools reported shutdown: join their tasks, stop the reader,
            // and exit the loop.
            if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
                for worker_pool in worker_pools {
                    worker_pool.await.map_err(|err| IngestionError::Shutdown {
                        component: "Worker Pool".into(),
                        msg: err.to_string(),
                    })?;
                }
                checkpoint_reader.shutdown().await?;
                break;
            }
        }

        Ok(self.progress_store.stats())
    }
}
330
331pub async fn setup_single_workflow<W: Worker + 'static>(
384 worker: W,
385 remote_store_url: String,
386 initial_checkpoint_number: CheckpointSequenceNumber,
387 concurrency: usize,
388 reader_options: Option<ReaderOptions>,
389) -> IngestionResult<(
390 impl Future<Output = IngestionResult<ExecutorProgress>>,
391 CancellationToken,
392)> {
393 let metrics = DataIngestionMetrics::new(&Registry::new());
394 let progress_store = ShimProgressStore(initial_checkpoint_number);
395 let token = CancellationToken::new();
396 let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
397 let worker_pool = WorkerPool::new(
398 worker,
399 "workflow".to_string(),
400 concurrency,
401 Default::default(),
402 );
403 executor.register(worker_pool).await?;
404 Ok((
405 executor.run(
406 tempfile::tempdir()?.into_path(),
407 Some(remote_store_url),
408 vec![],
409 reader_options.unwrap_or_default(),
410 ),
411 token,
412 ))
413}