iota_grpc_api/
checkpoint_service.rs1use std::{pin::Pin, sync::Arc};
5
6use tokio_util::sync::CancellationToken;
7use tonic::{Request, Response, Status};
8use tracing::{debug, info};
9
10use crate::{
11 checkpoint::{
12 CheckpointSequenceNumberResponse, CheckpointStreamRequest, EpochRequest,
13 checkpoint_service_server::CheckpointService,
14 },
15 types::*,
16};
17
18pub struct CheckpointGrpcService {
19 pub reader: Arc<GrpcReader>,
20 pub checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
21 pub checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
22 pub cancellation_token: CancellationToken,
23}
24
25impl CheckpointGrpcService {
26 pub fn new(
28 reader: Arc<GrpcReader>,
29 checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
30 checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
31 cancellation_token: CancellationToken,
32 ) -> Self {
33 Self {
34 reader,
35 checkpoint_summary_broadcaster,
36 checkpoint_data_broadcaster,
37 cancellation_token,
38 }
39 }
40}
41
42impl CheckpointGrpcService {
43 fn stream_checkpoint_data(
44 &self,
45 start_sequence_number: Option<u64>,
46 end_sequence_number: Option<u64>,
47 ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
48 let rx = self.checkpoint_data_broadcaster.subscribe();
49 self.reader.create_checkpoint_data_stream(
50 rx,
51 start_sequence_number,
52 end_sequence_number,
53 self.cancellation_token.clone(),
54 )
55 }
56
57 fn stream_checkpoint_summary(
58 &self,
59 start_sequence_number: Option<u64>,
60 end_sequence_number: Option<u64>,
61 ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
62 let rx = self.checkpoint_summary_broadcaster.subscribe();
63 self.reader.create_checkpoint_summary_stream(
64 rx,
65 start_sequence_number,
66 end_sequence_number,
67 self.cancellation_token.clone(),
68 )
69 }
70}
71
72#[tonic::async_trait]
76impl CheckpointService for CheckpointGrpcService {
77 type StreamCheckpointsStream =
78 Pin<Box<dyn futures::Stream<Item = Result<crate::checkpoint::Checkpoint, Status>> + Send>>;
79
80 async fn stream_checkpoints(
81 &self,
82 request: Request<CheckpointStreamRequest>,
83 ) -> Result<Response<Self::StreamCheckpointsStream>, Status> {
84 let req = request.into_inner();
85 let start_sequence_number = req.start_sequence_number;
86 let end_sequence_number = req.end_sequence_number;
87 let full = req.full;
88
89 let stream: Self::StreamCheckpointsStream = if full {
90 Box::pin(self.stream_checkpoint_data(start_sequence_number, end_sequence_number))
91 } else {
92 Box::pin(self.stream_checkpoint_summary(start_sequence_number, end_sequence_number))
93 };
94 Ok(Response::new(stream))
95 }
96
97 async fn get_epoch_first_checkpoint_sequence_number(
98 &self,
99 request: Request<EpochRequest>,
100 ) -> Result<Response<CheckpointSequenceNumberResponse>, Status> {
101 let epoch = request.into_inner().epoch;
102 debug!(
103 "get_epoch_first_checkpoint_sequence_number called for epoch {}",
104 epoch
105 );
106
107 let sequence_number = if epoch == 0 {
108 0
110 } else {
111 match self.reader.get_epoch_last_checkpoint(epoch - 1) {
113 Ok(Some(last_checkpoint)) => {
114 *last_checkpoint.sequence_number() + 1
116 }
117 Ok(None) => {
118 return Err(Status::not_found(format!(
119 "No checkpoints found for previous epoch {}",
120 epoch - 1
121 )));
122 }
123 Err(e) => {
124 return Err(Status::internal(format!(
125 "Failed to get last checkpoint for epoch {}: {}",
126 epoch - 1,
127 e
128 )));
129 }
130 }
131 };
132
133 info!(
134 "First checkpoint for epoch {}: seq={}",
135 epoch, sequence_number
136 );
137
138 Ok(Response::new(CheckpointSequenceNumberResponse {
139 sequence_number,
140 }))
141 }
142}