iota_grpc_api/
event_service.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{str::FromStr, sync::Arc};
5
6use futures::StreamExt;
7use iota_json_rpc_types::{EventFilter, IotaEvent};
8use iota_types::{
9    base_types::{IotaAddress, ObjectID},
10    digests::TransactionDigest,
11};
12use move_core_types::{identifier::Identifier, language_storage::StructTag};
13use tokio_util::sync::CancellationToken;
14use tonic::{Request, Response, Status};
15use tracing::debug;
16
17use crate::{
18    events::{Event, EventId, EventStreamRequest, event_service_server::EventService},
19    types::EventSubscriber,
20};
21
22pub struct EventGrpcService {
23    pub event_subscriber: Arc<dyn EventSubscriber>,
24    pub cancellation_token: CancellationToken,
25}
26
27impl EventGrpcService {
28    pub fn new(
29        event_subscriber: Arc<dyn EventSubscriber>,
30        cancellation_token: CancellationToken,
31    ) -> Self {
32        Self {
33            event_subscriber,
34            cancellation_token,
35        }
36    }
37}
38
39// The `EventService` is the auto-generated trait from the protobuf
40// definition. It's generated by tonic/protobuf and defines the interface that
41// any gRPC event service must implement.
42#[tonic::async_trait]
43impl EventService for EventGrpcService {
44    type StreamEventsStream =
45        std::pin::Pin<Box<dyn futures::Stream<Item = Result<Event, Status>> + Send>>;
46
47    async fn stream_events(
48        &self,
49        request: Request<EventStreamRequest>,
50    ) -> Result<Response<Self::StreamEventsStream>, Status> {
51        let proto_filter = request
52            .into_inner()
53            .filter
54            .ok_or_else(|| Status::invalid_argument("Filter is required"))?;
55
56        let event_filter = create_event_filter(&proto_filter)?;
57        debug!("New gRPC client subscribed with filter: {event_filter:?}");
58
59        // Subscribe to events using the EventSubscriber trait
60        let event_stream = self.event_subscriber.subscribe_events(event_filter);
61        let cancellation_token = self.cancellation_token.clone();
62
63        let stream = async_stream::try_stream! {
64            // Pin the stream for use with .next() and tokio::select!
65            // Safe because the stream has Unpin bound
66            let mut pinned_stream = std::pin::Pin::new(event_stream);
67
68            loop {
69                let event_result = tokio::select! {
70                    event_option = pinned_stream.next() => event_option,
71                    _ = cancellation_token.cancelled() => {
72                        debug!("Event stream cancelled");
73                        None
74                    }
75                };
76
77                match event_result {
78                    Some(event) => {
79                        debug!(
80                            "Event matched filter: TX: {}, Type: {}, Sender: {}",
81                            event.id.tx_digest,
82                            event.type_.name.as_ident_str(),
83                            event.sender
84                        );
85
86                        // Convert to protobuf Event
87                        let proto_event = Event::from(&event);
88
89                        yield proto_event;
90                    }
91                    None => {
92                        // Stream ended or cancellation occurred
93                        debug!("Event stream ended");
94                        break;
95                    }
96                }
97            }
98        };
99
100        Ok(Response::new(Box::pin(stream)))
101    }
102}
103
104/// Convert protobuf EventFilter to iota_json_rpc_types::EventFilter
105fn create_event_filter(proto_filter: &crate::events::EventFilter) -> Result<EventFilter, Status> {
106    use crate::events::event_filter::Filter;
107
108    match &proto_filter.filter {
109        Some(Filter::All(_)) => Ok(EventFilter::All(vec![])),
110        Some(Filter::Sender(f)) => {
111            let sender = parse_iota_address(&f.sender, "Sender address")?;
112            Ok(EventFilter::Sender(sender))
113        }
114        Some(Filter::Transaction(f)) => {
115            let tx_digest = parse_tx_digest(&f.tx_digest, "Transaction digest")?;
116            Ok(EventFilter::Transaction(tx_digest))
117        }
118        Some(Filter::MoveModule(f)) => {
119            let package_id = parse_object_id(&f.package_id, "Package ID")?;
120            Ok(EventFilter::MoveModule {
121                package: package_id,
122                module: parse_identifier(&f.module, "module name")?,
123            })
124        }
125        Some(Filter::MoveEventType(f)) => {
126            let object_id = parse_object_id(&f.package_id, "Package ID")?;
127            let struct_tag = StructTag {
128                address: *object_id,
129                module: parse_identifier(&f.module, "module name")?,
130                name: parse_identifier(&f.name, "event name")?,
131                type_params: vec![],
132            };
133            Ok(EventFilter::MoveEventType(struct_tag))
134        }
135        Some(Filter::MoveEventModule(f)) => {
136            let package_id = parse_object_id(&f.package_id, "Package ID")?;
137            Ok(EventFilter::MoveEventModule {
138                package: package_id,
139                module: parse_identifier(&f.module, "module name")?,
140            })
141        }
142        Some(Filter::TimeRange(f)) => Ok(EventFilter::TimeRange {
143            start_time: f.start_time,
144            end_time: f.end_time,
145        }),
146        None => Ok(EventFilter::All(vec![])),
147    }
148}
149
150// Helper functions to reduce repetition and improve error messages
151fn parse_object_id(
152    address: &Option<crate::common::Address>,
153    field_name: &str,
154) -> Result<ObjectID, Status> {
155    let address = address
156        .as_ref()
157        .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?;
158    ObjectID::from_bytes(&address.address)
159        .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}")))
160}
161
162fn parse_identifier(id_str: &str, field_name: &str) -> Result<Identifier, Status> {
163    Identifier::from_str(id_str)
164        .map_err(|e| Status::invalid_argument(format!("Invalid {field_name} '{id_str}': {e}")))
165}
166
167fn parse_iota_address(
168    address: &Option<crate::common::Address>,
169    field_name: &str,
170) -> Result<IotaAddress, Status> {
171    let address = address
172        .as_ref()
173        .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?;
174    IotaAddress::from_bytes(&address.address)
175        .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}")))
176}
177
178fn parse_tx_digest(
179    digest: &Option<crate::common::TransactionDigest>,
180    field_name: &str,
181) -> Result<TransactionDigest, Status> {
182    let digest = digest
183        .as_ref()
184        .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?;
185    TransactionDigest::try_from(digest.digest.as_slice())
186        .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}")))
187}
188
189// Convert IotaEvent to protobuf Event
190impl From<&IotaEvent> for Event {
191    fn from(event: &IotaEvent) -> Self {
192        Event {
193            event_id: Some(EventId {
194                event_seq: event.id.event_seq,
195                tx_digest: Some(crate::common::TransactionDigest {
196                    digest: event.id.tx_digest.into_inner().to_vec(),
197                }),
198            }),
199            package_id: Some(crate::common::Address {
200                address: event.package_id.to_vec(),
201            }),
202            transaction_module: event.transaction_module.to_string(),
203            sender: Some(crate::common::Address {
204                address: event.sender.to_vec(),
205            }),
206            type_name: event.type_.to_string(),
207            parsed_json: event.parsed_json.to_string(),
208            timestamp_ms: event.timestamp_ms,
209            event_data: Some(crate::common::BcsData {
210                data: event.bcs.bytes().to_vec(),
211            }),
212        }
213    }
214}