1use std::sync::Arc;
5
6use anyhow::Result;
7use iota_grpc_types::{
8 CertifiedCheckpointSummary as GrpcCertifiedCheckpointSummary,
9 CheckpointData as GrpcCheckpointData,
10};
11use iota_json_rpc_types::{EventFilter, IotaEvent};
12use iota_types::{
13 full_checkpoint_content::CheckpointData,
14 messages_checkpoint::CertifiedCheckpointSummary,
15 storage::{RestStateReader, error::Kind},
16};
17use serde::{Deserialize, Serialize};
18use tokio::sync::broadcast::{Receiver, Sender, error::RecvError};
19use tokio_util::sync::CancellationToken;
20use tonic::Status;
21use tracing::debug;
22
23use crate::{checkpoint::Checkpoint, common::BcsData};
24
25pub trait CheckpointSummaryBroadcaster {
27 fn send(&self, summary: &CertifiedCheckpointSummary) -> anyhow::Result<()>;
28}
29
30pub trait CheckpointDataBroadcaster {
32 fn send(&self, data: &CheckpointData) -> anyhow::Result<()>;
33}
34
35pub trait EventSubscriber: Send + Sync {
37 fn subscribe_events(
39 &self,
40 filter: iota_json_rpc_types::EventFilter,
41 ) -> Box<dyn futures::Stream<Item = IotaEvent> + Send + Unpin>;
42}
43
44#[derive(Clone)]
47pub struct GrpcCheckpointSummaryBroadcaster {
48 sender: Sender<Arc<GrpcCertifiedCheckpointSummary>>,
49}
50
51impl GrpcCheckpointSummaryBroadcaster {
52 pub fn new(sender: Sender<Arc<GrpcCertifiedCheckpointSummary>>) -> Self {
53 Self { sender }
54 }
55
56 pub fn subscribe(&self) -> Receiver<Arc<GrpcCertifiedCheckpointSummary>> {
58 self.sender.subscribe()
59 }
60
61 pub fn receiver_count(&self) -> usize {
63 self.sender.receiver_count()
64 }
65
66 pub fn send_traced(&self, summary: &CertifiedCheckpointSummary) {
68 match self.send(summary) {
69 Ok(()) => {
70 debug!(
71 "Sent checkpoint summary #{} to {} gRPC subscriber(s)",
72 *summary.data().sequence_number(),
73 self.receiver_count()
74 );
75 }
76 Err(_) => {
77 debug!(
78 "No gRPC clients subscribed for checkpoint summary #{}",
79 *summary.data().sequence_number()
80 );
81 }
82 }
83 }
84}
85
86impl CheckpointSummaryBroadcaster for GrpcCheckpointSummaryBroadcaster {
87 fn send(&self, summary: &CertifiedCheckpointSummary) -> anyhow::Result<()> {
88 let grpc_summary = Arc::new(GrpcCertifiedCheckpointSummary::from(summary.clone()));
89 self.sender.send(grpc_summary)?;
90 Ok(())
91 }
92}
93
94#[derive(Clone)]
96pub struct GrpcCheckpointDataBroadcaster {
97 sender: Sender<Arc<GrpcCheckpointData>>,
98}
99
100impl GrpcCheckpointDataBroadcaster {
101 pub fn new(sender: Sender<Arc<GrpcCheckpointData>>) -> Self {
102 Self { sender }
103 }
104
105 pub fn subscribe(&self) -> Receiver<Arc<GrpcCheckpointData>> {
107 self.sender.subscribe()
108 }
109
110 pub fn receiver_count(&self) -> usize {
112 self.sender.receiver_count()
113 }
114
115 pub fn send_traced(&self, data: &CheckpointData) {
117 match self.send(data) {
118 Ok(()) => {
119 debug!(
120 "Sent checkpoint data #{} to {} gRPC subscriber(s)",
121 data.checkpoint_summary.data().sequence_number,
122 self.receiver_count()
123 );
124 }
125 Err(_) => {
126 debug!(
127 "No gRPC clients subscribed for checkpoint data #{}",
128 data.checkpoint_summary.data().sequence_number
129 );
130 }
131 }
132 }
133}
134
135impl CheckpointDataBroadcaster for GrpcCheckpointDataBroadcaster {
136 fn send(&self, data: &CheckpointData) -> anyhow::Result<()> {
137 let grpc_data = Arc::new(GrpcCheckpointData::from(data.clone()));
138 self.sender.send(grpc_data)?;
139 Ok(())
140 }
141}
142
143impl CheckpointSummaryBroadcaster for Sender<Arc<CertifiedCheckpointSummary>> {
147 fn send(&self, summary: &CertifiedCheckpointSummary) -> anyhow::Result<()> {
148 self.send(Arc::new(summary.clone()))?;
149 Ok(())
150 }
151}
152
153impl CheckpointDataBroadcaster for Sender<Arc<CheckpointData>> {
155 fn send(&self, data: &CheckpointData) -> anyhow::Result<()> {
156 self.send(Arc::new(data.clone()))?;
157 Ok(())
158 }
159}
160
161impl CheckpointSummaryBroadcaster for () {
164 fn send(&self, _summary: &CertifiedCheckpointSummary) -> anyhow::Result<()> {
165 Ok(())
166 }
167}
168
169impl CheckpointDataBroadcaster for () {
172 fn send(&self, _data: &CheckpointData) -> anyhow::Result<()> {
173 Ok(())
174 }
175}
176
177impl EventSubscriber for () {
180 fn subscribe_events(
181 &self,
182 _filter: EventFilter,
183 ) -> Box<dyn futures::Stream<Item = IotaEvent> + Send + Unpin> {
184 Box::new(Box::pin(futures::stream::empty()))
185 }
186}
187
188impl BcsData {
189 pub fn serialize_from<T>(data: &T) -> Result<Self, bcs::Error>
190 where
191 T: Serialize,
192 {
193 let serialized = bcs::to_bytes(data)?;
194 Ok(BcsData { data: serialized })
195 }
196
197 pub fn deserialize_into<T>(&self) -> Result<T, bcs::Error>
198 where
199 T: for<'de> Deserialize<'de>,
200 {
201 bcs::from_bytes(&self.data)
202 }
203}
204
205pub type CheckpointStreamResult = Result<Checkpoint, Status>;
207
208pub trait GrpcStateReader: Send + Sync + 'static {
215 fn get_latest_checkpoint_sequence_number(&self) -> Option<u64>;
217
218 fn get_checkpoint_summary(&self, seq: u64) -> Option<CertifiedCheckpointSummary>;
220
221 fn get_checkpoint_data(&self, seq: u64) -> Option<CheckpointData>;
223
224 fn get_epoch_last_checkpoint(
226 &self,
227 epoch: u64,
228 ) -> anyhow::Result<Option<CertifiedCheckpointSummary>>;
229}
230
231pub struct RestStateReaderAdapter {
233 inner: Arc<dyn RestStateReader>,
234}
235
236impl GrpcStateReader for RestStateReaderAdapter {
237 fn get_latest_checkpoint_sequence_number(&self) -> Option<u64> {
238 match self.inner.try_get_latest_checkpoint() {
239 Ok(checkpoint) => Some(*checkpoint.sequence_number()),
240 Err(e) => match e.kind() {
241 Kind::Missing => None,
244 _ => panic!("Unexpected storage error: {e}"),
246 },
247 }
248 }
249
250 fn get_checkpoint_summary(&self, seq: u64) -> Option<CertifiedCheckpointSummary> {
251 self.inner
252 .get_checkpoint_by_sequence_number(seq)
253 .map(CertifiedCheckpointSummary::from)
254 }
255
256 fn get_checkpoint_data(&self, seq: u64) -> Option<CheckpointData> {
257 let summary = self.inner.get_checkpoint_by_sequence_number(seq)?;
258 let contents = self.inner.get_checkpoint_contents_by_sequence_number(seq)?;
259 Some(self.inner.get_checkpoint_data(summary, contents))
260 }
261
262 fn get_epoch_last_checkpoint(
263 &self,
264 epoch: u64,
265 ) -> anyhow::Result<Option<CertifiedCheckpointSummary>> {
266 match self.inner.get_epoch_last_checkpoint(epoch) {
267 Ok(Some(checkpoint)) => Ok(Some(CertifiedCheckpointSummary::from(checkpoint))),
268 Ok(None) => Ok(None),
269 Err(e) => Err(e.into()),
270 }
271 }
272}
273
274#[derive(Clone)]
278pub struct GrpcReader {
279 state_reader: Arc<dyn GrpcStateReader>,
280}
281
282impl GrpcReader {
283 pub fn new(state_reader: Arc<dyn GrpcStateReader>) -> Self {
284 Self { state_reader }
285 }
286
287 pub fn from_rest_state_reader(state_reader: Arc<dyn RestStateReader>) -> Self {
288 Self {
289 state_reader: Arc::new(RestStateReaderAdapter {
290 inner: state_reader,
291 }),
292 }
293 }
294
295 pub fn get_epoch_last_checkpoint(
296 &self,
297 epoch: u64,
298 ) -> anyhow::Result<Option<CertifiedCheckpointSummary>> {
299 self.state_reader.get_epoch_last_checkpoint(epoch)
300 }
301
302 fn get_full_checkpoint_data(&self, seq: u64) -> Option<CheckpointData> {
303 self.state_reader.get_checkpoint_data(seq)
304 }
305
306 pub fn get_latest_checkpoint_sequence_number(&self) -> Option<u64> {
307 self.state_reader.get_latest_checkpoint_sequence_number()
308 }
309
310 fn create_checkpoint_stream<T>(
313 &self,
314 mut rx: Receiver<Arc<T>>,
315 start_sequence_number: Option<u64>,
316 end_sequence_number: Option<u64>,
317 is_full: bool,
318 cancellation_token: CancellationToken,
319 fetch_historical: impl Fn(&Self, u64) -> Option<Arc<T>> + Send,
320 get_sequence_number: impl Fn(&Arc<T>) -> u64 + Send,
321 ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send
322 where
323 T: Serialize + Send + Sync + 'static,
324 {
325 let reader = self.clone();
327 async_stream::try_stream! {
328 let data_type_name = if is_full { "data" } else { "summary" };
329 let mut latest = reader.get_latest_checkpoint_sequence_number().unwrap_or(0);
334 debug!("[profile][grpc] Latest checkpoint index: {latest}.");
335 let (mut start, end) = match (start_sequence_number, end_sequence_number) {
336 (None, None) => (latest, u64::MAX),
337 (None, Some(end)) => (end, end),
338 (Some(start), None) => (start, u64::MAX),
339 (Some(start), Some(end)) => (start, end),
340 };
341 while start <= end {
342 if start <= latest {
344 if let Some(item) = fetch_historical(&reader, start) {
345 debug!("[profile][grpc] Fetched checkpoint {data_type_name} for index {start} from DB.");
346 let sequence_number = get_sequence_number(&item);
347 let response = BcsData::serialize_from(&*item)
348 .map(|data| Checkpoint {
349 sequence_number,
350 bcs_data: Some(data),
351 is_full,
352 })
353 .map_err(|e| Status::internal(format!("BCS serialization error: {e}")))?;
354 yield response;
355 if start == end {
356 break;
357 }
358 start += 1;
359 continue;
360 } else {
361 Err(Status::internal(format!("Historical checkpoint {data_type_name} missing/pruned: index={start} latest={latest}.")))?;
362 }
363 }
364 let item_result = tokio::select! {
367 recv_result = rx.recv() => Some(recv_result),
369 _ = cancellation_token.cancelled() => {
370 debug!("[profile][grpc] Checkpoint {data_type_name} stream cancelled");
371 None
372 }
373 };
374
375 match item_result {
376 Some(Ok(item)) => {
377 debug!("[profile][grpc] Get checkpoint {data_type_name} for index {} from broadcast channel", get_sequence_number(&item));
378 let sequence_number = get_sequence_number(&item);
379 if start == sequence_number {
380 let response = BcsData::serialize_from(&*item)
381 .map(|data| Checkpoint {
382 sequence_number,
383 bcs_data: Some(data),
384 is_full,
385 })
386 .map_err(|e| Status::internal(format!("BCS serialization error: {e}")))?;
387 yield response;
388 if start == end {
389 break;
390 }
391 start += 1;
392 continue;
393 }
394 }
396 Some(Err(RecvError::Lagged(_))) => {
397 }
399 Some(Err(RecvError::Closed)) => {
400 Err(Status::internal(format!("Checkpoint {data_type_name} channel closed.")))?;
402 break;
403 }
404 None => {
405 break;
407 }
408 }
409 latest = reader.get_latest_checkpoint_sequence_number().unwrap_or(start);
410 debug!("[profile][grpc] Updating latest checkpoint index to {latest}.");
411 }
412 }
413 }
414
415 pub fn create_checkpoint_data_stream(
417 &self,
418 rx: Receiver<Arc<GrpcCheckpointData>>,
419 start_sequence_number: Option<u64>,
420 end_sequence_number: Option<u64>,
421 cancellation_token: CancellationToken,
422 ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
423 self.create_checkpoint_stream(
424 rx,
425 start_sequence_number,
426 end_sequence_number,
427 true,
428 cancellation_token,
429 |reader, seq| {
430 reader
431 .get_full_checkpoint_data(seq)
432 .map(GrpcCheckpointData::from)
433 .map(Arc::new)
434 },
435 |item| item.sequence_number(),
436 )
437 }
438
439 pub fn create_checkpoint_summary_stream(
441 &self,
442 rx: Receiver<Arc<GrpcCertifiedCheckpointSummary>>,
443 start_sequence_number: Option<u64>,
444 end_sequence_number: Option<u64>,
445 cancellation_token: CancellationToken,
446 ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
447 self.create_checkpoint_stream(
448 rx,
449 start_sequence_number,
450 end_sequence_number,
451 false,
452 cancellation_token,
453 |reader, seq| {
454 reader
455 .state_reader
456 .get_checkpoint_summary(seq)
457 .map(GrpcCertifiedCheckpointSummary::from)
458 .map(Arc::new)
459 },
460 |item| item.sequence_number(),
461 )
462 }
463}