iota_grpc_api/
types.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use iota_grpc_types::{
8    CertifiedCheckpointSummary as GrpcCertifiedCheckpointSummary,
9    CheckpointData as GrpcCheckpointData,
10};
11use iota_json_rpc_types::{EventFilter, IotaEvent};
12use iota_types::{
13    full_checkpoint_content::CheckpointData,
14    messages_checkpoint::CertifiedCheckpointSummary,
15    storage::{RestStateReader, error::Kind},
16};
17use serde::{Deserialize, Serialize};
18use tokio::sync::broadcast::{Receiver, Sender, error::RecvError};
19use tokio_util::sync::CancellationToken;
20use tonic::Status;
21use tracing::debug;
22
23use crate::{checkpoint::Checkpoint, common::BcsData};
24
/// Sink for certified checkpoint summaries.
///
/// Implementations in this file cover a gRPC-converting wrapper, a plain
/// tokio broadcast `Sender`, and a no-op for `()`.
pub trait CheckpointSummaryBroadcaster {
    /// Publish `summary` to whatever this broadcaster is connected to.
    ///
    /// The summary is taken by reference; implementations that need an owned
    /// value clone it themselves.
    fn send(&self, summary: &CertifiedCheckpointSummary) -> anyhow::Result<()>;
}
29
/// Sink for full checkpoint data.
///
/// Implementations in this file cover a gRPC-converting wrapper, a plain
/// tokio broadcast `Sender`, and a no-op for `()`.
pub trait CheckpointDataBroadcaster {
    /// Publish `data` to whatever this broadcaster is connected to.
    ///
    /// The data is taken by reference; implementations that need an owned
    /// value clone it themselves.
    fn send(&self, data: &CheckpointData) -> anyhow::Result<()>;
}
34
35/// Trait for subscribing to event streams (used by gRPC service)
36pub trait EventSubscriber: Send + Sync {
37    /// Subscribe to events with the given filter
38    fn subscribe_events(
39        &self,
40        filter: iota_json_rpc_types::EventFilter,
41    ) -> Box<dyn futures::Stream<Item = IotaEvent> + Send + Unpin>;
42}
43
/// Wrapper that converts a native `CertifiedCheckpointSummary` to its gRPC
/// representation before broadcasting it.
///
/// Subscribers receive the converted value behind an `Arc`.
#[derive(Clone)]
pub struct GrpcCheckpointSummaryBroadcaster {
    // Broadcast channel carrying the already-converted gRPC summaries.
    sender: Sender<Arc<GrpcCertifiedCheckpointSummary>>,
}
50
51impl GrpcCheckpointSummaryBroadcaster {
52    pub fn new(sender: Sender<Arc<GrpcCertifiedCheckpointSummary>>) -> Self {
53        Self { sender }
54    }
55
56    /// Subscribe to checkpoint summary broadcasts
57    pub fn subscribe(&self) -> Receiver<Arc<GrpcCertifiedCheckpointSummary>> {
58        self.sender.subscribe()
59    }
60
61    /// Get the number of active receivers
62    pub fn receiver_count(&self) -> usize {
63        self.sender.receiver_count()
64    }
65
66    /// Send with integrated tracing and error handling
67    pub fn send_traced(&self, summary: &CertifiedCheckpointSummary) {
68        match self.send(summary) {
69            Ok(()) => {
70                debug!(
71                    "Sent checkpoint summary #{} to {} gRPC subscriber(s)",
72                    *summary.data().sequence_number(),
73                    self.receiver_count()
74                );
75            }
76            Err(_) => {
77                debug!(
78                    "No gRPC clients subscribed for checkpoint summary #{}",
79                    *summary.data().sequence_number()
80                );
81            }
82        }
83    }
84}
85
86impl CheckpointSummaryBroadcaster for GrpcCheckpointSummaryBroadcaster {
87    fn send(&self, summary: &CertifiedCheckpointSummary) -> anyhow::Result<()> {
88        let grpc_summary = Arc::new(GrpcCertifiedCheckpointSummary::from(summary.clone()));
89        self.sender.send(grpc_summary)?;
90        Ok(())
91    }
92}
93
/// Wrapper that converts native `CheckpointData` to its gRPC representation
/// before broadcasting it.
///
/// Subscribers receive the converted value behind an `Arc`.
#[derive(Clone)]
pub struct GrpcCheckpointDataBroadcaster {
    // Broadcast channel carrying the already-converted gRPC checkpoint data.
    sender: Sender<Arc<GrpcCheckpointData>>,
}
99
100impl GrpcCheckpointDataBroadcaster {
101    pub fn new(sender: Sender<Arc<GrpcCheckpointData>>) -> Self {
102        Self { sender }
103    }
104
105    /// Subscribe to checkpoint data broadcasts
106    pub fn subscribe(&self) -> Receiver<Arc<GrpcCheckpointData>> {
107        self.sender.subscribe()
108    }
109
110    /// Get the number of active receivers
111    pub fn receiver_count(&self) -> usize {
112        self.sender.receiver_count()
113    }
114
115    /// Send with integrated tracing and error handling
116    pub fn send_traced(&self, data: &CheckpointData) {
117        match self.send(data) {
118            Ok(()) => {
119                debug!(
120                    "Sent checkpoint data #{} to {} gRPC subscriber(s)",
121                    data.checkpoint_summary.data().sequence_number,
122                    self.receiver_count()
123                );
124            }
125            Err(_) => {
126                debug!(
127                    "No gRPC clients subscribed for checkpoint data #{}",
128                    data.checkpoint_summary.data().sequence_number
129                );
130            }
131        }
132    }
133}
134
135impl CheckpointDataBroadcaster for GrpcCheckpointDataBroadcaster {
136    fn send(&self, data: &CheckpointData) -> anyhow::Result<()> {
137        let grpc_data = Arc::new(GrpcCheckpointData::from(data.clone()));
138        self.sender.send(grpc_data)?;
139        Ok(())
140    }
141}
142
143// Standard implementations for common types
144
145/// Implementation for tokio broadcast sender
146impl CheckpointSummaryBroadcaster for Sender<Arc<CertifiedCheckpointSummary>> {
147    fn send(&self, summary: &CertifiedCheckpointSummary) -> anyhow::Result<()> {
148        self.send(Arc::new(summary.clone()))?;
149        Ok(())
150    }
151}
152
153/// Implementation for tokio broadcast sender
154impl CheckpointDataBroadcaster for Sender<Arc<CheckpointData>> {
155    fn send(&self, data: &CheckpointData) -> anyhow::Result<()> {
156        self.send(Arc::new(data.clone()))?;
157        Ok(())
158    }
159}
160
/// No-op implementation for the unit type (used in tests and when
/// broadcasting is disabled).
impl CheckpointSummaryBroadcaster for () {
    /// Ignores the summary and always reports success.
    fn send(&self, _summary: &CertifiedCheckpointSummary) -> anyhow::Result<()> {
        Ok(())
    }
}
168
/// No-op implementation for the unit type (used in tests and when
/// broadcasting is disabled).
impl CheckpointDataBroadcaster for () {
    /// Ignores the data and always reports success.
    fn send(&self, _data: &CheckpointData) -> anyhow::Result<()> {
        Ok(())
    }
}
176
177/// No-op implementation for unit type (used in tests and when event
178/// subscription is not needed)
179impl EventSubscriber for () {
180    fn subscribe_events(
181        &self,
182        _filter: EventFilter,
183    ) -> Box<dyn futures::Stream<Item = IotaEvent> + Send + Unpin> {
184        Box::new(Box::pin(futures::stream::empty()))
185    }
186}
187
188impl BcsData {
189    pub fn serialize_from<T>(data: &T) -> Result<Self, bcs::Error>
190    where
191        T: Serialize,
192    {
193        let serialized = bcs::to_bytes(data)?;
194        Ok(BcsData { data: serialized })
195    }
196
197    pub fn deserialize_into<T>(&self) -> Result<T, bcs::Error>
198    where
199        T: for<'de> Deserialize<'de>,
200    {
201        bcs::from_bytes(&self.data)
202    }
203}
204
205// Type aliases and utility types
/// Item type of the checkpoint streams: a `Checkpoint` message on success or
/// a tonic `Status` describing the failure.
pub type CheckpointStreamResult = Result<Checkpoint, Status>;
207
208// Storage abstraction traits for gRPC access
209// These traits provide an abstraction layer over the storage backend,
210// making it easier to implement gRPC services with different storage types
211// (e.g., production database vs simulacrum for testing).
212
/// Read-only view of checkpoint storage used by the gRPC layer.
///
/// Implementors must be `Send + Sync + 'static` so a reader can be shared
/// behind an `Arc` (as `GrpcReader` does).
pub trait GrpcStateReader: Send + Sync + 'static {
    /// Get the latest checkpoint sequence number, or `None` when the
    /// implementation has no latest checkpoint to report.
    fn get_latest_checkpoint_sequence_number(&self) -> Option<u64>;

    /// Get the checkpoint summary with sequence number `seq`, or `None` if
    /// the implementation cannot provide it.
    fn get_checkpoint_summary(&self, seq: u64) -> Option<CertifiedCheckpointSummary>;

    /// Get the full checkpoint data with sequence number `seq`, or `None` if
    /// the implementation cannot provide it.
    fn get_checkpoint_data(&self, seq: u64) -> Option<CheckpointData>;

    /// Get the last checkpoint of `epoch`, for epoch boundary calculations.
    ///
    /// Unlike the other methods this one is fallible: lookup errors are
    /// returned as `Err`, while `Ok(None)` means no such checkpoint was
    /// found.
    fn get_epoch_last_checkpoint(
        &self,
        epoch: u64,
    ) -> anyhow::Result<Option<CertifiedCheckpointSummary>>;
}
230
/// Adapter that implements `GrpcStateReader` on top of a `RestStateReader`.
pub struct RestStateReaderAdapter {
    // The wrapped reader that every trait method delegates to.
    inner: Arc<dyn RestStateReader>,
}
235
236impl GrpcStateReader for RestStateReaderAdapter {
237    fn get_latest_checkpoint_sequence_number(&self) -> Option<u64> {
238        match self.inner.try_get_latest_checkpoint() {
239            Ok(checkpoint) => Some(*checkpoint.sequence_number()),
240            Err(e) => match e.kind() {
241                // Expected during server initialization when no checkpoints have been executed yet
242                // Return None to indicate service is not ready rather than panicking
243                Kind::Missing => None,
244                // Unexpected storage errors
245                _ => panic!("Unexpected storage error: {e}"),
246            },
247        }
248    }
249
250    fn get_checkpoint_summary(&self, seq: u64) -> Option<CertifiedCheckpointSummary> {
251        self.inner
252            .get_checkpoint_by_sequence_number(seq)
253            .map(CertifiedCheckpointSummary::from)
254    }
255
256    fn get_checkpoint_data(&self, seq: u64) -> Option<CheckpointData> {
257        let summary = self.inner.get_checkpoint_by_sequence_number(seq)?;
258        let contents = self.inner.get_checkpoint_contents_by_sequence_number(seq)?;
259        Some(self.inner.get_checkpoint_data(summary, contents))
260    }
261
262    fn get_epoch_last_checkpoint(
263        &self,
264        epoch: u64,
265    ) -> anyhow::Result<Option<CertifiedCheckpointSummary>> {
266        match self.inner.get_epoch_last_checkpoint(epoch) {
267            Ok(Some(checkpoint)) => Ok(Some(CertifiedCheckpointSummary::from(checkpoint))),
268            Ok(None) => Ok(None),
269            Err(e) => Err(e.into()),
270        }
271    }
272}
273
/// Central gRPC data reader that provides unified access to checkpoint data.
/// It provides methods for streaming both full checkpoint data and checkpoint
/// summaries.
///
/// Cloning copies only the `Arc` handle to the underlying state reader.
#[derive(Clone)]
pub struct GrpcReader {
    // Storage backend, type-erased behind the `GrpcStateReader` trait.
    state_reader: Arc<dyn GrpcStateReader>,
}
281
282impl GrpcReader {
283    pub fn new(state_reader: Arc<dyn GrpcStateReader>) -> Self {
284        Self { state_reader }
285    }
286
287    pub fn from_rest_state_reader(state_reader: Arc<dyn RestStateReader>) -> Self {
288        Self {
289            state_reader: Arc::new(RestStateReaderAdapter {
290                inner: state_reader,
291            }),
292        }
293    }
294
295    pub fn get_epoch_last_checkpoint(
296        &self,
297        epoch: u64,
298    ) -> anyhow::Result<Option<CertifiedCheckpointSummary>> {
299        self.state_reader.get_epoch_last_checkpoint(epoch)
300    }
301
302    fn get_full_checkpoint_data(&self, seq: u64) -> Option<CheckpointData> {
303        self.state_reader.get_checkpoint_data(seq)
304    }
305
306    pub fn get_latest_checkpoint_sequence_number(&self) -> Option<u64> {
307        self.state_reader.get_latest_checkpoint_sequence_number()
308    }
309
310    /// Generic checkpoint streaming implementation that works with checkpoint
311    /// data and summaries.
312    fn create_checkpoint_stream<T>(
313        &self,
314        mut rx: Receiver<Arc<T>>,
315        start_sequence_number: Option<u64>,
316        end_sequence_number: Option<u64>,
317        is_full: bool,
318        cancellation_token: CancellationToken,
319        fetch_historical: impl Fn(&Self, u64) -> Option<Arc<T>> + Send,
320        get_sequence_number: impl Fn(&Arc<T>) -> u64 + Send,
321    ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send
322    where
323        T: Serialize + Send + Sync + 'static,
324    {
325        // Clone self to avoid lifetime issues with the async stream
326        let reader = self.clone();
327        async_stream::try_stream! {
328            let data_type_name = if is_full { "data" } else { "summary" };
329            // Link to issue (https://github.com/iotaledger/iota/issues/7943)
330            // TODO: Modify the latest checkpoint to start from 1.
331            // Note that we do not stream the Genesis checkpoint because its size
332            // can be very big. The genesis checkpoint should be imported directly.
333            let mut latest = reader.get_latest_checkpoint_sequence_number().unwrap_or(0);
334            debug!("[profile][grpc] Latest checkpoint index: {latest}.");
335            let (mut start, end) = match (start_sequence_number, end_sequence_number) {
336                (None, None) => (latest, u64::MAX),
337                (None, Some(end)) => (end, end),
338                (Some(start), None) => (start, u64::MAX),
339                (Some(start), Some(end)) => (start, end),
340            };
341            while start <= end {
342                // try fetching historical data from the DB first
343                if start <= latest {
344                    if let Some(item) = fetch_historical(&reader, start) {
345                        debug!("[profile][grpc] Fetched checkpoint {data_type_name} for index {start} from DB.");
346                        let sequence_number = get_sequence_number(&item);
347                        let response = BcsData::serialize_from(&*item)
348                            .map(|data| Checkpoint {
349                                sequence_number,
350                                bcs_data: Some(data),
351                                is_full,
352                            })
353                            .map_err(|e| Status::internal(format!("BCS serialization error: {e}")))?;
354                        yield response;
355                        if start == end {
356                            break;
357                        }
358                        start += 1;
359                        continue;
360                    } else {
361                        Err(Status::internal(format!("Historical checkpoint {data_type_name} missing/pruned: index={start} latest={latest}.")))?;
362                    }
363                }
364                // latest < start, live phase
365                // wait for broadcast or cancellation
366                let item_result = tokio::select! {
367                    // note: tokio::select! cannot return results, so we put the match logic after the select
368                    recv_result = rx.recv() => Some(recv_result),
369                    _ = cancellation_token.cancelled() => {
370                        debug!("[profile][grpc] Checkpoint {data_type_name} stream cancelled");
371                        None
372                    }
373                };
374
375                match item_result {
376                    Some(Ok(item)) => {
377                        debug!("[profile][grpc] Get checkpoint {data_type_name} for index {} from broadcast channel", get_sequence_number(&item));
378                        let sequence_number = get_sequence_number(&item);
379                        if start == sequence_number {
380                            let response = BcsData::serialize_from(&*item)
381                                .map(|data| Checkpoint {
382                                    sequence_number,
383                                    bcs_data: Some(data),
384                                    is_full,
385                                })
386                                .map_err(|e| Status::internal(format!("BCS serialization error: {e}")))?;
387                            yield response;
388                            if start == end {
389                                break;
390                            }
391                            start += 1;
392                            continue;
393                        }
394                        // else item sequence doesn't match, drop it and continue
395                    }
396                    Some(Err(RecvError::Lagged(_))) => {
397                        // continue, lagged item should be picked up from history DB
398                    }
399                    Some(Err(RecvError::Closed)) => {
400                        // report internal error to the stream and break
401                        Err(Status::internal(format!("Checkpoint {data_type_name} channel closed.")))?;
402                        break;
403                    }
404                    None => {
405                        // Cancellation was triggered
406                        break;
407                    }
408                }
409                latest = reader.get_latest_checkpoint_sequence_number().unwrap_or(start);
410                debug!("[profile][grpc] Updating latest checkpoint index to {latest}.");
411            }
412        }
413    }
414
415    /// Create a checkpoint stream for full checkpoint data
416    pub fn create_checkpoint_data_stream(
417        &self,
418        rx: Receiver<Arc<GrpcCheckpointData>>,
419        start_sequence_number: Option<u64>,
420        end_sequence_number: Option<u64>,
421        cancellation_token: CancellationToken,
422    ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
423        self.create_checkpoint_stream(
424            rx,
425            start_sequence_number,
426            end_sequence_number,
427            true,
428            cancellation_token,
429            |reader, seq| {
430                reader
431                    .get_full_checkpoint_data(seq)
432                    .map(GrpcCheckpointData::from)
433                    .map(Arc::new)
434            },
435            |item| item.sequence_number(),
436        )
437    }
438
439    /// Create a checkpoint stream for checkpoint summaries
440    pub fn create_checkpoint_summary_stream(
441        &self,
442        rx: Receiver<Arc<GrpcCertifiedCheckpointSummary>>,
443        start_sequence_number: Option<u64>,
444        end_sequence_number: Option<u64>,
445        cancellation_token: CancellationToken,
446    ) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
447        self.create_checkpoint_stream(
448            rx,
449            start_sequence_number,
450            end_sequence_number,
451            false,
452            cancellation_token,
453            |reader, seq| {
454                reader
455                    .state_reader
456                    .get_checkpoint_summary(seq)
457                    .map(GrpcCertifiedCheckpointSummary::from)
458                    .map(Arc::new)
459            },
460            |item| item.sequence_number(),
461        )
462    }
463}