iota_data_ingestion_core/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! This library provides an easy way to create custom indexers.
6//! <br>
7//!
8//! ## Graceful shutdown
9//!
10//! The shutdown sequence in the data ingestion system ensures clean termination
11//! of all components while preserving data integrity. It is initiated via a
12//! [CancellationToken](tokio_util::sync::CancellationToken), which triggers a
13//! hierarchical and graceful shutdown process.
14//!
15//! The shutdown process follows a top-down hierarchy:
//! 1. [`Worker`]: Individual workers within a [`WorkerPool`] detect the
//!    cancellation signal, complete their current checkpoint processing, send
//!    final progress updates, and signal completion to the parent
//!    [`WorkerPool`] via a `WorkerStatus::Shutdown` message.
//! 2. [`WorkerPool`]: Coordinates worker shutdowns, ensures all progress
//!    messages are processed, waits for the shutdown signals of all workers,
//!    and notifies the [`IndexerExecutor`] with a `WorkerPoolStatus::Shutdown`
//!    message once fully terminated.
//! 3. [`IndexerExecutor`]: Orchestrates the shutdown of all worker pools and
//!    finalizes system termination.
26
27mod errors;
28mod executor;
29pub mod history;
30mod metrics;
31mod progress_store;
32mod reader;
33mod reducer;
34#[cfg(test)]
35mod tests;
36mod util;
37mod worker_pool;
38
39use std::{
40    fmt::{Debug, Display},
41    sync::Arc,
42};
43
44use async_trait::async_trait;
45pub use errors::{IngestionError, IngestionResult};
46pub use executor::{IndexerExecutor, MAX_CHECKPOINTS_IN_PROGRESS, setup_single_workflow};
47use iota_types::full_checkpoint_content::CheckpointData;
48pub use metrics::DataIngestionMetrics;
49pub use progress_store::{FileProgressStore, ProgressStore, ShimProgressStore};
50pub use reader::ReaderOptions;
51pub use reducer::Reducer;
52pub use util::{create_remote_store_client, create_remote_store_client_with_ops};
53pub use worker_pool::WorkerPool;
54
55/// Processes individual checkpoints and produces messages for optional batch
56/// processing.
57///
58/// The Worker trait defines the core processing logic for checkpoint data.
59/// Workers run in parallel within a [`WorkerPool`] to process checkpoints and
60/// generate messages that can optionally be batched and processed by a
61/// [`Reducer`].
62///
63/// # Processing Modes
64///
65/// Workers support two processing modes:
66/// * **Direct Processing**: Messages are handled immediately without batching.
67/// * **Batch Processing**: Messages are accumulated and processed in batches by
68///   a [`Reducer`].
69///
70/// The processing mode is determined by the presence of a [`Reducer`] in the
71/// [`WorkerPool`] configuration.
72///
73/// # Concurrency
74///
75/// Multiple instances of a worker can run in parallel in the worker pool. The
76/// implementation must be thread-safe and handle checkpoint processing
77/// efficiently.
78///
79/// # Integration with Optional Reducer
80///
81/// Messages produced by the worker can be:
82/// * Processed directly without batching.
83/// * Accumulated and passed to a [`Reducer`] for batch processing.
84///
85/// The worker's [`Message`](Worker::Message) type must match the reducer's
86/// input type when batch processing is enabled.
87#[async_trait]
88pub trait Worker: Send + Sync {
89    type Error: Debug + Display;
90    type Message: Send + Sync;
91
92    /// Processes a single checkpoint and returns a message.
93    ///
94    /// This method contains the core logic for processing checkpoint data.
95    ///
96    /// # Note
97    /// - Checkpoints are processed in order when a single worker is used.
98    /// - Parallel processing with multiple workers does not guarantee
99    ///   checkpoint order.
100    async fn process_checkpoint(
101        &self,
102        checkpoint: Arc<CheckpointData>,
103    ) -> Result<Self::Message, Self::Error>;
104
105    /// A hook that allows preprocessing a checkpoint before it's fully
106    /// processed.
107    ///
108    /// This method can be used to perform actions like validation or data
109    /// transformation before the main
110    /// [`process_checkpoint`](Worker::process_checkpoint) logic is executed.
111    ///
112    /// # Default implementation
113    ///
114    /// By default it returns `Ok(())`.
115    fn preprocess_hook(&self, _: Arc<CheckpointData>) -> Result<(), Self::Error> {
116        Ok(())
117    }
118}