pub trait Reducer<Mapper: Worker>: Send + Sync {
    // Required method
    fn commit<'life0, 'life1, 'async_trait>(
        &'life0 self,
        batch: &'life1 [Mapper::Message],
    ) -> Pin<Box<dyn Future<Output = Result<(), Mapper::Error>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    // Provided methods
    fn commit_with_retry<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        batch: &'life1 [Mapper::Message],
        backoff: ExponentialBackoff,
        token: &'life2 CancellationToken,
    ) -> Pin<Box<dyn Future<Output = CommitStatus> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait { ... }
    fn should_close_batch(
        &self,
        _batch: &[Mapper::Message],
        next_item: Option<&Mapper::Message>,
    ) -> bool { ... }
}
Processes and commits batches of messages produced by workers.
The Reducer trait provides batch processing for messages generated by
Workers. It supports custom batching logic and efficient processing of
worker results.
§Batch Processing
Messages are accumulated in batches according to the should_close_batch
policy. When a batch is ready to be committed (as determined by
should_close_batch), the commit method is called with the collected messages.
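The accumulate-then-commit flow described above can be sketched with plain standard-library Rust. This is a simplified, synchronous illustration and not the crate's implementation: `run_batches` and `default_policy` are hypothetical names, messages are modelled as `u64`, and "committing" a batch just means collecting it into the returned vector. The sketch also assumes that an empty batch is never committed.

```rust
/// Hypothetical stand-in for the default `should_close_batch` policy:
/// close the batch only when there is no next item.
fn default_policy(_batch: &[u64], next_item: Option<&u64>) -> bool {
    next_item.is_none()
}

/// Hypothetical driver loop: groups `messages` into batches according to
/// `policy` and returns the batches that would have been committed.
fn run_batches<P>(messages: &[u64], policy: P) -> Vec<Vec<u64>>
where
    P: Fn(&[u64], Option<&u64>) -> bool,
{
    let mut committed: Vec<Vec<u64>> = Vec::new();
    let mut batch: Vec<u64> = Vec::new();

    for msg in messages {
        // Consult the policy before adding the new message to the batch.
        if !batch.is_empty() && policy(batch.as_slice(), Some(msg)) {
            committed.push(std::mem::take(&mut batch));
        }
        batch.push(*msg);
    }

    // No more messages: the policy is consulted with `next_item == None`.
    if !batch.is_empty() && policy(batch.as_slice(), None) {
        committed.push(std::mem::take(&mut batch));
    }
    committed
}
```

With `default_policy`, every message ends up in a single batch; a policy that also checks the batch length would produce several smaller batches from the same input.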
Required Methods§
fn commit<'life0, 'life1, 'async_trait>(
    &'life0 self,
    batch: &'life1 [Mapper::Message],
) -> Pin<Box<dyn Future<Output = Result<(), Mapper::Error>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Commits a batch of messages.
This method is called when a batch is ready to be processed, as
determined by should_close_batch. The implementation should handle the
batch processing logic and return an error if the operation fails.
§Note
Messages within each batch are guaranteed to be ordered by checkpoint sequence number.
Provided Methods§
fn commit_with_retry<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    batch: &'life1 [Mapper::Message],
    backoff: ExponentialBackoff,
    token: &'life2 CancellationToken,
) -> Pin<Box<dyn Future<Output = CommitStatus> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
Attempts to commit a batch of messages with exponential backoff retries on failure.
This function repeatedly calls the commit method of the provided Reducer
until either:
- The commit succeeds, returning CommitStatus::Success.
- A cancellation signal is received via the CancellationToken, returning CommitStatus::Shutdown.
- All retry attempts are exhausted within backoff’s maximum elapsed time, causing a panic.
§Retry Mechanism:
- Uses ExponentialBackoff to introduce increasing delays between retry attempts.
- Checks for cancellation both before and after each commit attempt.
- If a cancellation signal is received during a backoff delay, the function exits immediately with CommitStatus::Shutdown.
§Panics:
- If all retry attempts are exhausted within backoff’s maximum elapsed time, which indicates a persistent failure.
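The retry behaviour described above can be approximated with the standard library alone. The sketch below is a synchronous illustration under stated assumptions, not the provided method itself: `commit_with_retry_sketch` and `Status` are hypothetical names, an `AtomicBool` stands in for the CancellationToken, and a doubling `Duration` stands in for ExponentialBackoff. Unlike the real method, it only notices a cancellation after the sleep finishes rather than interrupting the delay.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `CommitStatus`.
#[derive(Debug, PartialEq)]
enum Status {
    Success,
    Shutdown,
}

/// Hypothetical retry loop: calls `commit` until it succeeds, the
/// `cancelled` flag is set, or `max_elapsed` has passed (which panics).
fn commit_with_retry_sketch<F>(
    mut commit: F,
    initial_delay: Duration,
    max_elapsed: Duration,
    cancelled: &AtomicBool,
) -> Status
where
    F: FnMut() -> Result<(), String>,
{
    let start = Instant::now();
    let mut delay = initial_delay;
    loop {
        // Check for cancellation before the attempt.
        if cancelled.load(Ordering::SeqCst) {
            return Status::Shutdown;
        }
        match commit() {
            Ok(()) => return Status::Success,
            Err(_err) => {
                // Check for cancellation after the failed attempt.
                if cancelled.load(Ordering::SeqCst) {
                    return Status::Shutdown;
                }
                // Give up once the retry budget is spent.
                if start.elapsed() >= max_elapsed {
                    panic!("commit retries exhausted");
                }
                // Wait, then double the delay for the next attempt.
                sleep(delay);
                delay *= 2;
            }
        }
    }
}
```

A caller would typically treat `Status::Shutdown` as a signal to stop processing further batches, and treat the panic as an unrecoverable failure of the commit target.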
fn should_close_batch(
    &self,
    _batch: &[Mapper::Message],
    next_item: Option<&Mapper::Message>,
) -> bool
Determines if the current batch should be closed and committed.
This method is called for each new message to determine if the current batch should be committed before adding the new message.
§Default Implementation
By default, returns true only when there are no more messages
(next_item is None), effectively creating a single batch for all
messages.
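To illustrate how the default compares with an overriding policy, here is a minimal sketch that uses a hypothetical stand-in trait rather than Reducer itself, so that it compiles without the crate or async machinery. `BatchPolicy`, `SingleBatch`, and `MaxSize` are assumed names introduced only for this example; the default method body mirrors the behaviour described above.

```rust
/// Hypothetical stand-in trait carrying only the batching policy.
trait BatchPolicy {
    type Message;

    /// Mirrors the documented default: close only when no item follows.
    fn should_close_batch(
        &self,
        _batch: &[Self::Message],
        next_item: Option<&Self::Message>,
    ) -> bool {
        next_item.is_none()
    }
}

/// Keeps the default policy: one batch for all messages.
struct SingleBatch;

impl BatchPolicy for SingleBatch {
    type Message = u64;
}

/// Overrides the policy: also close once the batch holds `self.0` messages.
struct MaxSize(usize);

impl BatchPolicy for MaxSize {
    type Message = u64;

    fn should_close_batch(&self, batch: &[u64], next_item: Option<&u64>) -> bool {
        next_item.is_none() || batch.len() >= self.0
    }
}
```

A size cap like `MaxSize` bounds how much work a single commit performs, at the cost of more commit calls; the default trades that bound for a single commit over everything received.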