iota_data_ingestion_core/lib.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! This library provides an easy way to create custom indexers.
6//! <br>
7//!
8//! ## Graceful shutdown
9//!
10//! The shutdown sequence in the data ingestion system ensures clean termination
11//! of all components while preserving data integrity. It is initiated via a
12//! [CancellationToken](tokio_util::sync::CancellationToken), which triggers a
13//! hierarchical and graceful shutdown process.
14//!
//! The shutdown process follows a top-down hierarchy:
//! 1. [`Worker`]: Individual workers within a [`WorkerPool`] detect the
//!    cancellation signal, complete their current checkpoint processing,
//!    send final progress updates and signal completion to the parent
//!    [`WorkerPool`] via a `WorkerStatus::Shutdown` message.
//! 2. [`WorkerPool`]: Coordinates worker shutdowns, ensures all progress
//!    messages are processed, waits for all workers' shutdown signals and
//!    notifies the [`IndexerExecutor`] with a `WorkerPoolStatus::Shutdown`
//!    message once fully terminated.
//! 3. [`IndexerExecutor`]: Orchestrates the shutdown of all worker pools
//!    and finalizes system termination.
26
27mod errors;
28mod executor;
29pub mod history;
30mod metrics;
31mod progress_store;
32pub mod reader;
33mod reducer;
34#[cfg(test)]
35mod tests;
36mod util;
37mod worker_pool;
38
39use std::{
40 fmt::{Debug, Display},
41 sync::Arc,
42};
43
44use async_trait::async_trait;
45pub use errors::{IngestionError, IngestionResult};
46pub use executor::{
47 IndexerExecutor, IngestionLimit, MAX_CHECKPOINTS_IN_PROGRESS, ShutdownAction,
48 setup_single_workflow,
49};
50use iota_types::full_checkpoint_content::CheckpointData;
51pub use metrics::DataIngestionMetrics;
52pub use progress_store::{FileProgressStore, ProgressStore, ShimProgressStore};
53pub use reader::v1::ReaderOptions;
54pub use reducer::Reducer;
55pub use util::{create_remote_store_client, create_remote_store_client_with_ops};
56pub use worker_pool::WorkerPool;
57
58/// Processes individual checkpoints and produces messages for optional batch
59/// processing.
60///
61/// The Worker trait defines the core processing logic for checkpoint data.
62/// Workers run in parallel within a [`WorkerPool`] to process checkpoints and
63/// generate messages that can optionally be batched and processed by a
64/// [`Reducer`].
65///
66/// # Processing Modes
67///
68/// Workers support two processing modes:
69/// * **Direct Processing**: Messages are handled immediately without batching.
70/// * **Batch Processing**: Messages are accumulated and processed in batches by
71/// a [`Reducer`].
72///
73/// The processing mode is determined by the presence of a [`Reducer`] in the
74/// [`WorkerPool`] configuration.
75///
76/// # Concurrency
77///
78/// Multiple instances of a worker can run in parallel in the worker pool. The
79/// implementation must be thread-safe and handle checkpoint processing
80/// efficiently.
81///
82/// # Integration with Optional Reducer
83///
84/// Messages produced by the worker can be:
85/// * Processed directly without batching.
86/// * Accumulated and passed to a [`Reducer`] for batch processing.
87///
88/// The worker's [`Message`](Worker::Message) type must match the reducer's
89/// input type when batch processing is enabled.
90#[async_trait]
91pub trait Worker: Send + Sync {
92 type Error: Debug + Display;
93 type Message: Send + Sync;
94
95 /// Processes a single checkpoint and returns a message.
96 ///
97 /// This method contains the core logic for processing checkpoint data.
98 ///
99 /// # Note
100 /// - Checkpoints are processed in order when a single worker is used.
101 /// - Parallel processing with multiple workers does not guarantee
102 /// checkpoint order.
103 async fn process_checkpoint(
104 &self,
105 checkpoint: Arc<CheckpointData>,
106 ) -> Result<Self::Message, Self::Error>;
107
108 /// A hook that allows preprocessing a checkpoint before it's fully
109 /// processed.
110 ///
111 /// This method can be used to perform actions like validation or data
112 /// transformation before the main
113 /// [`process_checkpoint`](Worker::process_checkpoint) logic is executed.
114 ///
115 /// # Default implementation
116 ///
117 /// By default it returns `Ok(())`.
118 fn preprocess_hook(&self, _: Arc<CheckpointData>) -> Result<(), Self::Error> {
119 Ok(())
120 }
121}