iota_sdk/apis/
event.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use futures::{StreamExt, stream};
8use futures_core::Stream;
9use iota_json_rpc_api::{IndexerApiClient, ReadApiClient};
10use iota_json_rpc_types::{EventFilter, EventPage, IotaEvent};
11use iota_types::{base_types::TransactionDigest, event::EventID};
12use jsonrpsee::core::client::Subscription;
13
14use crate::{
15    RpcClient,
16    error::{Error, IotaRpcResult},
17};
18
19/// Defines methods to fetch, query, or subscribe to events on the IOTA network.
20#[derive(Clone)]
21pub struct EventApi {
22    api: Arc<RpcClient>,
23}
24
25impl EventApi {
26    pub(crate) fn new(api: Arc<RpcClient>) -> Self {
27        Self { api }
28    }
29
30    /// Subscribe to receive a stream of filtered events.
31    ///
32    /// Subscription is only possible via WebSockets.
33    /// For a list of possible event filters, see [EventFilter].
34    ///
35    /// # Examples
36    ///
37    /// ```rust, no_run
38    /// use std::str::FromStr;
39    ///
40    /// use futures::StreamExt;
41    /// use iota_json_rpc_types::EventFilter;
42    /// use iota_sdk::IotaClientBuilder;
43    /// use iota_types::base_types::IotaAddress;
44    /// #[tokio::main]
45    /// async fn main() -> Result<(), anyhow::Error> {
46    ///     let iota = IotaClientBuilder::default()
47    ///         .ws_url("wss://api.mainnet.iota.cafe")
48    ///         .build("https://api.mainnet.iota.cafe")
49    ///         .await?;
50    ///     let mut subscribe_all = iota
51    ///         .event_api()
52    ///         .subscribe_event(EventFilter::All(vec![]))
53    ///         .await?;
54    ///     loop {
55    ///         println!("{:?}", subscribe_all.next().await);
56    ///     }
57    ///     Ok(())
58    /// }
59    /// ```
60    pub async fn subscribe_event(
61        &self,
62        filter: EventFilter,
63    ) -> IotaRpcResult<impl Stream<Item = IotaRpcResult<IotaEvent>>> {
64        match &self.api.ws {
65            Some(c) => {
66                let subscription: Subscription<IotaEvent> = c.subscribe_event(filter).await?;
67                Ok(subscription.map(|item| Ok(item?)))
68            }
69            _ => Err(Error::Subscription(
70                "Subscription only supported by WebSocket client.".to_string(),
71            )),
72        }
73    }
74
75    /// Get a list of events for the given transaction digest.
76    pub async fn get_events(&self, digest: TransactionDigest) -> IotaRpcResult<Vec<IotaEvent>> {
77        Ok(self.api.http.get_events(digest).await?)
78    }
79
80    /// Get a list of filtered events.
81    /// The response is paginated and can be ordered ascending or descending.
82    ///
83    /// For a list of possible event filters, see [EventFilter].
84    pub async fn query_events(
85        &self,
86        query: EventFilter,
87        cursor: impl Into<Option<EventID>>,
88        limit: impl Into<Option<usize>>,
89        descending_order: bool,
90    ) -> IotaRpcResult<EventPage> {
91        Ok(self
92            .api
93            .http
94            .query_events(query, cursor.into(), limit.into(), Some(descending_order))
95            .await?)
96    }
97
98    /// Get a stream of filtered events which can be ordered ascending or
99    /// descending.
100    ///
101    /// For a list of possible event filters, see [EventFilter].
102    pub fn get_events_stream(
103        &self,
104        query: EventFilter,
105        cursor: impl Into<Option<EventID>>,
106        descending_order: bool,
107    ) -> impl Stream<Item = IotaEvent> + '_ {
108        let cursor = cursor.into();
109
110        stream::unfold(
111            (vec![], cursor, true, query),
112            move |(mut data, cursor, first, query)| async move {
113                if let Some(item) = data.pop() {
114                    Some((item, (data, cursor, false, query)))
115                } else if (cursor.is_none() && first) || cursor.is_some() {
116                    let page = self
117                        .query_events(query.clone(), cursor, Some(100), descending_order)
118                        .await
119                        .ok()?;
120                    let mut data = page.data;
121                    data.reverse();
122                    data.pop()
123                        .map(|item| (item, (data, page.next_cursor, false, query)))
124                } else {
125                    None
126                }
127            },
128        )
129    }
130}