1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use futures::{StreamExt, stream};
use futures_core::Stream;
use iota_json_rpc_api::{IndexerApiClient, ReadApiClient};
use iota_json_rpc_types::{EventFilter, EventPage, IotaEvent};
use iota_types::{base_types::TransactionDigest, event::EventID};
use jsonrpsee::core::client::Subscription;
use crate::{
RpcClient,
error::{Error, IotaRpcResult},
};
/// Defines methods to fetch, query, or subscribe to events on the Iota network.
#[derive(Clone)]
pub struct EventApi {
api: Arc<RpcClient>,
}
impl EventApi {
pub(crate) fn new(api: Arc<RpcClient>) -> Self {
Self { api }
}
/// Subscribe to receive a stream of filtered events.
///
/// Subscription is only possible via WebSockets.
/// For a list of possible event filters, see [EventFilter].
///
/// # Examples
///
/// ```rust, no_run
/// use std::str::FromStr;
///
/// use futures::StreamExt;
/// use iota_json_rpc_types::EventFilter;
/// use iota_sdk::IotaClientBuilder;
/// use iota_types::base_types::IotaAddress;
/// #[tokio::main]
/// async fn main() -> Result<(), anyhow::Error> {
/// let iota = IotaClientBuilder::default()
/// .ws_url("wss://api.mainnet.iota.cafe")
/// .build("https://api.mainnet.iota.cafe")
/// .await?;
/// let mut subscribe_all = iota
/// .event_api()
/// .subscribe_event(EventFilter::All(vec![]))
/// .await?;
/// loop {
/// println!("{:?}", subscribe_all.next().await);
/// }
/// Ok(())
/// }
/// ```
pub async fn subscribe_event(
&self,
filter: EventFilter,
) -> IotaRpcResult<impl Stream<Item = IotaRpcResult<IotaEvent>>> {
match &self.api.ws {
Some(c) => {
let subscription: Subscription<IotaEvent> = c.subscribe_event(filter).await?;
Ok(subscription.map(|item| Ok(item?)))
}
_ => Err(Error::Subscription(
"Subscription only supported by WebSocket client.".to_string(),
)),
}
}
/// Get a list of events for the given transaction digest.
pub async fn get_events(&self, digest: TransactionDigest) -> IotaRpcResult<Vec<IotaEvent>> {
Ok(self.api.http.get_events(digest).await?)
}
/// Get a list of filtered events.
/// The response is paginated and can be ordered ascending or descending.
///
/// For a list of possible event filters, see [EventFilter].
pub async fn query_events(
&self,
query: EventFilter,
cursor: impl Into<Option<EventID>>,
limit: impl Into<Option<usize>>,
descending_order: bool,
) -> IotaRpcResult<EventPage> {
Ok(self
.api
.http
.query_events(query, cursor.into(), limit.into(), Some(descending_order))
.await?)
}
/// Get a stream of filtered events which can be ordered ascending or
/// descending.
///
/// For a list of possible event filters, see [EventFilter].
pub fn get_events_stream(
&self,
query: EventFilter,
cursor: impl Into<Option<EventID>>,
descending_order: bool,
) -> impl Stream<Item = IotaEvent> + '_ {
let cursor = cursor.into();
stream::unfold(
(vec![], cursor, true, query),
move |(mut data, cursor, first, query)| async move {
if let Some(item) = data.pop() {
Some((item, (data, cursor, false, query)))
} else if (cursor.is_none() && first) || cursor.is_some() {
let page = self
.query_events(query.clone(), cursor, Some(100), descending_order)
.await
.ok()?;
let mut data = page.data;
data.reverse();
data.pop()
.map(|item| (item, (data, page.next_cursor, false, query)))
} else {
None
}
},
)
}
}