iota_sdk/apis/
event.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use futures::{StreamExt, stream};
8use futures_core::Stream;
9use iota_json_rpc_api::{IndexerApiClient, ReadApiClient};
10use iota_json_rpc_types::{EventFilter, EventPage, IotaEvent};
11use iota_types::{base_types::TransactionDigest, event::EventID};
12use jsonrpsee::core::client::Subscription;
13
14use crate::{
15    RpcClient,
16    error::{Error, IotaRpcResult},
17};
18
19/// Defines methods to fetch, query, or subscribe to events on the IOTA network.
20#[derive(Clone)]
21pub struct EventApi {
22    api: Arc<RpcClient>,
23}
24
25impl EventApi {
26    pub(crate) fn new(api: Arc<RpcClient>) -> Self {
27        Self { api }
28    }
29
30    /// Subscribe to receive a stream of filtered events.
31    ///
32    /// Subscription is only possible via WebSockets.
33    /// For a list of possible event filters, see [EventFilter].
34    ///
35    /// # Examples
36    ///
37    /// ```rust, no_run
38    /// use std::str::FromStr;
39    ///
40    /// use futures::StreamExt;
41    /// use iota_sdk::{IotaClientBuilder, rpc_types::EventFilter, types::base_types::IotaAddress};
42    ///
43    /// #[tokio::main]
44    /// async fn main() -> Result<(), anyhow::Error> {
45    ///     let iota = IotaClientBuilder::default()
46    ///         .ws_url("wss://api.mainnet.iota.cafe")
47    ///         .build("https://api.mainnet.iota.cafe")
48    ///         .await?;
49    ///     let mut subscribe_all = iota
50    ///         .event_api()
51    ///         .subscribe_event(EventFilter::All(vec![]))
52    ///         .await?;
53    ///     loop {
54    ///         println!("{:?}", subscribe_all.next().await);
55    ///     }
56    ///     Ok(())
57    /// }
58    /// ```
59    pub async fn subscribe_event(
60        &self,
61        filter: EventFilter,
62    ) -> IotaRpcResult<impl Stream<Item = IotaRpcResult<IotaEvent>>> {
63        match &self.api.ws {
64            Some(c) => {
65                let subscription: Subscription<IotaEvent> = c.subscribe_event(filter).await?;
66                Ok(subscription.map(|item| Ok(item?)))
67            }
68            _ => Err(Error::Subscription(
69                "Subscription only supported by WebSocket client.".to_string(),
70            )),
71        }
72    }
73
74    /// Get a list of events for the given transaction digest.
75    pub async fn get_events(&self, digest: TransactionDigest) -> IotaRpcResult<Vec<IotaEvent>> {
76        Ok(self.api.http.get_events(digest).await?)
77    }
78
79    /// Get a list of filtered events.
80    /// The response is paginated and can be ordered ascending or descending.
81    ///
82    /// For a list of possible event filters, see [EventFilter].
83    pub async fn query_events(
84        &self,
85        query: EventFilter,
86        cursor: impl Into<Option<EventID>>,
87        limit: impl Into<Option<usize>>,
88        descending_order: bool,
89    ) -> IotaRpcResult<EventPage> {
90        Ok(self
91            .api
92            .http
93            .query_events(query, cursor.into(), limit.into(), Some(descending_order))
94            .await?)
95    }
96
97    /// Get a stream of filtered events which can be ordered ascending or
98    /// descending.
99    ///
100    /// For a list of possible event filters, see [EventFilter].
101    pub fn get_events_stream(
102        &self,
103        query: EventFilter,
104        cursor: impl Into<Option<EventID>>,
105        descending_order: bool,
106    ) -> impl Stream<Item = IotaEvent> + '_ {
107        let cursor = cursor.into();
108
109        stream::unfold(
110            (vec![], cursor, true, query),
111            move |(mut data, cursor, first, query)| async move {
112                if let Some(item) = data.pop() {
113                    Some((item, (data, cursor, false, query)))
114                } else if (cursor.is_none() && first) || cursor.is_some() {
115                    let page = self
116                        .query_events(query.clone(), cursor, Some(100), descending_order)
117                        .await
118                        .ok()?;
119                    let mut data = page.data;
120                    data.reverse();
121                    data.pop()
122                        .map(|item| (item, (data, page.next_cursor, false, query)))
123                } else {
124                    None
125                }
126            },
127        )
128    }
129}