iota_grpc_api/
event_service.rs1use std::{str::FromStr, sync::Arc};
5
6use futures::StreamExt;
7use iota_json_rpc_types::{EventFilter, IotaEvent};
8use iota_types::{
9 base_types::{IotaAddress, ObjectID},
10 digests::TransactionDigest,
11};
12use move_core_types::{identifier::Identifier, language_storage::StructTag};
13use tokio_util::sync::CancellationToken;
14use tonic::{Request, Response, Status};
15use tracing::debug;
16
17use crate::{
18 events::{Event, EventId, EventStreamRequest, event_service_server::EventService},
19 types::EventSubscriber,
20};
21
22pub struct EventGrpcService {
23 pub event_subscriber: Arc<dyn EventSubscriber>,
24 pub cancellation_token: CancellationToken,
25}
26
27impl EventGrpcService {
28 pub fn new(
29 event_subscriber: Arc<dyn EventSubscriber>,
30 cancellation_token: CancellationToken,
31 ) -> Self {
32 Self {
33 event_subscriber,
34 cancellation_token,
35 }
36 }
37}
38
39#[tonic::async_trait]
43impl EventService for EventGrpcService {
44 type StreamEventsStream =
45 std::pin::Pin<Box<dyn futures::Stream<Item = Result<Event, Status>> + Send>>;
46
47 async fn stream_events(
48 &self,
49 request: Request<EventStreamRequest>,
50 ) -> Result<Response<Self::StreamEventsStream>, Status> {
51 let proto_filter = request
52 .into_inner()
53 .filter
54 .ok_or_else(|| Status::invalid_argument("Filter is required"))?;
55
56 let event_filter = create_event_filter(&proto_filter)?;
57 debug!("New gRPC client subscribed with filter: {event_filter:?}");
58
59 let event_stream = self.event_subscriber.subscribe_events(event_filter);
61 let cancellation_token = self.cancellation_token.clone();
62
63 let stream = async_stream::try_stream! {
64 let mut pinned_stream = std::pin::Pin::new(event_stream);
67
68 loop {
69 let event_result = tokio::select! {
70 event_option = pinned_stream.next() => event_option,
71 _ = cancellation_token.cancelled() => {
72 debug!("Event stream cancelled");
73 None
74 }
75 };
76
77 match event_result {
78 Some(event) => {
79 debug!(
80 "Event matched filter: TX: {}, Type: {}, Sender: {}",
81 event.id.tx_digest,
82 event.type_.name.as_ident_str(),
83 event.sender
84 );
85
86 let proto_event = Event::from(&event);
88
89 yield proto_event;
90 }
91 None => {
92 debug!("Event stream ended");
94 break;
95 }
96 }
97 }
98 };
99
100 Ok(Response::new(Box::pin(stream)))
101 }
102}
103
104fn create_event_filter(proto_filter: &crate::events::EventFilter) -> Result<EventFilter, Status> {
106 use crate::events::event_filter::Filter;
107
108 match &proto_filter.filter {
109 Some(Filter::All(_)) => Ok(EventFilter::All(vec![])),
110 Some(Filter::Sender(f)) => {
111 let sender = parse_iota_address(&f.sender, "Sender address")?;
112 Ok(EventFilter::Sender(sender))
113 }
114 Some(Filter::Transaction(f)) => {
115 let tx_digest = parse_tx_digest(&f.tx_digest, "Transaction digest")?;
116 Ok(EventFilter::Transaction(tx_digest))
117 }
118 Some(Filter::MoveModule(f)) => {
119 let package_id = parse_object_id(&f.package_id, "Package ID")?;
120 Ok(EventFilter::MoveModule {
121 package: package_id,
122 module: parse_identifier(&f.module, "module name")?,
123 })
124 }
125 Some(Filter::MoveEventType(f)) => {
126 let object_id = parse_object_id(&f.package_id, "Package ID")?;
127 let struct_tag = StructTag {
128 address: *object_id,
129 module: parse_identifier(&f.module, "module name")?,
130 name: parse_identifier(&f.name, "event name")?,
131 type_params: vec![],
132 };
133 Ok(EventFilter::MoveEventType(struct_tag))
134 }
135 Some(Filter::MoveEventModule(f)) => {
136 let package_id = parse_object_id(&f.package_id, "Package ID")?;
137 Ok(EventFilter::MoveEventModule {
138 package: package_id,
139 module: parse_identifier(&f.module, "module name")?,
140 })
141 }
142 Some(Filter::TimeRange(f)) => Ok(EventFilter::TimeRange {
143 start_time: f.start_time,
144 end_time: f.end_time,
145 }),
146 None => Ok(EventFilter::All(vec![])),
147 }
148}
149
150fn parse_object_id(
152 address: &Option<crate::common::Address>,
153 field_name: &str,
154) -> Result<ObjectID, Status> {
155 let address = address
156 .as_ref()
157 .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?;
158 ObjectID::from_bytes(&address.address)
159 .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}")))
160}
161
162fn parse_identifier(id_str: &str, field_name: &str) -> Result<Identifier, Status> {
163 Identifier::from_str(id_str)
164 .map_err(|e| Status::invalid_argument(format!("Invalid {field_name} '{id_str}': {e}")))
165}
166
167fn parse_iota_address(
168 address: &Option<crate::common::Address>,
169 field_name: &str,
170) -> Result<IotaAddress, Status> {
171 let address = address
172 .as_ref()
173 .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?;
174 IotaAddress::from_bytes(&address.address)
175 .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}")))
176}
177
178fn parse_tx_digest(
179 digest: &Option<crate::common::TransactionDigest>,
180 field_name: &str,
181) -> Result<TransactionDigest, Status> {
182 let digest = digest
183 .as_ref()
184 .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?;
185 TransactionDigest::try_from(digest.digest.as_slice())
186 .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}")))
187}
188
189impl From<&IotaEvent> for Event {
191 fn from(event: &IotaEvent) -> Self {
192 Event {
193 event_id: Some(EventId {
194 event_seq: event.id.event_seq,
195 tx_digest: Some(crate::common::TransactionDigest {
196 digest: event.id.tx_digest.into_inner().to_vec(),
197 }),
198 }),
199 package_id: Some(crate::common::Address {
200 address: event.package_id.to_vec(),
201 }),
202 transaction_module: event.transaction_module.to_string(),
203 sender: Some(crate::common::Address {
204 address: event.sender.to_vec(),
205 }),
206 type_name: event.type_.to_string(),
207 parsed_json: event.parsed_json.to_string(),
208 timestamp_ms: event.timestamp_ms,
209 event_data: Some(crate::common::BcsData {
210 data: event.bcs.bytes().to_vec(),
211 }),
212 }
213 }
214}