iota_grpc_client/api/ledger/
checkpoints.rs

1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! High-level API for checkpoint queries.
5//!
6//! # Available Read Mask Fields
7//!
8//! All checkpoint query methods support the following `read_mask` fields to
9//! control which data is included in the response:
10//!
11//! ## Checkpoint Fields
12//! - `checkpoint` - includes all checkpoint fields
13//!   - `checkpoint.sequence_number` - the sequence number of the checkpoint
14//!   - `checkpoint.summary` - includes all checkpoint summary fields
15//!     - `checkpoint.summary.digest` - the digest of the checkpoint summary
16//!     - `checkpoint.summary.bcs` - the full BCS-encoded checkpoint summary
17//!   - `checkpoint.contents` - includes all checkpoint contents fields
18//!     - `checkpoint.contents.digest` - the digest of the checkpoint contents
19//!     - `checkpoint.contents.bcs` - the full BCS-encoded checkpoint contents
20//!   - `checkpoint.signature` - the validator aggregated signature for the
21//!     checkpoint
22//!
23//! ## Transaction Fields
24//! - `transactions` - includes all executed transaction fields
25//!   - `transactions.transaction` - includes all transaction fields
26//!     - `transactions.transaction.digest` - the transaction digest
27//!     - `transactions.transaction.bcs` - the full BCS-encoded transaction
28//!   - `transactions.signatures` - includes all signature fields
29//!     - `transactions.signatures.bcs` - the full BCS-encoded signature
30//!   - `transactions.effects` - includes all effects fields
31//!     - `transactions.effects.digest` - the effects digest
32//!     - `transactions.effects.bcs` - the full BCS-encoded effects
33//!   - `transactions.events` - includes all event fields (all events of the
34//!     transaction)
35//!     - `transactions.events.digest` - the events digest
36//!     - `transactions.events.events` - includes all event fields
37//!       - `transactions.events.events.bcs` - the full BCS-encoded event
38//!       - `transactions.events.events.package_id` - the ID of the package that
39//!         emitted the event
40//!       - `transactions.events.events.module` - the module that emitted the
41//!         event
42//!       - `transactions.events.events.sender` - the sender that triggered the
43//!         event
44//!       - `transactions.events.events.event_type` - the type of the event
45//!       - `transactions.events.events.bcs_contents` - the full BCS-encoded
46//!         contents of the event
47//!       - `transactions.events.events.json_contents` - the JSON-encoded
48//!         contents of the event
49//!   - `transactions.checkpoint` - the checkpoint that included the transaction
50//!   - `transactions.timestamp` - the timestamp of the checkpoint that included
51//!     the transaction
52//!   - `transactions.input_objects` - includes all input object fields
53//!     - `transactions.input_objects.reference` - includes all reference fields
54//!       - `transactions.input_objects.reference.object_id` - the ID of the
55//!         input object
56//!       - `transactions.input_objects.reference.version` - the version of the
57//!         input object
58//!       - `transactions.input_objects.reference.digest` - the digest of the
59//!         input object contents
60//!     - `transactions.input_objects.bcs` - the full BCS-encoded object
61//!   - `transactions.output_objects` - includes all output object fields
62//!     - `transactions.output_objects.reference` - includes all reference
63//!       fields
64//!       - `transactions.output_objects.reference.object_id` - the ID of the
65//!         output object
66//!       - `transactions.output_objects.reference.version` - the version of the
67//!         output object
68//!       - `transactions.output_objects.reference.digest` - the digest of the
69//!         output object contents
70//!     - `transactions.output_objects.bcs` - the full BCS-encoded object
71//!
72//! ## Event Fields
73//! - `events` - includes all event fields (all events of all transactions in
74//!   the checkpoint)
75//!   - `events.bcs` - the full BCS-encoded event
76//!   - `events.package_id` - the ID of the package that emitted the event
77//!   - `events.module` - the module that emitted the event
78//!   - `events.sender` - the sender that triggered the event
79//!   - `events.event_type` - the type of the event
80//!   - `events.bcs_contents` - the full BCS-encoded contents of the event
81//!   - `events.json_contents` - the JSON-encoded contents of the event
82
83use std::pin::Pin;
84
85use futures::{Stream, StreamExt};
86use iota_grpc_types::v1::{
87    checkpoint, event, filter as grpc_filter,
88    ledger_service::{
89        GetCheckpointRequest, StreamCheckpointsRequest, checkpoint_data, get_checkpoint_request,
90    },
91    signatures::ValidatorAggregatedSignature as ProtoValidatorAggregatedSignature,
92    transaction::ExecutedTransaction,
93};
94use iota_sdk_types::{CheckpointSequenceNumber, Digest};
95
96use crate::{
97    Client, Error,
98    api::{
99        CheckpointResponse, CheckpointStreamItem, GET_CHECKPOINT_READ_MASK, MetadataEnvelope,
100        Result, TryFromProtoError, field_mask_with_default, saturating_usize_to_u32,
101    },
102};
103
104impl Client {
105    /// Get the latest checkpoint.
106    ///
107    /// This retrieves the checkpoint including summary, contents,
108    /// transactions, and events based on the provided read mask.
109    ///
110    /// # Parameters
111    ///
112    /// * `read_mask` - Optional field mask specifying which fields to include.
113    ///   If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
114    ///   See [module-level documentation](crate::api::ledger::checkpoints) for
115    ///   all available fields.
116    /// * `transactions_filter` - Optional filter to apply to transactions
117    /// * `events_filter` - Optional filter to apply to events
118    ///
119    /// # Example
120    ///
121    /// ```no_run
122    /// # use iota_grpc_client::Client;
123    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
124    /// let client = Client::connect("http://localhost:9000").await?;
125    /// let checkpoint = client.get_checkpoint_latest(None, None, None).await?;
126    /// println!("Received checkpoint {}", checkpoint.body().sequence_number,);
127    /// # Ok(())
128    /// # }
129    /// ```
130    pub async fn get_checkpoint_latest(
131        &self,
132        read_mask: Option<&str>,
133        transactions_filter: Option<grpc_filter::TransactionFilter>,
134        events_filter: Option<grpc_filter::EventFilter>,
135    ) -> Result<MetadataEnvelope<CheckpointResponse>> {
136        self.get_checkpoint_internal(
137            get_checkpoint_request::CheckpointId::Latest(true),
138            read_mask,
139            transactions_filter,
140            events_filter,
141        )
142        .await
143    }
144
145    /// Get checkpoint by sequence number.
146    ///
147    /// This retrieves the checkpoint including summary, contents,
148    /// transactions, and events based on the provided read mask.
149    ///
150    /// # Parameters
151    ///
152    /// * `sequence_number` - The checkpoint sequence number to fetch
153    /// * `read_mask` - Optional field mask specifying which fields to include.
154    ///   If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
155    ///   See [module-level documentation](crate::api::ledger::checkpoints) for
156    ///   all available fields.
157    /// * `transactions_filter` - Optional filter to apply to transactions
158    /// * `events_filter` - Optional filter to apply to events
159    ///
160    /// # Example
161    ///
162    /// ```no_run
163    /// # use iota_grpc_client::Client;
164    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
165    /// let client = Client::connect("http://localhost:9000").await?;
166    /// let checkpoint = client
167    ///     .get_checkpoint_by_sequence_number(100, None, None, None)
168    ///     .await?;
169    /// println!("Received checkpoint {}", checkpoint.body().sequence_number,);
170    /// # Ok(())
171    /// # }
172    /// ```
173    pub async fn get_checkpoint_by_sequence_number(
174        &self,
175        sequence_number: CheckpointSequenceNumber,
176        read_mask: Option<&str>,
177        transactions_filter: Option<grpc_filter::TransactionFilter>,
178        events_filter: Option<grpc_filter::EventFilter>,
179    ) -> Result<MetadataEnvelope<CheckpointResponse>> {
180        self.get_checkpoint_internal(
181            get_checkpoint_request::CheckpointId::SequenceNumber(sequence_number),
182            read_mask,
183            transactions_filter,
184            events_filter,
185        )
186        .await
187    }
188
189    /// Get checkpoint by digest.
190    ///
191    /// This retrieves the checkpoint including summary, contents,
192    /// transactions, and events based on the provided read mask.
193    ///
194    /// # Parameters
195    ///
196    /// * `digest` - The checkpoint digest to fetch
197    /// * `read_mask` - Optional field mask specifying which fields to include.
198    ///   If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
199    ///   See [module-level documentation](crate::api::ledger::checkpoints) for
200    ///   all available fields.
201    /// * `transactions_filter` - Optional filter to apply to transactions
202    /// * `events_filter` - Optional filter to apply to events
203    ///
204    /// # Example
205    ///
206    /// ```no_run
207    /// # use iota_grpc_client::Client;
208    /// # use iota_sdk_types::Digest;
209    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
210    /// let client = Client::connect("http://localhost:9000").await?;
211    /// let digest: Digest = todo!();
212    /// let checkpoint = client
213    ///     .get_checkpoint_by_digest(digest, None, None, None)
214    ///     .await?;
215    /// println!("Received checkpoint {}", checkpoint.body().sequence_number,);
216    /// # Ok(())
217    /// # }
218    /// ```
219    pub async fn get_checkpoint_by_digest(
220        &self,
221        digest: Digest,
222        read_mask: Option<&str>,
223        transactions_filter: Option<grpc_filter::TransactionFilter>,
224        events_filter: Option<grpc_filter::EventFilter>,
225    ) -> Result<MetadataEnvelope<CheckpointResponse>> {
226        self.get_checkpoint_internal(
227            get_checkpoint_request::CheckpointId::Digest(digest.into()),
228            read_mask,
229            transactions_filter,
230            events_filter,
231        )
232        .await
233    }
234
235    /// Internal helper to fetch checkpoint by any ID type.
236    async fn get_checkpoint_internal(
237        &self,
238        checkpoint_id: get_checkpoint_request::CheckpointId,
239        read_mask: Option<&str>,
240        transactions_filter: Option<grpc_filter::TransactionFilter>,
241        events_filter: Option<grpc_filter::EventFilter>,
242    ) -> Result<MetadataEnvelope<CheckpointResponse>> {
243        let mut request = match checkpoint_id {
244            get_checkpoint_request::CheckpointId::Latest(val) => {
245                GetCheckpointRequest::default().with_latest(val)
246            }
247            get_checkpoint_request::CheckpointId::SequenceNumber(val) => {
248                GetCheckpointRequest::default().with_sequence_number(val)
249            }
250            get_checkpoint_request::CheckpointId::Digest(val) => {
251                GetCheckpointRequest::default().with_digest(val)
252            }
253            _ => {
254                return Err(Error::Protocol("Invalid checkpoint ID type".into()));
255            }
256        }
257        .with_read_mask(field_mask_with_default(read_mask, GET_CHECKPOINT_READ_MASK));
258
259        if let Some(tf) = transactions_filter {
260            request = request.with_transactions_filter(tf);
261        }
262        if let Some(ef) = events_filter {
263            request = request.with_events_filter(ef);
264        }
265        if let Some(max_size) = self
266            .max_decoding_message_size()
267            .map(saturating_usize_to_u32)
268        {
269            request = request.with_max_message_size_bytes(max_size);
270        }
271
272        let mut client = self.ledger_service_client();
273        let response = client.get_checkpoint(request).await?;
274        let (stream, metadata) = MetadataEnvelope::from(response).into_parts();
275
276        let reassembled = Self::reassemble_checkpoint_data_stream(stream);
277        futures::pin_mut!(reassembled);
278
279        // Skip any progress messages and find the first checkpoint
280        let checkpoint = loop {
281            match reassembled.next().await {
282                Some(Ok(CheckpointStreamItem::Checkpoint(cp))) => break *cp,
283                Some(Ok(CheckpointStreamItem::Progress { .. })) => continue,
284                Some(Err(e)) => return Err(e),
285                None => {
286                    return Err(TryFromProtoError::missing("checkpoint data").into());
287                }
288            }
289        };
290
291        Ok(MetadataEnvelope::new(checkpoint, metadata))
292    }
293
294    /// Stream checkpoints across a range of checkpoints.
295    ///
296    /// Returns a stream of [`CheckpointResponse`] objects, each representing
297    /// a complete checkpoint with its transactions and events. Every checkpoint
298    /// in the range is yielded, even if the filters produce no matching
299    /// transactions or events within it.
300    ///
301    /// To skip non-matching checkpoints entirely, use
302    /// [`stream_checkpoints_filtered`](Self::stream_checkpoints_filtered).
303    ///
304    /// **Note:** The metadata in the returned [`MetadataEnvelope`] is captured
305    /// from the initial gRPC response headers when the stream is opened. It is
306    /// **not** updated as subsequent checkpoint data arrives.
307    ///
308    /// # Parameters
309    ///
310    /// * `start_sequence_number` - Optional starting checkpoint. If `None`,
311    ///   starts from the latest checkpoint.
312    /// * `end_sequence_number` - Optional ending checkpoint. If `None`, streams
313    ///   indefinitely.
314    /// * `read_mask` - Optional field mask specifying which fields to include.
315    ///   If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
316    ///   See [module-level documentation](crate::api::ledger::checkpoints) for
317    ///   all available fields.
318    /// * `transactions_filter` - Optional filter to apply to transactions
319    /// * `events_filter` - Optional filter to apply to events
320    ///
321    /// # Example
322    ///
323    /// ```no_run
324    /// # use iota_grpc_client::Client;
325    /// # use futures::StreamExt;
326    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327    /// let client = Client::connect("http://localhost:9000").await?;
328    /// let mut stream = client
329    ///     .stream_checkpoints(Some(0), Some(10), None, None, None)
330    ///     .await?;
331    ///
332    /// while let Some(checkpoint) = stream.body_mut().next().await {
333    ///     let checkpoint = checkpoint?;
334    ///     println!("Received checkpoint {}", checkpoint.sequence_number);
335    /// }
336    /// # Ok(())
337    /// # }
338    /// ```
339    pub async fn stream_checkpoints(
340        &self,
341        start_sequence_number: Option<CheckpointSequenceNumber>,
342        end_sequence_number: Option<CheckpointSequenceNumber>,
343        read_mask: Option<&str>,
344        transactions_filter: Option<grpc_filter::TransactionFilter>,
345        events_filter: Option<grpc_filter::EventFilter>,
346    ) -> Result<MetadataEnvelope<Pin<Box<dyn Stream<Item = Result<CheckpointResponse>> + Send>>>>
347    {
348        let envelope = self
349            .stream_checkpoints_raw(
350                start_sequence_number,
351                end_sequence_number,
352                read_mask,
353                transactions_filter,
354                events_filter,
355                false,
356                None,
357            )
358            .await?;
359
360        let (stream, metadata) = envelope.into_parts();
361
362        // remove the wrapping CheckpointStreamItem layer since we know
363        // filter_checkpoints is false and thus only Checkpoint items will be produced
364        let filtered = stream.filter_map(|item| async {
365            match item {
366                Ok(CheckpointStreamItem::Checkpoint(cp)) => Some(Ok(*cp)),
367                Ok(CheckpointStreamItem::Progress { .. }) => None,
368                Err(e) => Some(Err(e)),
369            }
370        });
371
372        Ok(MetadataEnvelope::new(Box::pin(filtered), metadata))
373    }
374
375    /// Stream checkpoints, skipping those with no matching data.
376    ///
377    /// Unlike [`stream_checkpoints`](Self::stream_checkpoints), this method
378    /// sets `filter_checkpoints = true` on the server, which means checkpoints
379    /// without any matching transactions or events are skipped entirely.
380    ///
381    /// The returned stream yields [`CheckpointStreamItem`], which is either a
382    /// [`CheckpointStreamItem::Checkpoint`] or a
383    /// [`CheckpointStreamItem::Progress`]. Progress messages are sent
384    /// periodically during scanning to indicate liveness and the current scan
385    /// position (default every 2 seconds, configurable via
386    /// `progress_interval_ms`).
387    ///
388    /// For liveness detection, wrap `stream.next()` in
389    /// `tokio::time::timeout()` — if neither a `Checkpoint` nor a `Progress`
390    /// arrives within your chosen duration plus some buffer for connection
391    /// latency, the connection is likely dead.
392    ///
393    /// At least one of `transactions_filter` or `events_filter` must be set.
394    ///
395    /// # Parameters
396    ///
397    /// * `start_sequence_number` - Optional starting checkpoint. If `None`,
398    ///   starts from the latest checkpoint.
399    /// * `end_sequence_number` - Optional ending checkpoint. If `None`, streams
400    ///   indefinitely.
401    /// * `read_mask` - Optional field mask specifying which fields to include.
402    ///   If `None`, uses [`crate::api::GET_CHECKPOINT_READ_MASK`] as default.
403    ///   See [module-level documentation](crate::api::ledger::checkpoints) for
404    ///   all available fields.
405    /// * `transactions_filter` - Optional filter to apply to transactions
406    /// * `events_filter` - Optional filter to apply to events
407    /// * `progress_interval_ms` - Optional progress message interval in
408    ///   milliseconds. Defaults to 2000ms. Minimum 500ms.
409    ///
410    /// # Example
411    ///
412    /// ```no_run
413    /// # use iota_grpc_client::{Client, CheckpointStreamItem};
414    /// # use iota_grpc_types::v1::filter as grpc_filter;
415    /// # use futures::StreamExt;
416    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
417    /// let client = Client::connect("http://localhost:9000").await?;
418    /// // At least one filter is required
419    /// let tx_filter = grpc_filter::TransactionFilter::default();
420    /// let mut stream = client
421    ///     .stream_checkpoints_filtered(Some(0), None, None, Some(tx_filter), None, None)
422    ///     .await?;
423    ///
424    /// while let Some(item) = stream.body_mut().next().await {
425    ///     match item? {
426    ///         CheckpointStreamItem::Checkpoint(cp) => {
427    ///             println!("Matched checkpoint {}", cp.sequence_number);
428    ///         }
429    ///         CheckpointStreamItem::Progress {
430    ///             latest_scanned_sequence_number,
431    ///         } => {
432    ///             println!("Scanned up to {latest_scanned_sequence_number}");
433    ///         }
434    ///     }
435    /// }
436    /// # Ok(())
437    /// # }
438    /// ```
439    pub async fn stream_checkpoints_filtered(
440        &self,
441        start_sequence_number: Option<CheckpointSequenceNumber>,
442        end_sequence_number: Option<CheckpointSequenceNumber>,
443        read_mask: Option<&str>,
444        transactions_filter: Option<grpc_filter::TransactionFilter>,
445        events_filter: Option<grpc_filter::EventFilter>,
446        progress_interval_ms: Option<u32>,
447    ) -> Result<MetadataEnvelope<Pin<Box<dyn Stream<Item = Result<CheckpointStreamItem>> + Send>>>>
448    {
449        self.stream_checkpoints_raw(
450            start_sequence_number,
451            end_sequence_number,
452            read_mask,
453            transactions_filter,
454            events_filter,
455            true,
456            progress_interval_ms,
457        )
458        .await
459    }
460
461    /// Internal helper that builds the stream request and returns the raw
462    /// [`CheckpointStreamItem`] stream.
463    async fn stream_checkpoints_raw(
464        &self,
465        start_sequence_number: Option<CheckpointSequenceNumber>,
466        end_sequence_number: Option<CheckpointSequenceNumber>,
467        read_mask: Option<&str>,
468        transactions_filter: Option<grpc_filter::TransactionFilter>,
469        events_filter: Option<grpc_filter::EventFilter>,
470        filter_checkpoints: bool,
471        progress_interval_ms: Option<u32>,
472    ) -> Result<MetadataEnvelope<Pin<Box<dyn Stream<Item = Result<CheckpointStreamItem>> + Send>>>>
473    {
474        let mut request = StreamCheckpointsRequest::default()
475            .with_read_mask(field_mask_with_default(read_mask, GET_CHECKPOINT_READ_MASK));
476
477        if let Some(start) = start_sequence_number {
478            request = request.with_start_sequence_number(start);
479        }
480        if let Some(end) = end_sequence_number {
481            request = request.with_end_sequence_number(end);
482        }
483        if let Some(tf) = transactions_filter {
484            request = request.with_transactions_filter(tf);
485        }
486        if let Some(ef) = events_filter {
487            request = request.with_events_filter(ef);
488        }
489        if filter_checkpoints {
490            request = request.with_filter_checkpoints(true);
491        }
492        if let Some(ms) = progress_interval_ms {
493            request = request.with_progress_interval_ms(ms);
494        }
495        if let Some(max_size) = self
496            .max_decoding_message_size()
497            .map(saturating_usize_to_u32)
498        {
499            request = request.with_max_message_size_bytes(max_size);
500        }
501
502        let mut client = self.ledger_service_client();
503        let response = client.stream_checkpoints(request).await?;
504        let (stream, metadata) = MetadataEnvelope::from(response).into_parts();
505
506        Ok(MetadataEnvelope::new(
507            Box::pin(Self::reassemble_checkpoint_data_stream(stream)),
508            metadata,
509        ))
510    }
511
512    /// Reassemble a stream of checkpoint data chunks into complete checkpoints.
513    ///
514    /// The server sends checkpoint data in multiple messages:
515    /// - `Checkpoint` - Contains the checkpoint summary and contents
516    /// - `Transactions` - Contains executed transactions
517    /// - `Events` - Contains events from transactions
518    /// - `Progress` - Liveness indicator during filtered scanning
519    /// - `EndMarker` - Signals the end of one checkpoint's data
520    ///
521    /// This function buffers the chunks and yields [`CheckpointStreamItem`]
522    /// values: either complete [`CheckpointResponse`] objects when an
523    /// `EndMarker` is received, or [`CheckpointStreamItem::Progress`] when
524    /// a progress message arrives.
525    fn reassemble_checkpoint_data_stream<S, E>(
526        stream: S,
527    ) -> impl Stream<Item = Result<CheckpointStreamItem>>
528    where
529        S: Stream<
530            Item = std::result::Result<iota_grpc_types::v1::ledger_service::CheckpointData, E>,
531        >,
532        E: Into<Error>,
533    {
534        async_stream::try_stream! {
535            futures::pin_mut!(stream);
536
537            // State for accumulating checkpoint data
538            let mut current_sequence_number: Option<CheckpointSequenceNumber> = None;
539            let mut current_summary: Option<checkpoint::CheckpointSummary> = None;
540            let mut current_signature: Option<ProtoValidatorAggregatedSignature> = None;
541            let mut current_contents: Option<checkpoint::CheckpointContents> = None;
542            let mut current_transactions: Vec<ExecutedTransaction> = Vec::new();
543            let mut current_events: Vec<event::Event> = Vec::new();
544
545            while let Some(data) = stream.next().await {
546                let data = data.map_err(|e| e.into())?;
547
548                match data.payload {
549                    Some(checkpoint_data::Payload::Checkpoint(checkpoint)) => {
550                        if checkpoint.sequence_number.is_none() {
551                            Err(TryFromProtoError::missing("checkpoint.sequence_number"))?;
552                        }
553
554                        // Start of new checkpoint - throw error if previous checkpoint was incomplete
555                        if current_sequence_number.is_some() {
556                            Err(Error::Protocol("Received new chunked checkpoint header before completing previous checkpoint".into()))?;
557                        }
558                        current_sequence_number = checkpoint.sequence_number;
559
560                        // Store proto summary (optional, no deserialization)
561                        current_summary = checkpoint.summary;
562
563                        // Store proto signature (optional, no deserialization)
564                        current_signature = checkpoint.signature;
565
566                        // Store proto contents (optional, no deserialization)
567                        current_contents = checkpoint.contents;
568
569                        // Reset accumulators for new checkpoint (in case Transactions or Events
570                        // arrived between endmarker and Checkpoint)
571                        current_transactions.clear();
572                        current_events.clear();
573                    }
574
575                    Some(checkpoint_data::Payload::ExecutedTransactions(txs)) => {
576                        if current_sequence_number.is_none() {
577                            Err(Error::Protocol("Received new chunked checkpoint transactions before receiving checkpoint header".into()))?;
578                        }
579
580                        // Accumulate proto transactions (no deserialization)
581                        current_transactions.extend(txs.executed_transactions.into_iter());
582                    }
583
584                    Some(checkpoint_data::Payload::Events(events)) => {
585                        if current_sequence_number.is_none() {
586                            Err(Error::Protocol("Received new chunked checkpoint events before receiving checkpoint header".into()))?;
587                        }
588
589                        // Accumulate proto events (no deserialization)
590                        current_events.extend(events.events);
591                    }
592
593                    Some(checkpoint_data::Payload::EndMarker(marker)) => {
594                        // End of current checkpoint - assemble the result and yield it
595                         let sequence_number = current_sequence_number
596                        .take()
597                        .ok_or_else(|| -> Error { Error::Protocol("Received checkpoint end marker before receiving checkpoint header".into()) })?;
598
599                        let marker_sequence_number = marker.sequence_number
600                        .ok_or_else(|| -> Error { TryFromProtoError::missing("end_marker.sequence_number").into() })?;
601
602                        if marker_sequence_number != sequence_number {
603                            Err(Error::Protocol(format!(
604                                "EndMarker sequence_number {marker_sequence_number} does not match current checkpoint sequence_number {sequence_number:?}",
605                            )))?;
606                        }
607
608                        let response = CheckpointResponse {
609                            sequence_number,
610                            summary: current_summary.take(),
611                            signature: current_signature.take(),
612                            contents: current_contents.take(),
613                            executed_transactions: std::mem::take(&mut current_transactions),
614                            events: std::mem::take(&mut current_events),
615                        };
616
617                        yield CheckpointStreamItem::Checkpoint(Box::new(response));
618                    }
619
620                    Some(checkpoint_data::Payload::Progress(progress)) => {
621                        yield CheckpointStreamItem::Progress {
622                            latest_scanned_sequence_number: progress.latest_scanned_sequence_number,
623                        };
624                    }
625
626                    None => {
627                        // Empty payload - skip
628                        continue;
629                    }
630
631                    Some(_) => {
632                        // Unknown payload type
633                        Err(Error::Protocol("Received unknown checkpoint data payload type".into()))?;
634                    }
635                }
636            }
637
638            // Check if stream ended with incomplete checkpoint data
639            if let Some(sequence_number) = current_sequence_number {
640                Err(Error::Protocol(format!(
641                    "Stream ended with incomplete checkpoint data for sequence number {sequence_number}"
642                )))?;
643            }
644        }
645    }
646}