iota_grpc_api/
checkpoint_service.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{pin::Pin, sync::Arc};
5
6use tokio_util::sync::CancellationToken;
7use tonic::{Request, Response, Status};
8use tracing::{debug, info};
9
10use crate::{
11    checkpoint::{
12        CheckpointSequenceNumberResponse, CheckpointStreamRequest, EpochRequest,
13        checkpoint_service_server::CheckpointService,
14    },
15    types::*,
16};
17
18pub struct CheckpointGrpcService {
19    pub reader: Arc<GrpcReader>,
20    pub checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
21    pub checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
22    pub cancellation_token: CancellationToken,
23}
24
25impl CheckpointGrpcService {
26    /// Create a new CheckpointGrpcService with a GrpcReader
27    pub fn new(
28        reader: Arc<GrpcReader>,
29        checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
30        checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
31        cancellation_token: CancellationToken,
32    ) -> Self {
33        Self {
34            reader,
35            checkpoint_summary_broadcaster,
36            checkpoint_data_broadcaster,
37            cancellation_token,
38        }
39    }
40}
41
42impl CheckpointGrpcService {
43    fn stream_checkpoint_data(
44        &self,
45        start_sequence_number: Option<u64>,
46        end_sequence_number: Option<u64>,
47    ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
48        let rx = self.checkpoint_data_broadcaster.subscribe();
49        self.reader.create_checkpoint_data_stream(
50            rx,
51            start_sequence_number,
52            end_sequence_number,
53            self.cancellation_token.clone(),
54        )
55    }
56
57    fn stream_checkpoint_summary(
58        &self,
59        start_sequence_number: Option<u64>,
60        end_sequence_number: Option<u64>,
61    ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
62        let rx = self.checkpoint_summary_broadcaster.subscribe();
63        self.reader.create_checkpoint_summary_stream(
64            rx,
65            start_sequence_number,
66            end_sequence_number,
67            self.cancellation_token.clone(),
68        )
69    }
70}
71
72// The `CheckpointService` is the auto-generated trait from the protobuf
73// definition. It's generated by tonic/protobuf and defines the interface that
74// any gRPC checkpoint service must implement.
75#[tonic::async_trait]
76impl CheckpointService for CheckpointGrpcService {
77    type StreamCheckpointsStream =
78        Pin<Box<dyn futures::Stream<Item = Result<crate::checkpoint::Checkpoint, Status>> + Send>>;
79
80    async fn stream_checkpoints(
81        &self,
82        request: Request<CheckpointStreamRequest>,
83    ) -> Result<Response<Self::StreamCheckpointsStream>, Status> {
84        let req = request.into_inner();
85        let start_sequence_number = req.start_sequence_number;
86        let end_sequence_number = req.end_sequence_number;
87        let full = req.full;
88
89        let stream: Self::StreamCheckpointsStream = if full {
90            Box::pin(self.stream_checkpoint_data(start_sequence_number, end_sequence_number))
91        } else {
92            Box::pin(self.stream_checkpoint_summary(start_sequence_number, end_sequence_number))
93        };
94        Ok(Response::new(stream))
95    }
96
97    async fn get_epoch_first_checkpoint_sequence_number(
98        &self,
99        request: Request<EpochRequest>,
100    ) -> Result<Response<CheckpointSequenceNumberResponse>, Status> {
101        let epoch = request.into_inner().epoch;
102        debug!(
103            "get_epoch_first_checkpoint_sequence_number called for epoch {}",
104            epoch
105        );
106
107        let sequence_number = if epoch == 0 {
108            // Genesis epoch starts at checkpoint 0
109            0
110        } else {
111            // Get the last checkpoint of the previous epoch
112            match self.reader.get_epoch_last_checkpoint(epoch - 1) {
113                Ok(Some(last_checkpoint)) => {
114                    // First checkpoint of current epoch is the next one
115                    *last_checkpoint.sequence_number() + 1
116                }
117                Ok(None) => {
118                    return Err(Status::not_found(format!(
119                        "No checkpoints found for previous epoch {}",
120                        epoch - 1
121                    )));
122                }
123                Err(e) => {
124                    return Err(Status::internal(format!(
125                        "Failed to get last checkpoint for epoch {}: {}",
126                        epoch - 1,
127                        e
128                    )));
129                }
130            }
131        };
132
133        info!(
134            "First checkpoint for epoch {}: seq={}",
135            epoch, sequence_number
136        );
137
138        Ok(Response::new(CheckpointSequenceNumberResponse {
139            sequence_number,
140        }))
141    }
142}