iota_sdk/apis/event.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use futures::{StreamExt, stream};
8use futures_core::Stream;
9use iota_json_rpc_api::{IndexerApiClient, ReadApiClient};
10use iota_json_rpc_types::{EventFilter, EventPage, IotaEvent};
11use iota_types::{base_types::TransactionDigest, event::EventID};
12use jsonrpsee::core::client::Subscription;
13
14use crate::{
15 RpcClient,
16 error::{Error, IotaRpcResult},
17};
18
19/// Defines methods to fetch, query, or subscribe to events on the IOTA network.
20#[derive(Clone)]
21pub struct EventApi {
22 api: Arc<RpcClient>,
23}
24
25impl EventApi {
26 pub(crate) fn new(api: Arc<RpcClient>) -> Self {
27 Self { api }
28 }
29
30 /// Subscribe to receive a stream of filtered events.
31 ///
32 /// Subscription is only possible via WebSockets.
33 /// For a list of possible event filters, see [EventFilter].
34 ///
35 /// # Examples
36 ///
37 /// ```rust, no_run
38 /// use std::str::FromStr;
39 ///
40 /// use futures::StreamExt;
41 /// use iota_sdk::{IotaClientBuilder, rpc_types::EventFilter, types::base_types::IotaAddress};
42 ///
43 /// #[tokio::main]
44 /// async fn main() -> Result<(), anyhow::Error> {
45 /// let iota = IotaClientBuilder::default()
46 /// .ws_url("wss://api.mainnet.iota.cafe")
47 /// .build("https://api.mainnet.iota.cafe")
48 /// .await?;
49 /// let mut subscribe_all = iota
50 /// .event_api()
51 /// .subscribe_event(EventFilter::All(vec![]))
52 /// .await?;
53 /// loop {
54 /// println!("{:?}", subscribe_all.next().await);
55 /// }
56 /// Ok(())
57 /// }
58 /// ```
59 pub async fn subscribe_event(
60 &self,
61 filter: EventFilter,
62 ) -> IotaRpcResult<impl Stream<Item = IotaRpcResult<IotaEvent>>> {
63 match &self.api.ws {
64 Some(c) => {
65 let subscription: Subscription<IotaEvent> = c.subscribe_event(filter).await?;
66 Ok(subscription.map(|item| Ok(item?)))
67 }
68 _ => Err(Error::Subscription(
69 "Subscription only supported by WebSocket client.".to_string(),
70 )),
71 }
72 }
73
74 /// Get a list of events for the given transaction digest.
75 pub async fn get_events(&self, digest: TransactionDigest) -> IotaRpcResult<Vec<IotaEvent>> {
76 Ok(self.api.http.get_events(digest).await?)
77 }
78
79 /// Get a list of filtered events.
80 /// The response is paginated and can be ordered ascending or descending.
81 ///
82 /// For a list of possible event filters, see [EventFilter].
83 pub async fn query_events(
84 &self,
85 query: EventFilter,
86 cursor: impl Into<Option<EventID>>,
87 limit: impl Into<Option<usize>>,
88 descending_order: bool,
89 ) -> IotaRpcResult<EventPage> {
90 Ok(self
91 .api
92 .http
93 .query_events(query, cursor.into(), limit.into(), Some(descending_order))
94 .await?)
95 }
96
97 /// Get a stream of filtered events which can be ordered ascending or
98 /// descending.
99 ///
100 /// For a list of possible event filters, see [EventFilter].
101 pub fn get_events_stream(
102 &self,
103 query: EventFilter,
104 cursor: impl Into<Option<EventID>>,
105 descending_order: bool,
106 ) -> impl Stream<Item = IotaEvent> + '_ {
107 let cursor = cursor.into();
108
109 stream::unfold(
110 (vec![], cursor, true, query),
111 move |(mut data, cursor, first, query)| async move {
112 if let Some(item) = data.pop() {
113 Some((item, (data, cursor, false, query)))
114 } else if (cursor.is_none() && first) || cursor.is_some() {
115 let page = self
116 .query_events(query.clone(), cursor, Some(100), descending_order)
117 .await
118 .ok()?;
119 let mut data = page.data;
120 data.reverse();
121 data.pop()
122 .map(|item| (item, (data, page.next_cursor, false, query)))
123 } else {
124 None
125 }
126 },
127 )
128 }
129}