Trait Reducer

pub trait Reducer<Mapper: Worker>: Send + Sync {
    // Required method
    fn commit<'life0, 'life1, 'async_trait>(
        &'life0 self,
        batch: &'life1 [Mapper::Message],
    ) -> Pin<Box<dyn Future<Output = Result<(), Mapper::Error>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    // Provided methods
    fn commit_with_retry<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        batch: &'life1 [Mapper::Message],
        backoff: ExponentialBackoff,
        token: &'life2 CancellationToken,
    ) -> Pin<Box<dyn Future<Output = CommitStatus> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait { ... }
    fn should_close_batch(
        &self,
        _batch: &[Mapper::Message],
        next_item: Option<&Mapper::Message>,
    ) -> bool { ... }
}

Processes and commits batches of messages produced by workers.

The Reducer trait provides batch processing capabilities for messages generated by Workers. It allows for custom batching logic and efficient processing of worker results.

§Batch Processing

Messages are accumulated in batches based on the should_close_batch policy.

When a batch is ready to be committed (as determined by should_close_batch), the commit method is called with the collected messages.
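The accumulation described above can be sketched with a small synchronous helper. This is only an illustration of the policy contract, not the crate's actual driver loop: split_into_batches and its closure parameter are hypothetical names, and the closure stands in for should_close_batch.

```rust
/// Hypothetical stand-in for the batching loop: splits `messages` into
/// batches, consulting `should_close` before each new message is added.
fn split_into_batches<M: Clone>(
    messages: &[M],
    should_close: impl Fn(&[M], Option<&M>) -> bool,
) -> Vec<Vec<M>> {
    let mut batches = Vec::new();
    let mut current: Vec<M> = Vec::new();

    for msg in messages {
        // Ask the policy whether the current batch must be closed
        // before `msg` is added to it.
        if !current.is_empty() && should_close(&current, Some(msg)) {
            batches.push(std::mem::take(&mut current));
        }
        current.push(msg.clone());
    }

    // End of input: flush whatever is left as the final batch.
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

With a policy that returns true only when the next item is None, five messages end up in a single batch. A policy that also closes once the batch holds two messages yields batches of sizes 2, 2 and 1.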

Required Methods§


fn commit<'life0, 'life1, 'async_trait>( &'life0 self, batch: &'life1 [Mapper::Message], ) -> Pin<Box<dyn Future<Output = Result<(), Mapper::Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Commits a batch of messages.

This method is called when a batch is ready to be processed, as determined by should_close_batch. The implementation should handle the batch processing logic and return an error if the operation fails.

§Note

Messages within each batch are guaranteed to be ordered by checkpoint sequence number.

Provided Methods§


fn commit_with_retry<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, batch: &'life1 [Mapper::Message], backoff: ExponentialBackoff, token: &'life2 CancellationToken, ) -> Pin<Box<dyn Future<Output = CommitStatus> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Attempts to commit a batch of messages with exponential backoff retries on failure.

This method repeatedly calls commit until one of the following occurs:

  • The commit succeeds, returning CommitStatus::Success.
  • A cancellation signal is received via the CancellationToken, returning CommitStatus::Shutdown.
  • All retry attempts are exhausted within backoff’s maximum elapsed time, causing a panic.

§Retry Mechanism

  • Uses ExponentialBackoff to introduce increasing delays between retry attempts.
  • Checks for cancellation both before and after each commit attempt.
  • If a cancellation signal is received during a backoff delay, the method exits immediately with CommitStatus::Shutdown.

§Panics

  • If all retry attempts are exhausted within backoff’s maximum elapsed time, indicating a persistent failure.
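
The retry behaviour listed above can be approximated with a synchronous, std-only sketch. It is not the provided implementation: Status, retry_with_backoff, the AtomicBool flag (standing in for CancellationToken) and the simple doubling delay (standing in for ExponentialBackoff) are all assumptions made for illustration. Unlike the documented method, this sketch only polls the flag between attempts and cannot interrupt a sleep that is already in progress.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for CommitStatus.
#[derive(Debug, PartialEq)]
enum Status {
    Success,
    Shutdown,
}

/// Retries `attempt` with doubling delays until it succeeds, `cancelled`
/// is set, or the next delay would exceed `max_elapsed` (which panics).
fn retry_with_backoff<E: std::fmt::Debug>(
    mut attempt: impl FnMut() -> Result<(), E>,
    initial_delay: Duration,
    max_elapsed: Duration,
    cancelled: &AtomicBool,
) -> Status {
    let start = Instant::now();
    let mut delay = initial_delay;

    loop {
        // Check for cancellation before the attempt.
        if cancelled.load(Ordering::SeqCst) {
            return Status::Shutdown;
        }

        let err = match attempt() {
            Ok(()) => return Status::Success,
            Err(err) => err,
        };

        // Check again after the failed attempt.
        if cancelled.load(Ordering::SeqCst) {
            return Status::Shutdown;
        }

        // Give up once the next delay would exceed the time budget.
        if start.elapsed() + delay > max_elapsed {
            panic!("commit failed after retries were exhausted: {err:?}");
        }

        thread::sleep(delay);
        delay *= 2;
    }
}
```

A closure that fails twice and then succeeds returns Status::Success on the third call. If the flag is already set, the closure is never invoked and the result is Status::Shutdown.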

fn should_close_batch( &self, _batch: &[Mapper::Message], next_item: Option<&Mapper::Message>, ) -> bool

Determines if the current batch should be closed and committed.

This method is called for each new message to determine if the current batch should be committed before adding the new message.

§Default Implementation

By default, returns true only when there are no more messages (next_item is None), effectively creating a single batch for all messages.
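
To illustrate how an implementor might override this default, here is a simplified stand-in trait. ClosePolicy, SingleBatch and MaxSize are hypothetical names, and the trait omits the async commit method and the Worker bound so that the sketch stays self-contained. Only the default body mirrors the behaviour documented above.

```rust
/// Simplified, synchronous stand-in for the batching hook (hypothetical).
trait ClosePolicy<M> {
    /// Mirrors the documented default: close the batch only when there
    /// are no more messages.
    fn should_close_batch(&self, _batch: &[M], next_item: Option<&M>) -> bool {
        next_item.is_none()
    }
}

/// Keeps the default and therefore produces a single batch.
struct SingleBatch;
impl<M> ClosePolicy<M> for SingleBatch {}

/// Overrides the default to also close once the batch reaches a size cap.
struct MaxSize(usize);
impl<M> ClosePolicy<M> for MaxSize {
    fn should_close_batch(&self, batch: &[M], next_item: Option<&M>) -> bool {
        next_item.is_none() || batch.len() >= self.0
    }
}
```

Keeping the next_item.is_none() check in the override means the final, possibly smaller batch is still closed when the input ends.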

Implementors§