iota_data_ingestion_core/lib.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! This library provides an easy way to create custom indexers.
6//! <br>
7//!
8//! ## Graceful shutdown
9//!
10//! The shutdown sequence in the data ingestion system ensures clean termination
11//! of all components while preserving data integrity. It is initiated via a
12//! [CancellationToken](tokio_util::sync::CancellationToken), which triggers a
13//! hierarchical and graceful shutdown process.
14//!
15//! The shutdown process follows a top-down hierarchy:
//! 1. [`Worker`]: Individual workers within a [`WorkerPool`] detect the
//!    cancellation signal, complete their current checkpoint processing, send
//!    final progress updates, and signal completion to the parent
//!    [`WorkerPool`] via a `WorkerStatus::Shutdown` message.
20//! 2. [`WorkerPool`]: Coordinates worker shutdowns, ensures all progress
21//! messages are processed, waits for all workers' shutdown signals and
22//! notifies [`IndexerExecutor`] with `WorkerPoolStatus::Shutdown` message
23//! when fully terminated.
//! 3. [`IndexerExecutor`]: Orchestrates the shutdown of all worker pools and
//!    finalizes system termination.
26
27mod errors;
28mod executor;
29pub mod history;
30mod metrics;
31mod progress_store;
32mod reader;
33mod reducer;
34#[cfg(test)]
35mod tests;
36mod util;
37mod worker_pool;
38
39use std::{
40 fmt::{Debug, Display},
41 sync::Arc,
42};
43
44use async_trait::async_trait;
45pub use errors::{IngestionError, IngestionResult};
46pub use executor::{IndexerExecutor, MAX_CHECKPOINTS_IN_PROGRESS, setup_single_workflow};
47use iota_types::full_checkpoint_content::CheckpointData;
48pub use metrics::DataIngestionMetrics;
49pub use progress_store::{FileProgressStore, ProgressStore, ShimProgressStore};
50pub use reader::ReaderOptions;
51pub use reducer::Reducer;
52pub use util::{create_remote_store_client, create_remote_store_client_with_ops};
53pub use worker_pool::WorkerPool;
54
55/// Processes individual checkpoints and produces messages for optional batch
56/// processing.
57///
58/// The Worker trait defines the core processing logic for checkpoint data.
59/// Workers run in parallel within a [`WorkerPool`] to process checkpoints and
60/// generate messages that can optionally be batched and processed by a
61/// [`Reducer`].
62///
63/// # Processing Modes
64///
65/// Workers support two processing modes:
66/// * **Direct Processing**: Messages are handled immediately without batching.
67/// * **Batch Processing**: Messages are accumulated and processed in batches by
68/// a [`Reducer`].
69///
70/// The processing mode is determined by the presence of a [`Reducer`] in the
71/// [`WorkerPool`] configuration.
72///
73/// # Concurrency
74///
75/// Multiple instances of a worker can run in parallel in the worker pool. The
76/// implementation must be thread-safe and handle checkpoint processing
77/// efficiently.
78///
79/// # Integration with Optional Reducer
80///
81/// Messages produced by the worker can be:
82/// * Processed directly without batching.
83/// * Accumulated and passed to a [`Reducer`] for batch processing.
84///
85/// The worker's [`Message`](Worker::Message) type must match the reducer's
86/// input type when batch processing is enabled.
87#[async_trait]
88pub trait Worker: Send + Sync {
89 type Error: Debug + Display;
90 type Message: Send + Sync;
91
92 /// Processes a single checkpoint and returns a message.
93 ///
94 /// This method contains the core logic for processing checkpoint data.
95 ///
96 /// # Note
97 /// - Checkpoints are processed in order when a single worker is used.
98 /// - Parallel processing with multiple workers does not guarantee
99 /// checkpoint order.
100 async fn process_checkpoint(
101 &self,
102 checkpoint: Arc<CheckpointData>,
103 ) -> Result<Self::Message, Self::Error>;
104
105 /// A hook that allows preprocessing a checkpoint before it's fully
106 /// processed.
107 ///
108 /// This method can be used to perform actions like validation or data
109 /// transformation before the main
110 /// [`process_checkpoint`](Worker::process_checkpoint) logic is executed.
111 ///
112 /// # Default implementation
113 ///
114 /// By default it returns `Ok(())`.
115 fn preprocess_hook(&self, _: Arc<CheckpointData>) -> Result<(), Self::Error> {
116 Ok(())
117 }
118}