Skip to main content

iota_data_ingestion_core/
reducer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc};
6
7use async_trait::async_trait;
8use backoff::{ExponentialBackoff, backoff::Backoff};
9use futures::StreamExt;
10use iota_types::messages_checkpoint::CheckpointSequenceNumber;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13use tokio_util::sync::CancellationToken;
14
15use crate::{
16    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, Worker, util::reset_backoff,
17    worker_pool::WorkerPoolStatus,
18};
19
/// Represents the outcome of a commit operation with retry logic.
///
/// Returned by [`Reducer::commit_with_retry`] to tell the caller whether to
/// keep processing or to stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitStatus {
    /// Commit succeeded, continue processing.
    Success,
    /// Processing should stop due to shutdown signal.
    Shutdown,
}
27
28/// Processes and commits batches of messages produced by workers.
29///
30/// The Reducer trait provides batch processing capabilities for messages
31/// generated by [`Worker`]s. It allows for custom batching logic and efficient
32/// processing of worker results.
33///
34/// # Batch Processing
35///
36/// [`Messages`](Worker::Message) are accumulated in batches based on the
37/// [`should_close_batch`](Reducer::should_close_batch) policy.
38///
39/// When a batch is
40/// ready to be committed (as determined by `should_close_batch`), the
41/// [`commit`](Reducer::commit) method is called with the collected messages.
42#[async_trait]
43pub trait Reducer<Mapper: Worker>: Send + Sync {
44    /// Commits a batch of messages.
45    ///
46    /// This method is called when a batch is ready to be processed, as
47    /// determined by [`should_close_batch`](Reducer::should_close_batch). The
48    /// implementation should handle the batch processing logic and return
49    /// an error if the operation fails.
50    ///
51    /// # Note
52    /// Messages within each batch are guaranteed to be ordered by checkpoint
53    /// sequence number.
54    async fn commit(&self, batch: &[Mapper::Message]) -> Result<(), Mapper::Error>;
55
56    /// Attempts to commit a batch of messages with exponential backoff retries
57    /// on failure.
58    ///
59    /// This function repeatedly calls the [`commit`](Reducer::commit) method of
60    /// the provided [`Reducer`] until either:
61    /// - The commit succeeds, returning `CommitStatus::Success`.
62    /// - A cancellation signal is received via the [`CancellationToken`],
63    ///   returning `CommitStatus::Shutdown`.
64    /// - All retry attempts are exhausted within backoff's maximum elapsed
65    ///   time, causing a panic.
66    ///
67    /// # Retry Mechanism:
68    /// - Uses [`ExponentialBackoff`](backoff::ExponentialBackoff) to introduce
69    ///   increasing delays between retry attempts.
70    /// - Checks for cancellation both before and after each commit attempt.
71    /// - If a cancellation signal is received during a backoff delay, the
72    ///   function exits immediately with `CommitStatus::Shutdown`.
73    ///
74    /// # Panics:
75    /// - If all retry attempts are exhausted within backoff's the maximum
76    ///   elapsed time, indicating a persistent failure.
77    async fn commit_with_retry(
78        &self,
79        batch: &[Mapper::Message],
80        mut backoff: ExponentialBackoff,
81        token: &CancellationToken,
82    ) -> CommitStatus {
83        loop {
84            // check for cancellation before attempting commit.
85            if token.is_cancelled() {
86                return CommitStatus::Shutdown;
87            }
88            // attempt to commit.
89            match self.commit(batch).await {
90                Ok(_) => return CommitStatus::Success,
91                Err(err) => {
92                    let err = IngestionError::Reducer(format!("failed to commit batch: {err}"));
93                    tracing::warn!("transient reducer commit error {err:?}");
94                    // check for cancellation after failed commit.
95                    if token.is_cancelled() {
96                        return CommitStatus::Shutdown;
97                    }
98                }
99            }
100            // get next backoff duration or panic if max retries exceeded
101            let duration = backoff
102                .next_backoff()
103                .expect("max retry attempts exceeded: commit operation failed");
104            // if cancellation occurs during backoff wait, exit early with Shutdown.
105            // Otherwise (if timeout expires), continue with the next retry attempt
106            if tokio::time::timeout(duration, token.cancelled())
107                .await
108                .is_ok()
109            {
110                return CommitStatus::Shutdown;
111            }
112        }
113    }
114
115    /// Determines if the current batch should be closed and committed.
116    ///
117    /// This method is called by the framework in two situations:
118    ///
119    /// - **`next_item = Some(msg)`**: a new message is about to be added to the
120    ///   batch. Returning `true` triggers a commit of the existing batch first.
121    ///   The `msg` is added to a new batch. Returning `false` appends `msg` to
122    ///   the current batch without committing.
123    ///
124    /// - **`next_item = None`**: the current chunk from the upstream stream has
125    ///   been fully consumed and there are no more messages to add in this
126    ///   iteration (the stream itself may still produce more chunks later).
127    ///   Returning `true` flushes the partial batch. Returning `false` leaves
128    ///   the batch in memory until the next chunk arrives.
129    ///
130    /// # Default Implementation
131    ///
132    /// Returns `true` only when `next_item` is `None`, so each chunk's
133    /// remaining batch is flushed at the chunk boundary.
134    fn should_close_batch(
135        &self,
136        _batch: &[Mapper::Message],
137        next_item: Option<&Mapper::Message>,
138    ) -> bool {
139        next_item.is_none()
140    }
141}
142
143/// Processes worker messages and manages batching through a reducer.
144///
145/// This function is the core of the reduction pipeline, handling message
146/// batching, watermark tracking, and progress reporting. It maintains message
147/// order by checkpoint sequence number and applies batching logic through the
148/// provided reducer.
149///
150/// # Message Processing Flow
151///
152/// 1. Receives messages in chunks up to [`MAX_CHECKPOINTS_IN_PROGRESS`].
153/// 2. Maintains message order using checkpoint sequence numbers.
154/// 3. Batches messages according to reducer's [`Reducer::should_close_batch`]
155///    policy and after that its committed.
156/// 4. Progress is updated after each commit.
157/// 5. Reports progress back to the executor.
158///
159/// # Shutdown Behavior
160///
161/// The function will gracefully exit when receiving a shutdown signal,
162/// ensuring no data loss for processed messages.
163pub(crate) async fn reduce<W: Worker>(
164    task_name: String,
165    mut current_checkpoint_number: CheckpointSequenceNumber,
166    watermark_receiver: mpsc::Receiver<(CheckpointSequenceNumber, Option<W::Message>)>,
167    executor_progress_sender: mpsc::Sender<WorkerPoolStatus>,
168    reducer: Box<dyn Reducer<W>>,
169    backoff: Arc<ExponentialBackoff>,
170    token: CancellationToken,
171) -> IngestionResult<()> {
172    // convert to a stream of MAX_CHECKPOINTS_IN_PROGRESS size. This way, each
173    // iteration of the loop will process all ready messages.
174    let mut stream =
175        ReceiverStream::new(watermark_receiver).ready_chunks(MAX_CHECKPOINTS_IN_PROGRESS);
176    // store unprocessed progress messages from workers.
177    let mut unprocessed = HashMap::new();
178    // buffer to accumulate results before passing them to the reducer.
179    // The size of this batch is dynamically determined by the reducer's
180    // `should_close_batch` method.
181    let mut batch = vec![];
182    // track the next unprocessed checkpoint number for reporting progress
183    // after each chunk of messages is received from the stream.
184    let mut progress_update = None;
185    // flag to indicate a shutdown has been triggered.
186    let mut trigger_shutdown = false;
187
188    while let Some(update_batch) = stream.next().await {
189        unprocessed.extend(update_batch);
190        // Process messages sequentially based on checkpoint sequence number.
191        // This ensures in-order processing and maintains progress integrity.
192        while let Some(maybe_worker_message) = unprocessed.remove(&current_checkpoint_number) {
193            let Some(message) = maybe_worker_message else {
194                current_checkpoint_number += 1;
195                if batch.is_empty() {
196                    progress_update = Some(current_checkpoint_number);
197                }
198                continue;
199            };
200
201            // reducer is configured, collect messages in batch based on
202            // `reducer.should_close_batch` policy, once a batch is collected it gets
203            // committed and a new batch is created with the current message.
204            if reducer.should_close_batch(&batch, Some(&message)) {
205                match reducer
206                    .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
207                    .await
208                {
209                    CommitStatus::Success => {
210                        batch = vec![message];
211                        progress_update = Some(current_checkpoint_number);
212                    }
213                    CommitStatus::Shutdown => {
214                        trigger_shutdown = true;
215                        break;
216                    }
217                };
218            } else {
219                // Add message to existing batch since no commit needed.
220                batch.push(message);
221            }
222            current_checkpoint_number += 1;
223        }
224        // Handle final batch processing.
225        // Check if the final batch should be committed.
226        // None parameter indicates no more messages available.
227        // Only check if batch is non-empty to avoid unnecessary calls.
228        if !batch.is_empty() && reducer.should_close_batch(&batch, None) && !trigger_shutdown {
229            match reducer
230                .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
231                .await
232            {
233                CommitStatus::Success => {
234                    progress_update = Some(current_checkpoint_number);
235                }
236                CommitStatus::Shutdown => trigger_shutdown = true,
237            }
238        }
239        // report progress update to executor.
240        if let Some(watermark) = progress_update {
241            executor_progress_sender
242                .send(WorkerPoolStatus::Running((task_name.clone(), watermark)))
243                .await
244                .map_err(|_| IngestionError::Channel("unable to send worker pool progress updates to executor, receiver half closed".into()))?;
245            progress_update = None;
246        }
247
248        // Check for shutdown signal after progress update to ensure progress
249        // is reported even during shutdown.
250        if trigger_shutdown {
251            break;
252        }
253    }
254    Ok(())
255}