iota_data_ingestion_core/reducer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use backoff::{ExponentialBackoff, backoff::Backoff};
use futures::StreamExt;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, Worker, util::reset_backoff,
    worker_pool::WorkerPoolStatus,
};

/// Represents the outcome of a commit operation with retry logic.
pub enum CommitStatus {
    /// Commit succeeded, continue processing.
    Success,
    /// Processing should stop due to shutdown signal.
    Shutdown,
}

/// Processes and commits batches of messages produced by workers.
///
/// The Reducer trait provides batch processing capabilities for messages
/// generated by [`Worker`]s. It allows for custom batching logic and efficient
/// processing of worker results.
///
/// # Batch Processing
///
/// [`Messages`](Worker::Message) are accumulated in batches based on the
/// [`should_close_batch`](Reducer::should_close_batch) policy. When a batch is
/// ready to be committed (as determined by `should_close_batch`), the
/// [`commit`](Reducer::commit) method is called with the collected messages.
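///
/// # Example
///
/// A minimal sketch that persists every message in one final batch by relying
/// on the default [`should_close_batch`](Reducer::should_close_batch) policy.
/// `CheckpointWorker` is a hypothetical [`Worker`] implementation, so the
/// snippet is illustrative rather than a compilable doctest:
///
/// ```ignore
/// struct StoreReducer;
///
/// #[async_trait]
/// impl Reducer<CheckpointWorker> for StoreReducer {
///     async fn commit(
///         &self,
///         batch: &[<CheckpointWorker as Worker>::Message],
///     ) -> Result<(), <CheckpointWorker as Worker>::Error> {
///         // Persist the whole batch in a single write (e.g. one DB transaction).
///         for message in batch {
///             // store `message` ...
///         }
///         Ok(())
///     }
/// }
/// ```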
#[async_trait]
pub trait Reducer<Mapper: Worker>: Send + Sync {
    /// Commits a batch of messages.
    ///
    /// This method is called when a batch is ready to be processed, as
    /// determined by [`should_close_batch`](Reducer::should_close_batch). The
    /// implementation should handle the batch processing logic and return
    /// an error if the operation fails.
    ///
    /// # Note
    /// Messages within each batch are guaranteed to be ordered by checkpoint
    /// sequence number.
    async fn commit(&self, batch: &[Mapper::Message]) -> Result<(), Mapper::Error>;

    /// Attempts to commit a batch of messages with exponential backoff retries
    /// on failure.
    ///
    /// This method repeatedly calls [`commit`](Reducer::commit) on this
    /// [`Reducer`] until either:
    /// - The commit succeeds, returning `CommitStatus::Success`.
    /// - A cancellation signal is received via the [`CancellationToken`],
    ///   returning `CommitStatus::Shutdown`.
    /// - All retry attempts are exhausted within the backoff's maximum elapsed
    ///   time, causing a panic.
    ///
    /// # Retry Mechanism
    /// - Uses [`ExponentialBackoff`](backoff::ExponentialBackoff) to introduce
    ///   increasing delays between retry attempts.
    /// - Checks for cancellation both before and after each commit attempt.
    /// - If a cancellation signal is received during a backoff delay, the
    ///   method exits immediately with `CommitStatus::Shutdown`.
    ///
    /// # Panics
    /// - If all retry attempts are exhausted within the backoff's maximum
    ///   elapsed time, indicating a persistent failure.
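    ///
    /// # Example
    ///
    /// A minimal sketch of driving the retry loop from calling code; the
    /// `reducer`, `batch`, and `token` bindings are assumed to exist in the
    /// surrounding context, so the snippet is illustrative rather than a
    /// compilable doctest:
    ///
    /// ```ignore
    /// let status = reducer
    ///     .commit_with_retry(&batch, ExponentialBackoff::default(), &token)
    ///     .await;
    /// match status {
    ///     // The batch was persisted; the caller can advance its watermark.
    ///     CommitStatus::Success => {}
    ///     // Cancellation was observed; stop processing without panicking.
    ///     CommitStatus::Shutdown => return,
    /// }
    /// ```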
    async fn commit_with_retry(
        &self,
        batch: &[Mapper::Message],
        mut backoff: ExponentialBackoff,
        token: &CancellationToken,
    ) -> CommitStatus {
        loop {
            // Check for cancellation before attempting commit.
            if token.is_cancelled() {
                return CommitStatus::Shutdown;
            }
            // Attempt to commit.
            match self.commit(batch).await {
                Ok(_) => return CommitStatus::Success,
                Err(err) => {
                    let err = IngestionError::Reducer(format!("failed to commit batch: {err}"));
                    tracing::warn!("transient reducer commit error {err:?}");
                    // Check for cancellation after a failed commit.
                    if token.is_cancelled() {
                        return CommitStatus::Shutdown;
                    }
                }
            }
            // Get the next backoff duration, or panic if max retries are exceeded.
            let duration = backoff
                .next_backoff()
                .expect("max retry attempts exceeded: commit operation failed");
            // If cancellation occurs during the backoff wait, exit early with Shutdown.
            // Otherwise (the timeout expires), continue with the next retry attempt.
            if tokio::time::timeout(duration, token.cancelled())
                .await
                .is_ok()
            {
                return CommitStatus::Shutdown;
            }
        }
    }

    /// Determines if the current batch should be closed and committed.
    ///
    /// This method is called for each new message to determine if the current
    /// batch should be committed before adding the new message.
    ///
    /// # Default Implementation
    ///
    /// By default, returns `true` only when there are no more messages
    /// (`next_item` is `None`), effectively creating a single batch for all
    /// messages.
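    ///
    /// # Example
    ///
    /// A minimal sketch of a custom policy that closes the batch once it
    /// reaches a fixed size or no further message is immediately available
    /// (`next_item` is `None`); the trait's generic `Mapper` parameter stands
    /// in for a concrete [`Worker`], so the snippet is illustrative rather
    /// than a compilable doctest:
    ///
    /// ```ignore
    /// fn should_close_batch(
    ///     &self,
    ///     batch: &[Mapper::Message],
    ///     next_item: Option<&Mapper::Message>,
    /// ) -> bool {
    ///     // Commit every 100 messages, or whenever no message is pending.
    ///     batch.len() >= 100 || next_item.is_none()
    /// }
    /// ```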
    fn should_close_batch(
        &self,
        _batch: &[Mapper::Message],
        next_item: Option<&Mapper::Message>,
    ) -> bool {
        next_item.is_none()
    }
}

/// Processes worker messages and manages batching through a reducer.
///
/// This function is the core of the reduction pipeline, handling message
/// batching, watermark tracking, and progress reporting. It maintains message
/// order by checkpoint sequence number and applies batching logic through the
/// provided reducer.
///
/// # Message Processing Flow
///
/// 1. Receives messages in chunks of up to [`MAX_CHECKPOINTS_IN_PROGRESS`].
/// 2. Maintains message order using checkpoint sequence numbers.
/// 3. Batches messages according to the reducer's
///    [`Reducer::should_close_batch`] policy and commits each batch once it is
///    closed.
/// 4. Updates the watermark after each successful commit.
/// 5. Reports progress back to the executor.
///
/// # Shutdown Behavior
///
/// The function exits gracefully when receiving a shutdown signal, ensuring no
/// data loss for processed messages.
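///
/// # Ordering Sketch
///
/// A minimal, self-contained sketch of the in-order draining the function
/// performs on its `unprocessed` buffer; the checkpoint numbers and string
/// payloads are made up for illustration:
///
/// ```
/// use std::collections::HashMap;
///
/// // Messages can arrive out of order, keyed by checkpoint sequence number.
/// let mut unprocessed = HashMap::from([(7u64, "c7"), (5, "c5"), (6, "c6")]);
/// let mut current_checkpoint_number = 5u64;
/// let mut drained = Vec::new();
///
/// // Drain strictly in checkpoint order; a gap would leave later messages buffered.
/// while let Some(message) = unprocessed.remove(&current_checkpoint_number) {
///     drained.push(message);
///     current_checkpoint_number += 1;
/// }
/// assert_eq!(drained, vec!["c5", "c6", "c7"]);
/// ```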
pub(crate) async fn reduce<W: Worker>(
    task_name: String,
    mut current_checkpoint_number: CheckpointSequenceNumber,
    watermark_receiver: mpsc::Receiver<(CheckpointSequenceNumber, W::Message)>,
    executor_progress_sender: mpsc::Sender<WorkerPoolStatus>,
    reducer: Box<dyn Reducer<W>>,
    backoff: Arc<ExponentialBackoff>,
    token: CancellationToken,
) -> IngestionResult<()> {
    // Convert the receiver into a stream that yields chunks of up to
    // MAX_CHECKPOINTS_IN_PROGRESS messages. This way, each iteration of the
    // loop processes all ready messages.
    let mut stream =
        ReceiverStream::new(watermark_receiver).ready_chunks(MAX_CHECKPOINTS_IN_PROGRESS);
    // Store unprocessed progress messages from workers.
    let mut unprocessed = HashMap::new();
    // Buffer to accumulate results before passing them to the reducer.
    // The size of this batch is dynamically determined by the reducer's
    // `should_close_batch` method.
    let mut batch = vec![];
    // Track the next unprocessed checkpoint number for reporting progress
    // after each chunk of messages is received from the stream.
    let mut progress_update = None;
    // Flag to indicate a shutdown has been triggered.
    let mut trigger_shutdown = false;

    while let Some(update_batch) = stream.next().await {
        unprocessed.extend(update_batch.into_iter());
        // Process messages sequentially based on checkpoint sequence number.
        // This ensures in-order processing and maintains progress integrity.
        while let Some(message) = unprocessed.remove(&current_checkpoint_number) {
            // Collect messages into a batch based on the reducer's
            // `should_close_batch` policy; once a batch is ready it gets
            // committed and a new batch is started with the current message.
            if reducer.should_close_batch(&batch, Some(&message)) {
                match reducer
                    .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
                    .await
                {
                    CommitStatus::Success => {
                        batch = vec![message];
                        progress_update = Some(current_checkpoint_number);
                    }
                    CommitStatus::Shutdown => {
                        trigger_shutdown = true;
                        break;
                    }
                };
            } else {
                // Add message to existing batch since no commit is needed.
                batch.push(message);
            }
            current_checkpoint_number += 1;
        }
        // Handle final batch processing: check whether the current batch
        // should be committed. The `None` argument indicates that no further
        // message is available.
        if reducer.should_close_batch(&batch, None) && !trigger_shutdown {
            match reducer
                .commit_with_retry(&std::mem::take(&mut batch), reset_backoff(&backoff), &token)
                .await
            {
                CommitStatus::Success => {
                    progress_update = Some(current_checkpoint_number);
                }
                CommitStatus::Shutdown => trigger_shutdown = true,
            }
        }
        // Report the progress update to the executor.
        if let Some(watermark) = progress_update {
            executor_progress_sender
                .send(WorkerPoolStatus::Running((task_name.clone(), watermark)))
                .await
                .map_err(|_| IngestionError::Channel("unable to send worker pool progress updates to executor, receiver half closed".into()))?;
            progress_update = None;
        }

        // Check for shutdown signal after the progress update to ensure
        // progress is reported even during shutdown.
        if trigger_shutdown {
            break;
        }
    }
    Ok(())
}