iota_grpc_api/client/
event.rs1use anyhow::anyhow;
5use futures::{Stream, StreamExt};
6use iota_json_rpc_types::{BcsEvent, IotaEvent};
7use iota_types::{
8 base_types::{IotaAddress, ObjectID, TransactionDigest},
9 event::EventID,
10};
11use move_core_types::{identifier::Identifier, language_storage::StructTag};
12use tonic::transport::Channel;
13
14use crate::events::{Event, EventStreamRequest, event_service_client::EventServiceClient};
15
16#[derive(Clone)]
21pub struct EventClient {
22 client: EventServiceClient<Channel>,
23}
24
25impl EventClient {
26 pub(super) fn new(channel: Channel) -> Self {
28 Self {
29 client: EventServiceClient::new(channel),
30 }
31 }
32
33 pub async fn stream_events(
41 &mut self,
42 filter: crate::events::EventFilter,
43 ) -> Result<impl Stream<Item = Result<IotaEvent, tonic::Status>>, tonic::Status> {
44 let request = EventStreamRequest {
45 filter: Some(filter),
46 };
47 let stream = self.client.stream_events(request).await?.into_inner();
48
49 Ok(stream.map(|result| {
50 result.and_then(|event| {
51 Self::deserialize_event(&event).map_err(|e| {
52 tonic::Status::internal(format!("Failed to deserialize event: {e}"))
53 })
54 })
55 }))
56 }
57
58 fn deserialize_event(event: &Event) -> anyhow::Result<IotaEvent> {
60 let event_id = event
61 .event_id
62 .as_ref()
63 .ok_or_else(|| anyhow!("Missing event ID"))?;
64
65 let tx_digest = event_id
66 .tx_digest
67 .as_ref()
68 .ok_or_else(|| anyhow!("Missing transaction digest"))?;
69
70 let package_id = event
71 .package_id
72 .as_ref()
73 .ok_or_else(|| anyhow!("Missing package ID"))?;
74
75 let sender = event
76 .sender
77 .as_ref()
78 .ok_or_else(|| anyhow!("Missing sender"))?;
79
80 let bcs_data = event
81 .event_data
82 .as_ref()
83 .ok_or_else(|| anyhow!("Missing event data"))?;
84
85 let type_tag: StructTag = event
87 .type_name
88 .parse()
89 .map_err(|e| anyhow!("Failed to parse type tag: {e}"))?;
90
91 let parsed_json: serde_json::Value = serde_json::from_str(&event.parsed_json)
93 .map_err(|e| anyhow!("Failed to parse JSON: {e}"))?;
94
95 Ok(IotaEvent {
96 id: EventID {
97 tx_digest: TransactionDigest::new(
98 tx_digest
99 .digest
100 .clone()
101 .try_into()
102 .map_err(|_| anyhow!("Invalid transaction digest length"))?,
103 ),
104 event_seq: event_id.event_seq,
105 },
106 package_id: ObjectID::from_bytes(&package_id.address)
107 .map_err(|e| anyhow!("Invalid package ID: {e}"))?,
108 transaction_module: Identifier::new(event.transaction_module.clone())
109 .map_err(|e| anyhow!("Invalid transaction module: {e}"))?,
110 sender: IotaAddress::from_bytes(&sender.address)
111 .map_err(|e| anyhow!("Invalid sender address: {e}"))?,
112 type_: type_tag,
113 parsed_json,
114 bcs: BcsEvent::new(bcs_data.data.clone()),
115 timestamp_ms: event.timestamp_ms,
116 })
117 }
118}