iota_data_ingestion_core/reducer.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc};
6
7use async_trait::async_trait;
8use backoff::{ExponentialBackoff, backoff::Backoff};
9use futures::StreamExt;
10use iota_types::messages_checkpoint::CheckpointSequenceNumber;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13use tokio_util::sync::CancellationToken;
14
15use crate::{
16 IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, Worker, util::reset_backoff,
17 worker_pool::WorkerPoolStatus,
18};
19
/// Represents the outcome of a commit operation with retry logic.
///
/// Returned by [`Reducer::commit_with_retry`] so the caller can distinguish
/// between "the batch was committed" and "stop, a shutdown was requested".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitStatus {
    /// Commit succeeded, continue processing.
    Success,
    /// Processing should stop due to shutdown signal.
    Shutdown,
}
27
28/// Processes and commits batches of messages produced by workers.
29///
30/// The Reducer trait provides batch processing capabilities for messages
31/// generated by [`Worker`]s. It allows for custom batching logic and efficient
32/// processing of worker results.
33///
34/// # Batch Processing
35///
36/// [`Messages`](Worker::Message) are accumulated in batches based on the
37/// [`should_close_batch`](Reducer::should_close_batch) policy.
38///
39/// When a batch is
40/// ready to be committed (as determined by `should_close_batch`), the
41/// [`commit`](Reducer::commit) method is called with the collected messages.
42#[async_trait]
43pub trait Reducer<Mapper: Worker>: Send + Sync {
44 /// Commits a batch of messages.
45 ///
46 /// This method is called when a batch is ready to be processed, as
47 /// determined by [`should_close_batch`](Reducer::should_close_batch). The
48 /// implementation should handle the batch processing logic and return
49 /// an error if the operation fails.
50 ///
51 /// # Note
52 /// Messages within each batch are guaranteed to be ordered by checkpoint
53 /// sequence number.
54 async fn commit(&self, batch: &[Mapper::Message]) -> Result<(), Mapper::Error>;
55
56 /// Attempts to commit a batch of messages with exponential backoff retries
57 /// on failure.
58 ///
59 /// This function repeatedly calls the [`commit`](Reducer::commit) method of
60 /// the provided [`Reducer`] until either:
61 /// - The commit succeeds, returning `CommitStatus::Success`.
62 /// - A cancellation signal is received via the [`CancellationToken`],
63 /// returning `CommitStatus::Shutdown`.
64 /// - All retry attempts are exhausted within backoff's maximum elapsed
65 /// time, causing a panic.
66 ///
67 /// # Retry Mechanism:
68 /// - Uses [`ExponentialBackoff`](backoff::ExponentialBackoff) to introduce
69 /// increasing delays between retry attempts.
70 /// - Checks for cancellation both before and after each commit attempt.
71 /// - If a cancellation signal is received during a backoff delay, the
72 /// function exits immediately with `CommitStatus::Shutdown`.
73 ///
74 /// # Panics:
75 /// - If all retry attempts are exhausted within backoff's the maximum
76 /// elapsed time, indicating a persistent failure.
77 async fn commit_with_retry(
78 &self,
79 batch: &[Mapper::Message],
80 mut backoff: ExponentialBackoff,
81 token: &CancellationToken,
82 ) -> CommitStatus {
83 loop {
84 // check for cancellation before attempting commit.
85 if token.is_cancelled() {
86 return CommitStatus::Shutdown;
87 }
88 // attempt to commit.
89 match self.commit(batch).await {
90 Ok(_) => return CommitStatus::Success,
91 Err(err) => {
92 let err = IngestionError::Reducer(format!("failed to commit batch: {err}"));
93 tracing::warn!("transient reducer commit error {err:?}");
94 // check for cancellation after failed commit.
95 if token.is_cancelled() {
96 return CommitStatus::Shutdown;
97 }
98 }
99 }
100 // get next backoff duration or panic if max retries exceeded
101 let duration = backoff
102 .next_backoff()
103 .expect("max retry attempts exceeded: commit operation failed");
104 // if cancellation occurs during backoff wait, exit early with Shutdown.
105 // Otherwise (if timeout expires), continue with the next retry attempt
106 if tokio::time::timeout(duration, token.cancelled())
107 .await
108 .is_ok()
109 {
110 return CommitStatus::Shutdown;
111 }
112 }
113 }
114
115 /// Determines if the current batch should be closed and committed.
116 ///
117 /// This method is called by the framework in two situations:
118 ///
119 /// - **`next_item = Some(msg)`**: a new message is about to be added to the
120 /// batch. Returning `true` triggers a commit of the existing batch first.
121 /// The `msg` is added to a new batch. Returning `false` appends `msg` to
122 /// the current batch without committing.
123 ///
124 /// - **`next_item = None`**: the current chunk from the upstream stream has
125 /// been fully consumed and there are no more messages to add in this
126 /// iteration (the stream itself may still produce more chunks later).
127 /// Returning `true` flushes the partial batch. Returning `false` leaves
128 /// the batch in memory until the next chunk arrives.
129 ///
130 /// # Default Implementation
131 ///
132 /// Returns `true` only when `next_item` is `None`, so each chunk's
133 /// remaining batch is flushed at the chunk boundary.
134 fn should_close_batch(
135 &self,
136 _batch: &[Mapper::Message],
137 next_item: Option<&Mapper::Message>,
138 ) -> bool {
139 next_item.is_none()
140 }
141}
142
143/// Processes worker messages and manages batching through a reducer.
144///
145/// This function is the core of the reduction pipeline, handling message
146/// batching, watermark tracking, and progress reporting. It maintains message
147/// order by checkpoint sequence number and applies batching logic through the
148/// provided reducer.
149///
150/// # Message Processing Flow
151///
152/// 1. Receives messages in chunks up to [`MAX_CHECKPOINTS_IN_PROGRESS`].
153/// 2. Maintains message order using checkpoint sequence numbers.
154/// 3. Batches messages according to reducer's [`Reducer::should_close_batch`]
155/// policy and after that its committed.
156/// 4. Progress is updated after each commit.
157/// 5. Reports progress back to the executor.
158///
159/// # Shutdown Behavior
160///
161/// The function will gracefully exit when receiving a shutdown signal,
162/// ensuring no data loss for processed messages.
163pub(crate) async fn reduce<W: Worker>(
164 task_name: String,
165 mut current_checkpoint_number: CheckpointSequenceNumber,
166 watermark_receiver: mpsc::Receiver<(CheckpointSequenceNumber, Option<W::Message>)>,
167 executor_progress_sender: mpsc::Sender<WorkerPoolStatus>,
168 reducer: Box<dyn Reducer<W>>,
169 backoff: Arc<ExponentialBackoff>,
170 token: CancellationToken,
171) -> IngestionResult<()> {
172 // convert to a stream of MAX_CHECKPOINTS_IN_PROGRESS size. This way, each
173 // iteration of the loop will process all ready messages.
174 let mut stream =
175 ReceiverStream::new(watermark_receiver).ready_chunks(MAX_CHECKPOINTS_IN_PROGRESS);
176 // store unprocessed progress messages from workers.
177 let mut unprocessed = HashMap::new();
178 // buffer to accumulate results before passing them to the reducer.
179 // The size of this batch is dynamically determined by the reducer's
180 // `should_close_batch` method.
181 let mut batch = vec![];
182 // track the next unprocessed checkpoint number for reporting progress
183 // after each chunk of messages is received from the stream.
184 let mut progress_update = None;
185 // flag to indicate a shutdown has been triggered.
186 let mut trigger_shutdown = false;
187
188 while let Some(update_batch) = stream.next().await {
189 unprocessed.extend(update_batch);
190 // Process messages sequentially based on checkpoint sequence number.
191 // This ensures in-order processing and maintains progress integrity.
192 while let Some(maybe_worker_message) = unprocessed.remove(¤t_checkpoint_number) {
193 let Some(message) = maybe_worker_message else {
194 current_checkpoint_number += 1;
195 if batch.is_empty() {
196 progress_update = Some(current_checkpoint_number);
197 }
198 continue;
199 };
200
201 // reducer is configured, collect messages in batch based on
202 // `reducer.should_close_batch` policy, once a batch is collected it gets
203 // committed and a new batch is created with the current message.
204 if reducer.should_close_batch(&batch, Some(&message)) {
205 match reducer
206 .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
207 .await
208 {
209 CommitStatus::Success => {
210 batch = vec![message];
211 progress_update = Some(current_checkpoint_number);
212 }
213 CommitStatus::Shutdown => {
214 trigger_shutdown = true;
215 break;
216 }
217 };
218 } else {
219 // Add message to existing batch since no commit needed.
220 batch.push(message);
221 }
222 current_checkpoint_number += 1;
223 }
224 // Handle final batch processing.
225 // Check if the final batch should be committed.
226 // None parameter indicates no more messages available.
227 // Only check if batch is non-empty to avoid unnecessary calls.
228 if !batch.is_empty() && reducer.should_close_batch(&batch, None) && !trigger_shutdown {
229 match reducer
230 .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
231 .await
232 {
233 CommitStatus::Success => {
234 progress_update = Some(current_checkpoint_number);
235 }
236 CommitStatus::Shutdown => trigger_shutdown = true,
237 }
238 }
239 // report progress update to executor.
240 if let Some(watermark) = progress_update {
241 executor_progress_sender
242 .send(WorkerPoolStatus::Running((task_name.clone(), watermark)))
243 .await
244 .map_err(|_| IngestionError::Channel("unable to send worker pool progress updates to executor, receiver half closed".into()))?;
245 progress_update = None;
246 }
247
248 // Check for shutdown signal after progress update to ensure progress
249 // is reported even during shutdown.
250 if trigger_shutdown {
251 break;
252 }
253 }
254 Ok(())
255}