iota_grpc_client/api/ledger/checkpoints.rs
1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! High-level API for checkpoint queries.
5//!
6//! # Available Read Mask Fields
7//!
8//! All checkpoint query methods support the following `read_mask` fields to
9//! control which data is included in the response:
10//!
11//! ## Checkpoint Fields
12//! - `checkpoint` - includes all checkpoint fields
13//! - `checkpoint.sequence_number` - the sequence number of the checkpoint
14//! - `checkpoint.summary` - includes all checkpoint summary fields
15//! - `checkpoint.summary.digest` - the digest of the checkpoint summary
16//! - `checkpoint.summary.bcs` - the full BCS-encoded checkpoint summary
17//! - `checkpoint.contents` - includes all checkpoint contents fields
18//! - `checkpoint.contents.digest` - the digest of the checkpoint contents
19//! - `checkpoint.contents.bcs` - the full BCS-encoded checkpoint contents
20//! - `checkpoint.signature` - the validator aggregated signature for the
21//! checkpoint
22//!
23//! ## Transaction Fields
24//! - `transactions` - includes all executed transaction fields
25//! - `transactions.transaction` - includes all transaction fields
26//! - `transactions.transaction.digest` - the transaction digest
27//! - `transactions.transaction.bcs` - the full BCS-encoded transaction
28//! - `transactions.signatures` - includes all signature fields
29//! - `transactions.signatures.bcs` - the full BCS-encoded signature
30//! - `transactions.effects` - includes all effects fields
31//! - `transactions.effects.digest` - the effects digest
32//! - `transactions.effects.bcs` - the full BCS-encoded effects
33//! - `transactions.events` - includes all event fields (all events of the
34//! transaction)
35//! - `transactions.events.digest` - the events digest
36//! - `transactions.events.events` - includes all event fields
37//! - `transactions.events.events.bcs` - the full BCS-encoded event
38//! - `transactions.events.events.package_id` - the ID of the package that
39//! emitted the event
40//! - `transactions.events.events.module` - the module that emitted the
41//! event
42//! - `transactions.events.events.sender` - the sender that triggered the
43//! event
44//! - `transactions.events.events.event_type` - the type of the event
45//! - `transactions.events.events.bcs_contents` - the full BCS-encoded
46//! contents of the event
47//! - `transactions.events.events.json_contents` - the JSON-encoded
48//! contents of the event
49//! - `transactions.checkpoint` - the checkpoint that included the transaction
50//! - `transactions.timestamp` - the timestamp of the checkpoint that included
51//! the transaction
52//! - `transactions.input_objects` - includes all input object fields
53//! - `transactions.input_objects.reference` - includes all reference fields
54//! - `transactions.input_objects.reference.object_id` - the ID of the
55//! input object
56//! - `transactions.input_objects.reference.version` - the version of the
57//! input object
58//! - `transactions.input_objects.reference.digest` - the digest of the
59//! input object contents
60//! - `transactions.input_objects.bcs` - the full BCS-encoded object
61//! - `transactions.output_objects` - includes all output object fields
62//! - `transactions.output_objects.reference` - includes all reference
63//! fields
64//! - `transactions.output_objects.reference.object_id` - the ID of the
65//! output object
66//! - `transactions.output_objects.reference.version` - the version of the
67//! output object
68//! - `transactions.output_objects.reference.digest` - the digest of the
69//! output object contents
70//! - `transactions.output_objects.bcs` - the full BCS-encoded object
71//!
72//! ## Event Fields
73//! - `events` - includes all event fields (all events of all transactions in
74//! the checkpoint)
75//! - `events.bcs` - the full BCS-encoded event
76//! - `events.package_id` - the ID of the package that emitted the event
77//! - `events.module` - the module that emitted the event
78//! - `events.sender` - the sender that triggered the event
79//! - `events.event_type` - the type of the event
80//! - `events.bcs_contents` - the full BCS-encoded contents of the event
81//! - `events.json_contents` - the JSON-encoded contents of the event
82
83use std::pin::Pin;
84
85use futures::{Stream, StreamExt};
86use iota_grpc_types::v1::{
87 checkpoint, event, filter as grpc_filter,
88 ledger_service::{
89 GetCheckpointRequest, StreamCheckpointsRequest, checkpoint_data, get_checkpoint_request,
90 },
91 signatures::ValidatorAggregatedSignature as ProtoValidatorAggregatedSignature,
92 transaction::ExecutedTransaction,
93};
94use iota_sdk_types::{CheckpointSequenceNumber, Digest};
95
96use crate::{
97 Client, Error,
98 api::{
99 CheckpointResponse, CheckpointStreamItem, GET_CHECKPOINT_READ_MASK, MetadataEnvelope,
100 Result, TryFromProtoError, field_mask_with_default, saturating_usize_to_u32,
101 },
102};
103
104impl Client {
105 /// Get the latest checkpoint.
106 ///
107 /// This retrieves the checkpoint including summary, contents,
108 /// transactions, and events based on the provided read mask.
109 ///
110 /// # Parameters
111 ///
112 /// * `read_mask` - Optional field mask specifying which fields to include.
113 /// If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
114 /// See [module-level documentation](crate::api::ledger::checkpoints) for
115 /// all available fields.
116 /// * `transactions_filter` - Optional filter to apply to transactions
117 /// * `events_filter` - Optional filter to apply to events
118 ///
119 /// # Example
120 ///
121 /// ```no_run
122 /// # use iota_grpc_client::Client;
123 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
124 /// let client = Client::connect("http://localhost:9000").await?;
125 /// let checkpoint = client.get_checkpoint_latest(None, None, None).await?;
126 /// println!("Received checkpoint {}", checkpoint.body().sequence_number,);
127 /// # Ok(())
128 /// # }
129 /// ```
130 pub async fn get_checkpoint_latest(
131 &self,
132 read_mask: Option<&str>,
133 transactions_filter: Option<grpc_filter::TransactionFilter>,
134 events_filter: Option<grpc_filter::EventFilter>,
135 ) -> Result<MetadataEnvelope<CheckpointResponse>> {
136 self.get_checkpoint_internal(
137 get_checkpoint_request::CheckpointId::Latest(true),
138 read_mask,
139 transactions_filter,
140 events_filter,
141 )
142 .await
143 }
144
145 /// Get checkpoint by sequence number.
146 ///
147 /// This retrieves the checkpoint including summary, contents,
148 /// transactions, and events based on the provided read mask.
149 ///
150 /// # Parameters
151 ///
152 /// * `sequence_number` - The checkpoint sequence number to fetch
153 /// * `read_mask` - Optional field mask specifying which fields to include.
154 /// If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
155 /// See [module-level documentation](crate::api::ledger::checkpoints) for
156 /// all available fields.
157 /// * `transactions_filter` - Optional filter to apply to transactions
158 /// * `events_filter` - Optional filter to apply to events
159 ///
160 /// # Example
161 ///
162 /// ```no_run
163 /// # use iota_grpc_client::Client;
164 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
165 /// let client = Client::connect("http://localhost:9000").await?;
166 /// let checkpoint = client
167 /// .get_checkpoint_by_sequence_number(100, None, None, None)
168 /// .await?;
169 /// println!("Received checkpoint {}", checkpoint.body().sequence_number,);
170 /// # Ok(())
171 /// # }
172 /// ```
173 pub async fn get_checkpoint_by_sequence_number(
174 &self,
175 sequence_number: CheckpointSequenceNumber,
176 read_mask: Option<&str>,
177 transactions_filter: Option<grpc_filter::TransactionFilter>,
178 events_filter: Option<grpc_filter::EventFilter>,
179 ) -> Result<MetadataEnvelope<CheckpointResponse>> {
180 self.get_checkpoint_internal(
181 get_checkpoint_request::CheckpointId::SequenceNumber(sequence_number),
182 read_mask,
183 transactions_filter,
184 events_filter,
185 )
186 .await
187 }
188
189 /// Get checkpoint by digest.
190 ///
191 /// This retrieves the checkpoint including summary, contents,
192 /// transactions, and events based on the provided read mask.
193 ///
194 /// # Parameters
195 ///
196 /// * `digest` - The checkpoint digest to fetch
197 /// * `read_mask` - Optional field mask specifying which fields to include.
198 /// If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
199 /// See [module-level documentation](crate::api::ledger::checkpoints) for
200 /// all available fields.
201 /// * `transactions_filter` - Optional filter to apply to transactions
202 /// * `events_filter` - Optional filter to apply to events
203 ///
204 /// # Example
205 ///
206 /// ```no_run
207 /// # use iota_grpc_client::Client;
208 /// # use iota_sdk_types::Digest;
209 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
210 /// let client = Client::connect("http://localhost:9000").await?;
211 /// let digest: Digest = todo!();
212 /// let checkpoint = client
213 /// .get_checkpoint_by_digest(digest, None, None, None)
214 /// .await?;
215 /// println!("Received checkpoint {}", checkpoint.body().sequence_number,);
216 /// # Ok(())
217 /// # }
218 /// ```
219 pub async fn get_checkpoint_by_digest(
220 &self,
221 digest: Digest,
222 read_mask: Option<&str>,
223 transactions_filter: Option<grpc_filter::TransactionFilter>,
224 events_filter: Option<grpc_filter::EventFilter>,
225 ) -> Result<MetadataEnvelope<CheckpointResponse>> {
226 self.get_checkpoint_internal(
227 get_checkpoint_request::CheckpointId::Digest(digest.into()),
228 read_mask,
229 transactions_filter,
230 events_filter,
231 )
232 .await
233 }
234
235 /// Internal helper to fetch checkpoint by any ID type.
236 async fn get_checkpoint_internal(
237 &self,
238 checkpoint_id: get_checkpoint_request::CheckpointId,
239 read_mask: Option<&str>,
240 transactions_filter: Option<grpc_filter::TransactionFilter>,
241 events_filter: Option<grpc_filter::EventFilter>,
242 ) -> Result<MetadataEnvelope<CheckpointResponse>> {
243 let mut request = match checkpoint_id {
244 get_checkpoint_request::CheckpointId::Latest(val) => {
245 GetCheckpointRequest::default().with_latest(val)
246 }
247 get_checkpoint_request::CheckpointId::SequenceNumber(val) => {
248 GetCheckpointRequest::default().with_sequence_number(val)
249 }
250 get_checkpoint_request::CheckpointId::Digest(val) => {
251 GetCheckpointRequest::default().with_digest(val)
252 }
253 _ => {
254 return Err(Error::Protocol("Invalid checkpoint ID type".into()));
255 }
256 }
257 .with_read_mask(field_mask_with_default(read_mask, GET_CHECKPOINT_READ_MASK));
258
259 if let Some(tf) = transactions_filter {
260 request = request.with_transactions_filter(tf);
261 }
262 if let Some(ef) = events_filter {
263 request = request.with_events_filter(ef);
264 }
265 if let Some(max_size) = self
266 .max_decoding_message_size()
267 .map(saturating_usize_to_u32)
268 {
269 request = request.with_max_message_size_bytes(max_size);
270 }
271
272 let mut client = self.ledger_service_client();
273 let response = client.get_checkpoint(request).await?;
274 let (stream, metadata) = MetadataEnvelope::from(response).into_parts();
275
276 let reassembled = Self::reassemble_checkpoint_data_stream(stream);
277 futures::pin_mut!(reassembled);
278
279 // Skip any progress messages and find the first checkpoint
280 let checkpoint = loop {
281 match reassembled.next().await {
282 Some(Ok(CheckpointStreamItem::Checkpoint(cp))) => break *cp,
283 Some(Ok(CheckpointStreamItem::Progress { .. })) => continue,
284 Some(Err(e)) => return Err(e),
285 None => {
286 return Err(TryFromProtoError::missing("checkpoint data").into());
287 }
288 }
289 };
290
291 Ok(MetadataEnvelope::new(checkpoint, metadata))
292 }
293
294 /// Stream checkpoints across a range of checkpoints.
295 ///
296 /// Returns a stream of [`CheckpointResponse`] objects, each representing
297 /// a complete checkpoint with its transactions and events. Every checkpoint
298 /// in the range is yielded, even if the filters produce no matching
299 /// transactions or events within it.
300 ///
301 /// To skip non-matching checkpoints entirely, use
302 /// [`stream_checkpoints_filtered`](Self::stream_checkpoints_filtered).
303 ///
304 /// **Note:** The metadata in the returned [`MetadataEnvelope`] is captured
305 /// from the initial gRPC response headers when the stream is opened. It is
306 /// **not** updated as subsequent checkpoint data arrives.
307 ///
308 /// # Parameters
309 ///
310 /// * `start_sequence_number` - Optional starting checkpoint. If `None`,
311 /// starts from the latest checkpoint.
312 /// * `end_sequence_number` - Optional ending checkpoint. If `None`, streams
313 /// indefinitely.
314 /// * `read_mask` - Optional field mask specifying which fields to include.
315 /// If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
316 /// See [module-level documentation](crate::api::ledger::checkpoints) for
317 /// all available fields.
318 /// * `transactions_filter` - Optional filter to apply to transactions
319 /// * `events_filter` - Optional filter to apply to events
320 ///
321 /// # Example
322 ///
323 /// ```no_run
324 /// # use iota_grpc_client::Client;
325 /// # use futures::StreamExt;
326 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327 /// let client = Client::connect("http://localhost:9000").await?;
328 /// let mut stream = client
329 /// .stream_checkpoints(Some(0), Some(10), None, None, None)
330 /// .await?;
331 ///
332 /// while let Some(checkpoint) = stream.body_mut().next().await {
333 /// let checkpoint = checkpoint?;
334 /// println!("Received checkpoint {}", checkpoint.sequence_number);
335 /// }
336 /// # Ok(())
337 /// # }
338 /// ```
339 pub async fn stream_checkpoints(
340 &self,
341 start_sequence_number: Option<CheckpointSequenceNumber>,
342 end_sequence_number: Option<CheckpointSequenceNumber>,
343 read_mask: Option<&str>,
344 transactions_filter: Option<grpc_filter::TransactionFilter>,
345 events_filter: Option<grpc_filter::EventFilter>,
346 ) -> Result<MetadataEnvelope<Pin<Box<dyn Stream<Item = Result<CheckpointResponse>> + Send>>>>
347 {
348 let envelope = self
349 .stream_checkpoints_raw(
350 start_sequence_number,
351 end_sequence_number,
352 read_mask,
353 transactions_filter,
354 events_filter,
355 false,
356 None,
357 )
358 .await?;
359
360 let (stream, metadata) = envelope.into_parts();
361
362 // remove the wrapping CheckpointStreamItem layer since we know
363 // filter_checkpoints is false and thus only Checkpoint items will be produced
364 let filtered = stream.filter_map(|item| async {
365 match item {
366 Ok(CheckpointStreamItem::Checkpoint(cp)) => Some(Ok(*cp)),
367 Ok(CheckpointStreamItem::Progress { .. }) => None,
368 Err(e) => Some(Err(e)),
369 }
370 });
371
372 Ok(MetadataEnvelope::new(Box::pin(filtered), metadata))
373 }
374
375 /// Stream checkpoints, skipping those with no matching data.
376 ///
377 /// Unlike [`stream_checkpoints`](Self::stream_checkpoints), this method
378 /// sets `filter_checkpoints = true` on the server, which means checkpoints
379 /// without any matching transactions or events are skipped entirely.
380 ///
381 /// The returned stream yields [`CheckpointStreamItem`], which is either a
382 /// [`CheckpointStreamItem::Checkpoint`] or a
383 /// [`CheckpointStreamItem::Progress`]. Progress messages are sent
384 /// periodically during scanning to indicate liveness and the current scan
385 /// position (default every 2 seconds, configurable via
386 /// `progress_interval_ms`).
387 ///
388 /// For liveness detection, wrap `stream.next()` in
389 /// `tokio::time::timeout()` — if neither a `Checkpoint` nor a `Progress`
390 /// arrives within your chosen duration plus some buffer for connection
391 /// latency, the connection is likely dead.
392 ///
393 /// At least one of `transactions_filter` or `events_filter` must be set.
394 ///
395 /// # Parameters
396 ///
397 /// * `start_sequence_number` - Optional starting checkpoint. If `None`,
398 /// starts from the latest checkpoint.
399 /// * `end_sequence_number` - Optional ending checkpoint. If `None`, streams
400 /// indefinitely.
401 /// * `read_mask` - Optional field mask specifying which fields to include.
402 /// If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
403 /// See [module-level documentation](crate::api::ledger::checkpoints) for
404 /// all available fields.
405 /// * `transactions_filter` - Optional filter to apply to transactions
406 /// * `events_filter` - Optional filter to apply to events
407 /// * `progress_interval_ms` - Optional progress message interval in
408 /// milliseconds. Defaults to 2000ms. Minimum 500ms.
409 ///
410 /// # Example
411 ///
412 /// ```no_run
413 /// # use iota_grpc_client::{Client, CheckpointStreamItem};
414 /// # use iota_grpc_types::v1::filter as grpc_filter;
415 /// # use futures::StreamExt;
416 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
417 /// let client = Client::connect("http://localhost:9000").await?;
418 /// // At least one filter is required
419 /// let tx_filter = grpc_filter::TransactionFilter::default();
420 /// let mut stream = client
421 /// .stream_checkpoints_filtered(Some(0), None, None, Some(tx_filter), None, None)
422 /// .await?;
423 ///
424 /// while let Some(item) = stream.body_mut().next().await {
425 /// match item? {
426 /// CheckpointStreamItem::Checkpoint(cp) => {
427 /// println!("Matched checkpoint {}", cp.sequence_number);
428 /// }
429 /// CheckpointStreamItem::Progress {
430 /// latest_scanned_sequence_number,
431 /// } => {
432 /// println!("Scanned up to {latest_scanned_sequence_number}");
433 /// }
434 /// }
435 /// }
436 /// # Ok(())
437 /// # }
438 /// ```
439 pub async fn stream_checkpoints_filtered(
440 &self,
441 start_sequence_number: Option<CheckpointSequenceNumber>,
442 end_sequence_number: Option<CheckpointSequenceNumber>,
443 read_mask: Option<&str>,
444 transactions_filter: Option<grpc_filter::TransactionFilter>,
445 events_filter: Option<grpc_filter::EventFilter>,
446 progress_interval_ms: Option<u32>,
447 ) -> Result<MetadataEnvelope<Pin<Box<dyn Stream<Item = Result<CheckpointStreamItem>> + Send>>>>
448 {
449 self.stream_checkpoints_raw(
450 start_sequence_number,
451 end_sequence_number,
452 read_mask,
453 transactions_filter,
454 events_filter,
455 true,
456 progress_interval_ms,
457 )
458 .await
459 }
460
461 /// Internal helper that builds the stream request and returns the raw
462 /// [`CheckpointStreamItem`] stream.
463 async fn stream_checkpoints_raw(
464 &self,
465 start_sequence_number: Option<CheckpointSequenceNumber>,
466 end_sequence_number: Option<CheckpointSequenceNumber>,
467 read_mask: Option<&str>,
468 transactions_filter: Option<grpc_filter::TransactionFilter>,
469 events_filter: Option<grpc_filter::EventFilter>,
470 filter_checkpoints: bool,
471 progress_interval_ms: Option<u32>,
472 ) -> Result<MetadataEnvelope<Pin<Box<dyn Stream<Item = Result<CheckpointStreamItem>> + Send>>>>
473 {
474 let mut request = StreamCheckpointsRequest::default()
475 .with_read_mask(field_mask_with_default(read_mask, GET_CHECKPOINT_READ_MASK));
476
477 if let Some(start) = start_sequence_number {
478 request = request.with_start_sequence_number(start);
479 }
480 if let Some(end) = end_sequence_number {
481 request = request.with_end_sequence_number(end);
482 }
483 if let Some(tf) = transactions_filter {
484 request = request.with_transactions_filter(tf);
485 }
486 if let Some(ef) = events_filter {
487 request = request.with_events_filter(ef);
488 }
489 if filter_checkpoints {
490 request = request.with_filter_checkpoints(true);
491 }
492 if let Some(ms) = progress_interval_ms {
493 request = request.with_progress_interval_ms(ms);
494 }
495 if let Some(max_size) = self
496 .max_decoding_message_size()
497 .map(saturating_usize_to_u32)
498 {
499 request = request.with_max_message_size_bytes(max_size);
500 }
501
502 let mut client = self.ledger_service_client();
503 let response = client.stream_checkpoints(request).await?;
504 let (stream, metadata) = MetadataEnvelope::from(response).into_parts();
505
506 Ok(MetadataEnvelope::new(
507 Box::pin(Self::reassemble_checkpoint_data_stream(stream)),
508 metadata,
509 ))
510 }
511
512 /// Reassemble a stream of checkpoint data chunks into complete checkpoints.
513 ///
514 /// The server sends checkpoint data in multiple messages:
515 /// - `Checkpoint` - Contains the checkpoint summary and contents
516 /// - `Transactions` - Contains executed transactions
517 /// - `Events` - Contains events from transactions
518 /// - `Progress` - Liveness indicator during filtered scanning
519 /// - `EndMarker` - Signals the end of one checkpoint's data
520 ///
521 /// This function buffers the chunks and yields [`CheckpointStreamItem`]
522 /// values: either complete [`CheckpointResponse`] objects when an
523 /// `EndMarker` is received, or [`CheckpointStreamItem::Progress`] when
524 /// a progress message arrives.
525 fn reassemble_checkpoint_data_stream<S, E>(
526 stream: S,
527 ) -> impl Stream<Item = Result<CheckpointStreamItem>>
528 where
529 S: Stream<
530 Item = std::result::Result<iota_grpc_types::v1::ledger_service::CheckpointData, E>,
531 >,
532 E: Into<Error>,
533 {
534 async_stream::try_stream! {
535 futures::pin_mut!(stream);
536
537 // State for accumulating checkpoint data
538 let mut current_sequence_number: Option<CheckpointSequenceNumber> = None;
539 let mut current_summary: Option<checkpoint::CheckpointSummary> = None;
540 let mut current_signature: Option<ProtoValidatorAggregatedSignature> = None;
541 let mut current_contents: Option<checkpoint::CheckpointContents> = None;
542 let mut current_transactions: Vec<ExecutedTransaction> = Vec::new();
543 let mut current_events: Vec<event::Event> = Vec::new();
544
545 while let Some(data) = stream.next().await {
546 let data = data.map_err(|e| e.into())?;
547
548 match data.payload {
549 Some(checkpoint_data::Payload::Checkpoint(checkpoint)) => {
550 if checkpoint.sequence_number.is_none() {
551 Err(TryFromProtoError::missing("checkpoint.sequence_number"))?;
552 }
553
554 // Start of new checkpoint - throw error if previous checkpoint was incomplete
555 if current_sequence_number.is_some() {
556 Err(Error::Protocol("Received new chunked checkpoint header before completing previous checkpoint".into()))?;
557 }
558 current_sequence_number = checkpoint.sequence_number;
559
560 // Store proto summary (optional, no deserialization)
561 current_summary = checkpoint.summary;
562
563 // Store proto signature (optional, no deserialization)
564 current_signature = checkpoint.signature;
565
566 // Store proto contents (optional, no deserialization)
567 current_contents = checkpoint.contents;
568
569 // Reset accumulators for new checkpoint (in case Transactions or Events
570 // arrived between endmarker and Checkpoint)
571 current_transactions.clear();
572 current_events.clear();
573 }
574
575 Some(checkpoint_data::Payload::ExecutedTransactions(txs)) => {
576 if current_sequence_number.is_none() {
577 Err(Error::Protocol("Received new chunked checkpoint transactions before receiving checkpoint header".into()))?;
578 }
579
580 // Accumulate proto transactions (no deserialization)
581 current_transactions.extend(txs.executed_transactions.into_iter());
582 }
583
584 Some(checkpoint_data::Payload::Events(events)) => {
585 if current_sequence_number.is_none() {
586 Err(Error::Protocol("Received new chunked checkpoint events before receiving checkpoint header".into()))?;
587 }
588
589 // Accumulate proto events (no deserialization)
590 current_events.extend(events.events);
591 }
592
593 Some(checkpoint_data::Payload::EndMarker(marker)) => {
594 // End of current checkpoint - assemble the result and yield it
595 let sequence_number = current_sequence_number
596 .take()
597 .ok_or_else(|| -> Error { Error::Protocol("Received checkpoint end marker before receiving checkpoint header".into()) })?;
598
599 let marker_sequence_number = marker.sequence_number
600 .ok_or_else(|| -> Error { TryFromProtoError::missing("end_marker.sequence_number").into() })?;
601
602 if marker_sequence_number != sequence_number {
603 Err(Error::Protocol(format!(
604 "EndMarker sequence_number {marker_sequence_number} does not match current checkpoint sequence_number {sequence_number:?}",
605 )))?;
606 }
607
608 let response = CheckpointResponse {
609 sequence_number,
610 summary: current_summary.take(),
611 signature: current_signature.take(),
612 contents: current_contents.take(),
613 executed_transactions: std::mem::take(&mut current_transactions),
614 events: std::mem::take(&mut current_events),
615 };
616
617 yield CheckpointStreamItem::Checkpoint(Box::new(response));
618 }
619
620 Some(checkpoint_data::Payload::Progress(progress)) => {
621 yield CheckpointStreamItem::Progress {
622 latest_scanned_sequence_number: progress.latest_scanned_sequence_number,
623 };
624 }
625
626 None => {
627 // Empty payload - skip
628 continue;
629 }
630
631 Some(_) => {
632 // Unknown payload type
633 Err(Error::Protocol("Received unknown checkpoint data payload type".into()))?;
634 }
635 }
636 }
637
638 // Check if stream ended with incomplete checkpoint data
639 if let Some(sequence_number) = current_sequence_number {
640 Err(Error::Protocol(format!(
641 "Stream ended with incomplete checkpoint data for sequence number {sequence_number}"
642 )))?;
643 }
644 }
645 }
646}