

Trait Reducer 

pub trait Reducer<Mapper: Worker>: Send + Sync {
    // Required method
    fn commit<'life0, 'life1, 'async_trait>(
        &'life0 self,
        batch: &'life1 [Mapper::Message],
    ) -> Pin<Box<dyn Future<Output = Result<(), Mapper::Error>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    // Provided methods
    fn commit_with_retry<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        batch: &'life1 [Mapper::Message],
        backoff: ExponentialBackoff,
        token: &'life2 CancellationToken,
    ) -> Pin<Box<dyn Future<Output = CommitStatus> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait { ... }
    fn should_close_batch(
        &self,
        _batch: &[Mapper::Message],
        next_item: Option<&Mapper::Message>,
    ) -> bool { ... }
}

Processes and commits batches of messages produced by workers.

The Reducer trait provides batch processing for messages generated by Workers: implementors decide when a batch is closed (should_close_batch) and how each closed batch is committed (commit).

§Batch Processing

Messages are accumulated in batches based on the should_close_batch policy.

When a batch is ready to be committed (as determined by should_close_batch), the commit method is called with the collected messages.
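The batching flow above, combined with the chunk-boundary behavior documented for should_close_batch, can be modeled without any async machinery. This is a synchronous, std-only sketch under stated assumptions: BatchPolicy, DefaultPolicy and drive are hypothetical names, messages arrive as upstream chunks, and empty batches are never committed. The framework's real loop is asynchronous and may differ in detail.

```rust
use std::mem;

/// Hypothetical, synchronous stand-in for the batching half of `Reducer`.
pub trait BatchPolicy<M> {
    /// Same contract as `should_close_batch`; this body mirrors the documented
    /// default: close only at a chunk boundary.
    fn should_close_batch(&self, _batch: &[M], next_item: Option<&M>) -> bool {
        next_item.is_none()
    }
}

/// Hypothetical policy that keeps the default behavior unchanged.
pub struct DefaultPolicy;
impl<M> BatchPolicy<M> for DefaultPolicy {}

/// Hypothetical driver loop: returns the batches that would be handed to
/// `commit`, in order, plus any messages still buffered at the end.
pub fn drive<M: Clone, P: BatchPolicy<M>>(policy: &P, chunks: &[Vec<M>]) -> (Vec<Vec<M>>, Vec<M>) {
    let mut committed = Vec::new();
    let mut batch: Vec<M> = Vec::new();
    for chunk in chunks {
        for msg in chunk {
            // A message is about to be added: commit the current batch first if asked.
            // Assumption: an empty batch is never committed.
            if policy.should_close_batch(&batch, Some(msg)) && !batch.is_empty() {
                committed.push(mem::take(&mut batch));
            }
            batch.push(msg.clone());
        }
        // The chunk is fully consumed: flush the partial batch if asked.
        if policy.should_close_batch(&batch, None) && !batch.is_empty() {
            committed.push(mem::take(&mut batch));
        }
    }
    (committed, batch)
}
```

With the default policy, each chunk becomes exactly one batch, which matches the "flushed at the chunk boundary" description.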

Required Methods


fn commit<'life0, 'life1, 'async_trait>( &'life0 self, batch: &'life1 [Mapper::Message], ) -> Pin<Box<dyn Future<Output = Result<(), Mapper::Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Commits a batch of messages.

This method is called when a batch is ready to be processed, as determined by should_close_batch. The implementation should handle the batch processing logic and return an Err(Mapper::Error) if the operation fails.

§Note

Messages within each batch are guaranteed to be ordered by checkpoint sequence number.
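The lifetime parameters 'life0, 'life1 and 'async_trait in the signature are the shape produced by the async_trait macro, so implementors would normally write an async fn commit inside an #[async_trait] impl. The sketch below writes that expanded form by hand so it compiles with the standard library alone. Worker, RowMapper, CheckpointRow, ReducerCommit, VecStore and block_on are hypothetical stand-ins, not this crate's types. The commit body checks the documented checkpoint ordering and returns an error instead of storing a batch it cannot accept.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the framework's Worker trait: only the
// associated types that Reducer's signatures refer to.
pub trait Worker {
    type Message: Send + Sync;
    type Error: Send;
}

// Hypothetical message carrying a checkpoint sequence number.
pub struct CheckpointRow {
    pub sequence_number: u64,
    pub payload: String,
}

pub struct RowMapper;
impl Worker for RowMapper {
    type Message = CheckpointRow;
    type Error = String;
}

// Hypothetical trait mirroring only the expanded `commit` signature.
pub trait ReducerCommit<Mapper: Worker>: Send + Sync {
    fn commit<'life0, 'life1, 'async_trait>(
        &'life0 self,
        batch: &'life1 [Mapper::Message],
    ) -> Pin<Box<dyn Future<Output = Result<(), Mapper::Error>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait;
}

// Hypothetical in-memory sink standing in for a database.
pub struct VecStore {
    pub rows: Mutex<Vec<(u64, String)>>,
}

impl ReducerCommit<RowMapper> for VecStore {
    fn commit<'life0, 'life1, 'async_trait>(
        &'life0 self,
        batch: &'life1 [CheckpointRow],
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait,
    {
        Box::pin(async move {
            // Ordering by checkpoint is guaranteed by the docs; verified here anyway.
            if batch.windows(2).any(|w| w[0].sequence_number > w[1].sequence_number) {
                return Err("batch is not ordered by checkpoint".to_string());
            }
            let mut rows = self.rows.lock().map_err(|e| e.to_string())?;
            rows.extend(batch.iter().map(|r| (r.sequence_number, r.payload.clone())));
            Ok::<(), String>(())
        })
    }
}

// Minimal executor for this example only: the future above never returns Pending.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Inside an async runtime the block_on helper is unnecessary; the returned future would simply be awaited.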

Provided Methods


fn commit_with_retry<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, batch: &'life1 [Mapper::Message], backoff: ExponentialBackoff, token: &'life2 CancellationToken, ) -> Pin<Box<dyn Future<Output = CommitStatus> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Attempts to commit a batch of messages with exponential backoff retries on failure.

This function repeatedly calls the commit method of the provided Reducer until one of the following occurs:

  • The commit succeeds, returning CommitStatus::Success.
  • A cancellation signal is received via the CancellationToken, returning CommitStatus::Shutdown.
  • The maximum elapsed time of backoff runs out without a successful commit, causing a panic.
§Retry Mechanism:
  • Uses ExponentialBackoff to introduce increasing delays between retry attempts.
  • Checks for cancellation both before and after each commit attempt.
  • If a cancellation signal is received during a backoff delay, the function exits immediately with CommitStatus::Shutdown.
§Panics:
  • If commit keeps failing until backoff’s maximum elapsed time runs out, indicating a persistent failure.
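The retry loop described above can be sketched with only the standard library. This is a simplification under stated assumptions, not the provided method itself: an AtomicBool stands in for CancellationToken, the hypothetical Backoff struct stands in for ExponentialBackoff without any randomization, the commit attempt is a synchronous closure, and an interruptible backoff delay is modeled by polling the flag every millisecond. The post-attempt cancellation check is applied only to failed attempts, so a commit that succeeded is still reported as Success; that reading is also an assumption.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
pub enum CommitStatus {
    Success,
    Shutdown,
}

/// Hypothetical, jitter-free stand-in for an exponential backoff policy.
pub struct Backoff {
    pub initial: Duration,
    pub multiplier: u32,
    pub max_interval: Duration,
    pub max_elapsed: Duration,
}

/// Sleeps for up to `d`, polling `cancel` every millisecond.
/// Returns true as soon as cancellation is observed.
fn sleep_or_cancel(d: Duration, cancel: &AtomicBool) -> bool {
    let deadline = Instant::now() + d;
    loop {
        if cancel.load(Ordering::SeqCst) {
            return true;
        }
        let left = deadline.saturating_duration_since(Instant::now());
        if left.is_zero() {
            return false;
        }
        thread::sleep(left.min(Duration::from_millis(1)));
    }
}

/// Hypothetical synchronous analogue of `commit_with_retry`.
pub fn commit_with_retry<E: Debug>(
    mut commit: impl FnMut() -> Result<(), E>,
    backoff: &Backoff,
    cancel: &AtomicBool,
) -> CommitStatus {
    let start = Instant::now();
    let mut delay = backoff.initial;
    loop {
        // Check for cancellation before the attempt.
        if cancel.load(Ordering::SeqCst) {
            return CommitStatus::Shutdown;
        }
        let err = match commit() {
            Ok(()) => return CommitStatus::Success,
            Err(e) => e,
        };
        // Check again after a failed attempt.
        if cancel.load(Ordering::SeqCst) {
            return CommitStatus::Shutdown;
        }
        // Persistent failure: the next wait would exceed the elapsed-time budget.
        if start.elapsed() + delay > backoff.max_elapsed {
            panic!("commit still failing after {:?}: {:?}", start.elapsed(), err);
        }
        // Cancellation during the delay exits immediately.
        if sleep_or_cancel(delay, cancel) {
            return CommitStatus::Shutdown;
        }
        delay = delay.saturating_mul(backoff.multiplier).min(backoff.max_interval);
    }
}
```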

fn should_close_batch( &self, _batch: &[Mapper::Message], next_item: Option<&Mapper::Message>, ) -> bool

Determines if the current batch should be closed and committed.

This method is called by the framework in two situations:

  • next_item = Some(msg): a new message is about to be added to the batch. Returning true commits the existing batch first, and msg then starts a new batch. Returning false appends msg to the current batch without committing.

  • next_item = None: the current chunk from the upstream stream has been fully consumed and there are no more messages to add in this iteration (the stream itself may still produce more chunks later). Returning true flushes the partial batch. Returning false leaves the batch in memory until the next chunk arrives.

§Default Implementation

Returns true only when next_item is None, so each chunk’s remaining batch is flushed at the chunk boundary.
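As an illustration of overriding the default, the sketch below caps the batch size and avoids flushing very small partial batches at chunk boundaries. SizeBounded, max_batch and min_flush are hypothetical names, and the method is written generically over the message type so it compiles on its own; in a real Reducer impl the same body would go in should_close_batch with Mapper::Message as the item type.

```rust
/// Hypothetical custom policy: cap batch size and skip tiny flushes.
pub struct SizeBounded {
    /// Commit before a batch would grow past this many messages.
    pub max_batch: usize,
    /// At a chunk boundary, flush only if at least this many messages are buffered.
    pub min_flush: usize,
}

impl SizeBounded {
    /// Same parameters and meaning as `should_close_batch`.
    pub fn should_close_batch<M>(&self, batch: &[M], next_item: Option<&M>) -> bool {
        match next_item {
            // A message is about to be added: close a full batch first.
            Some(_) => batch.len() >= self.max_batch,
            // Chunk boundary: flush only if the partial batch is big enough;
            // otherwise keep it in memory until the next chunk arrives.
            None => batch.len() >= self.min_flush,
        }
    }
}
```

With min_flush greater than 1, a small trailing batch stays in memory until the next chunk arrives, so its commit can be delayed if the upstream stream goes quiet; the default policy avoids this by flushing at every chunk boundary.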

Implementors