iota_grpc_api/client/
event.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::anyhow;
5use futures::{Stream, StreamExt};
6use iota_json_rpc_types::{BcsEvent, IotaEvent};
7use iota_types::{
8    base_types::{IotaAddress, ObjectID, TransactionDigest},
9    event::EventID,
10};
11use move_core_types::{identifier::Identifier, language_storage::StructTag};
12use tonic::transport::Channel;
13
14use crate::events::{Event, EventStreamRequest, event_service_client::EventServiceClient};
15
16/// Dedicated client for event-related gRPC operations.
17///
18/// This client handles all event service interactions including streaming
19/// events with filtering capabilities.
20#[derive(Clone)]
21pub struct EventClient {
22    client: EventServiceClient<Channel>,
23}
24
25impl EventClient {
26    /// Create a new EventClient from a shared gRPC channel.
27    pub(super) fn new(channel: Channel) -> Self {
28        Self {
29            client: EventServiceClient::new(channel),
30        }
31    }
32
33    /// Stream events with automatic BCS deserialization and filtering.
34    ///
35    /// # Arguments
36    /// * `filter` - Event filter to apply to the stream
37    ///
38    /// # Returns
39    /// A stream of IOTA events that match the specified filter
40    pub async fn stream_events(
41        &mut self,
42        filter: crate::events::EventFilter,
43    ) -> Result<impl Stream<Item = Result<IotaEvent, tonic::Status>>, tonic::Status> {
44        let request = EventStreamRequest {
45            filter: Some(filter),
46        };
47        let stream = self.client.stream_events(request).await?.into_inner();
48
49        Ok(stream.map(|result| {
50            result.and_then(|event| {
51                Self::deserialize_event(&event).map_err(|e| {
52                    tonic::Status::internal(format!("Failed to deserialize event: {e}"))
53                })
54            })
55        }))
56    }
57
58    /// Deserialize event data from BCS bytes.
59    fn deserialize_event(event: &Event) -> anyhow::Result<IotaEvent> {
60        let event_id = event
61            .event_id
62            .as_ref()
63            .ok_or_else(|| anyhow!("Missing event ID"))?;
64
65        let tx_digest = event_id
66            .tx_digest
67            .as_ref()
68            .ok_or_else(|| anyhow!("Missing transaction digest"))?;
69
70        let package_id = event
71            .package_id
72            .as_ref()
73            .ok_or_else(|| anyhow!("Missing package ID"))?;
74
75        let sender = event
76            .sender
77            .as_ref()
78            .ok_or_else(|| anyhow!("Missing sender"))?;
79
80        let bcs_data = event
81            .event_data
82            .as_ref()
83            .ok_or_else(|| anyhow!("Missing event data"))?;
84
85        // Parse the StructTag from string
86        let type_tag: StructTag = event
87            .type_name
88            .parse()
89            .map_err(|e| anyhow!("Failed to parse type tag: {e}"))?;
90
91        // Parse the JSON
92        let parsed_json: serde_json::Value = serde_json::from_str(&event.parsed_json)
93            .map_err(|e| anyhow!("Failed to parse JSON: {e}"))?;
94
95        Ok(IotaEvent {
96            id: EventID {
97                tx_digest: TransactionDigest::new(
98                    tx_digest
99                        .digest
100                        .clone()
101                        .try_into()
102                        .map_err(|_| anyhow!("Invalid transaction digest length"))?,
103                ),
104                event_seq: event_id.event_seq,
105            },
106            package_id: ObjectID::from_bytes(&package_id.address)
107                .map_err(|e| anyhow!("Invalid package ID: {e}"))?,
108            transaction_module: Identifier::new(event.transaction_module.clone())
109                .map_err(|e| anyhow!("Invalid transaction module: {e}"))?,
110            sender: IotaAddress::from_bytes(&sender.address)
111                .map_err(|e| anyhow!("Invalid sender address: {e}"))?,
112            type_: type_tag,
113            parsed_json,
114            bcs: BcsEvent::new(bcs_data.data.clone()),
115            timestamp_ms: event.timestamp_ms,
116        })
117    }
118}