iota_sdk/apis/event.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use futures::{StreamExt, stream};
8use futures_core::Stream;
9use iota_json_rpc_api::{IndexerApiClient, ReadApiClient};
10use iota_json_rpc_types::{EventFilter, EventPage, IotaEvent};
11use iota_types::{base_types::TransactionDigest, event::EventID};
12use jsonrpsee::core::client::Subscription;
13
14use crate::{
15 RpcClient,
16 error::{Error, IotaRpcResult},
17};
18
19/// Defines methods to fetch, query, or subscribe to events on the IOTA network.
20#[derive(Clone)]
21pub struct EventApi {
22 api: Arc<RpcClient>,
23}
24
25impl EventApi {
26 pub(crate) fn new(api: Arc<RpcClient>) -> Self {
27 Self { api }
28 }
29
30 /// Subscribe to receive a stream of filtered events.
31 ///
32 /// Subscription is only possible via WebSockets.
33 /// For a list of possible event filters, see [EventFilter].
34 ///
35 /// # Examples
36 ///
37 /// ```rust, no_run
38 /// use std::str::FromStr;
39 ///
40 /// use futures::StreamExt;
41 /// use iota_json_rpc_types::EventFilter;
42 /// use iota_sdk::IotaClientBuilder;
43 /// use iota_types::base_types::IotaAddress;
44 /// #[tokio::main]
45 /// async fn main() -> Result<(), anyhow::Error> {
46 /// let iota = IotaClientBuilder::default()
47 /// .ws_url("wss://api.mainnet.iota.cafe")
48 /// .build("https://api.mainnet.iota.cafe")
49 /// .await?;
50 /// let mut subscribe_all = iota
51 /// .event_api()
52 /// .subscribe_event(EventFilter::All(vec![]))
53 /// .await?;
54 /// loop {
55 /// println!("{:?}", subscribe_all.next().await);
56 /// }
57 /// Ok(())
58 /// }
59 /// ```
60 pub async fn subscribe_event(
61 &self,
62 filter: EventFilter,
63 ) -> IotaRpcResult<impl Stream<Item = IotaRpcResult<IotaEvent>>> {
64 match &self.api.ws {
65 Some(c) => {
66 let subscription: Subscription<IotaEvent> = c.subscribe_event(filter).await?;
67 Ok(subscription.map(|item| Ok(item?)))
68 }
69 _ => Err(Error::Subscription(
70 "Subscription only supported by WebSocket client.".to_string(),
71 )),
72 }
73 }
74
75 /// Get a list of events for the given transaction digest.
76 pub async fn get_events(&self, digest: TransactionDigest) -> IotaRpcResult<Vec<IotaEvent>> {
77 Ok(self.api.http.get_events(digest).await?)
78 }
79
80 /// Get a list of filtered events.
81 /// The response is paginated and can be ordered ascending or descending.
82 ///
83 /// For a list of possible event filters, see [EventFilter].
84 pub async fn query_events(
85 &self,
86 query: EventFilter,
87 cursor: impl Into<Option<EventID>>,
88 limit: impl Into<Option<usize>>,
89 descending_order: bool,
90 ) -> IotaRpcResult<EventPage> {
91 Ok(self
92 .api
93 .http
94 .query_events(query, cursor.into(), limit.into(), Some(descending_order))
95 .await?)
96 }
97
98 /// Get a stream of filtered events which can be ordered ascending or
99 /// descending.
100 ///
101 /// For a list of possible event filters, see [EventFilter].
102 pub fn get_events_stream(
103 &self,
104 query: EventFilter,
105 cursor: impl Into<Option<EventID>>,
106 descending_order: bool,
107 ) -> impl Stream<Item = IotaEvent> + '_ {
108 let cursor = cursor.into();
109
110 stream::unfold(
111 (vec![], cursor, true, query),
112 move |(mut data, cursor, first, query)| async move {
113 if let Some(item) = data.pop() {
114 Some((item, (data, cursor, false, query)))
115 } else if (cursor.is_none() && first) || cursor.is_some() {
116 let page = self
117 .query_events(query.clone(), cursor, Some(100), descending_order)
118 .await
119 .ok()?;
120 let mut data = page.data;
121 data.reverse();
122 data.pop()
123 .map(|item| (item, (data, page.next_cursor, false, query)))
124 } else {
125 None
126 }
127 },
128 )
129 }
130}