iota_data_ingestion_core/reducer.rs
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use backoff::{ExponentialBackoff, backoff::Backoff};
use futures::StreamExt;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, Worker, util::reset_backoff,
    worker_pool::WorkerPoolStatus,
};

/// Represents the outcome of a commit operation with retry logic.
pub enum CommitStatus {
    /// Commit succeeded, continue processing.
    Success,
    /// Processing should stop due to shutdown signal.
    Shutdown,
}

/// Processes and commits batches of messages produced by workers.
///
/// The Reducer trait provides batch processing capabilities for messages
/// generated by [`Worker`]s. It allows for custom batching logic and efficient
/// processing of worker results.
///
/// # Batch Processing
///
/// [`Messages`](Worker::Message) are accumulated in batches based on the
/// [`should_close_batch`](Reducer::should_close_batch) policy.
///
/// When a batch is ready to be committed (as determined by
/// `should_close_batch`), the [`commit`](Reducer::commit) method is called
/// with the collected messages.
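/// # Example
///
/// A minimal sketch of a reducer that persists each batch in one call, relying
/// on the default [`should_close_batch`](Reducer::should_close_batch) policy.
/// The `MyWorker`, `MyMessage`, `MyError`, and `write_to_store` names are
/// illustrative and not part of this crate:
///
/// ```ignore
/// struct MyReducer;
///
/// #[async_trait]
/// impl Reducer<MyWorker> for MyReducer {
///     // `MyMessage` and `MyError` stand in for `MyWorker`'s associated types.
///     async fn commit(&self, batch: &[MyMessage]) -> Result<(), MyError> {
///         // Messages arrive ordered by checkpoint sequence number, so the
///         // whole batch can be written as a single unit.
///         write_to_store(batch).await
///     }
/// }
/// ```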
#[async_trait]
pub trait Reducer<Mapper: Worker>: Send + Sync {
    /// Commits a batch of messages.
    ///
    /// This method is called when a batch is ready to be processed, as
    /// determined by [`should_close_batch`](Reducer::should_close_batch). The
    /// implementation should handle the batch processing logic and return
    /// an error if the operation fails.
    ///
    /// # Note
    /// Messages within each batch are guaranteed to be ordered by checkpoint
    /// sequence number.
    async fn commit(&self, batch: &[Mapper::Message]) -> Result<(), Mapper::Error>;

    /// Attempts to commit a batch of messages with exponential backoff retries
    /// on failure.
    ///
    /// This function repeatedly calls the [`commit`](Reducer::commit) method of
    /// the provided [`Reducer`] until either:
    /// - The commit succeeds, returning `CommitStatus::Success`.
    /// - A cancellation signal is received via the [`CancellationToken`],
    ///   returning `CommitStatus::Shutdown`.
    /// - All retry attempts are exhausted within the backoff's maximum elapsed
    ///   time, causing a panic.
    ///
    /// # Retry Mechanism
    /// - Uses [`ExponentialBackoff`](backoff::ExponentialBackoff) to introduce
    ///   increasing delays between retry attempts.
    /// - Checks for cancellation both before and after each commit attempt.
    /// - If a cancellation signal is received during a backoff delay, the
    ///   function exits immediately with `CommitStatus::Shutdown`.
    ///
    /// # Panics
    /// - If all retry attempts are exhausted within the backoff's maximum
    ///   elapsed time, indicating a persistent failure.
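    /// # Example
    ///
    /// A sketch of invoking the retry helper directly; `reducer`, `batch`, and
    /// `token` are assumed to already be in scope:
    ///
    /// ```ignore
    /// let backoff = backoff::ExponentialBackoff::default();
    /// match reducer.commit_with_retry(&batch, backoff, &token).await {
    ///     CommitStatus::Success => { /* batch committed, keep processing */ }
    ///     CommitStatus::Shutdown => { /* cancellation observed, stop */ }
    /// }
    /// ```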
    async fn commit_with_retry(
        &self,
        batch: &[Mapper::Message],
        mut backoff: ExponentialBackoff,
        token: &CancellationToken,
    ) -> CommitStatus {
        loop {
            // check for cancellation before attempting commit.
            if token.is_cancelled() {
                return CommitStatus::Shutdown;
            }
            // attempt to commit.
            match self.commit(batch).await {
                Ok(_) => return CommitStatus::Success,
                Err(err) => {
                    let err = IngestionError::Reducer(format!("failed to commit batch: {err}"));
                    tracing::warn!("transient reducer commit error {err:?}");
                    // check for cancellation after failed commit.
                    if token.is_cancelled() {
                        return CommitStatus::Shutdown;
                    }
                }
            }
            // get next backoff duration or panic if max retries exceeded
            let duration = backoff
                .next_backoff()
                .expect("max retry attempts exceeded: commit operation failed");
            // if cancellation occurs during backoff wait, exit early with Shutdown.
            // Otherwise (if timeout expires), continue with the next retry attempt
            if tokio::time::timeout(duration, token.cancelled())
                .await
                .is_ok()
            {
                return CommitStatus::Shutdown;
            }
        }
    }

    /// Determines if the current batch should be closed and committed.
    ///
    /// This method is called for each new message to determine if the current
    /// batch should be committed before adding the new message.
    ///
    /// # Default Implementation
    ///
    /// By default, returns `true` only when there are no more messages
    /// (`next_item` is `None`), effectively creating a single batch for all
    /// messages.
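    /// # Example
    ///
    /// A sketch of a size-based override; the limit of 100 messages is
    /// illustrative:
    ///
    /// ```ignore
    /// fn should_close_batch(
    ///     &self,
    ///     batch: &[Mapper::Message],
    ///     next_item: Option<&Mapper::Message>,
    /// ) -> bool {
    ///     // Close the batch every 100 messages, or once no further message is pending.
    ///     batch.len() >= 100 || next_item.is_none()
    /// }
    /// ```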
    fn should_close_batch(
        &self,
        _batch: &[Mapper::Message],
        next_item: Option<&Mapper::Message>,
    ) -> bool {
        next_item.is_none()
    }
}

/// Processes worker messages and manages batching through a reducer.
///
/// This function is the core of the reduction pipeline, handling message
/// batching, watermark tracking, and progress reporting. It maintains message
/// order by checkpoint sequence number and applies batching logic through the
/// provided reducer.
///
/// # Message Processing Flow
///
/// 1. Receives messages in chunks of up to [`MAX_CHECKPOINTS_IN_PROGRESS`].
/// 2. Maintains message order using checkpoint sequence numbers.
/// 3. Batches messages according to the reducer's
///    [`Reducer::should_close_batch`] policy and commits each batch once it
///    closes.
/// 4. Updates the progress watermark after each commit.
/// 5. Reports progress back to the executor.
///
/// # Shutdown Behavior
///
/// The function will gracefully exit when receiving a shutdown signal,
/// ensuring no data loss for processed messages.
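/// # Example
///
/// A sketch of how a worker pool might spawn this task; the channel sizes,
/// task name, and the `MyWorker`/`MyReducer` types are illustrative:
///
/// ```ignore
/// let (watermark_tx, watermark_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
/// let (progress_tx, progress_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
/// let handle = tokio::spawn(reduce::<MyWorker>(
///     "my-task".to_string(),
///     0, // first checkpoint sequence number to expect
///     watermark_rx,
///     progress_tx,
///     Box::new(MyReducer),
///     Arc::new(ExponentialBackoff::default()),
///     token.child_token(),
/// ));
/// ```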
pub(crate) async fn reduce<W: Worker>(
    task_name: String,
    mut current_checkpoint_number: CheckpointSequenceNumber,
    watermark_receiver: mpsc::Receiver<(CheckpointSequenceNumber, W::Message)>,
    executor_progress_sender: mpsc::Sender<WorkerPoolStatus>,
    reducer: Box<dyn Reducer<W>>,
    backoff: Arc<ExponentialBackoff>,
    token: CancellationToken,
) -> IngestionResult<()> {
    // convert the receiver into a stream of chunks holding up to
    // MAX_CHECKPOINTS_IN_PROGRESS messages. This way, each iteration of the
    // loop will process all ready messages.
    let mut stream =
        ReceiverStream::new(watermark_receiver).ready_chunks(MAX_CHECKPOINTS_IN_PROGRESS);
    // store unprocessed progress messages from workers.
    let mut unprocessed = HashMap::new();
    // buffer to accumulate results before passing them to the reducer.
    // The size of this batch is dynamically determined by the reducer's
    // `should_close_batch` method.
    let mut batch = vec![];
    // track the next unprocessed checkpoint number for reporting progress
    // after each chunk of messages is received from the stream.
    let mut progress_update = None;
    // flag to indicate a shutdown has been triggered.
    let mut trigger_shutdown = false;

    while let Some(update_batch) = stream.next().await {
        unprocessed.extend(update_batch.into_iter());
        // Process messages sequentially based on checkpoint sequence number.
        // This ensures in-order processing and maintains progress integrity.
        while let Some(message) = unprocessed.remove(&current_checkpoint_number) {
            // Collect messages into the batch based on the reducer's
            // `should_close_batch` policy; once a batch closes it gets
            // committed and a new batch is started with the current message.
            if reducer.should_close_batch(&batch, Some(&message)) {
                match reducer
                    .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
                    .await
                {
                    CommitStatus::Success => {
                        batch = vec![message];
                        progress_update = Some(current_checkpoint_number);
                    }
                    CommitStatus::Shutdown => {
                        trigger_shutdown = true;
                        break;
                    }
                };
            } else {
                // Add the message to the existing batch since no commit is needed.
                batch.push(message);
            }
            current_checkpoint_number += 1;
        }
        // Handle final batch processing.
        // Check if the final batch should be committed.
        // None parameter indicates no more messages available.
        if reducer.should_close_batch(&batch, None) && !trigger_shutdown {
            match reducer
                .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
                .await
            {
                CommitStatus::Success => {
                    progress_update = Some(current_checkpoint_number);
                }
                CommitStatus::Shutdown => trigger_shutdown = true,
            }
        }
        // report progress update to executor.
        if let Some(watermark) = progress_update {
            executor_progress_sender
                .send(WorkerPoolStatus::Running((task_name.clone(), watermark)))
                .await
                .map_err(|_| IngestionError::Channel("unable to send worker pool progress updates to executor, receiver half closed".into()))?;
            progress_update = None;
        }

        // Check for shutdown signal after progress update to ensure progress
        // is reported even during shutdown.
        if trigger_shutdown {
            break;
        }
    }
    Ok(())
}