1use std::{collections::HashMap, sync::Arc};
9
10use iota_json_rpc_types::IotaEvent;
11use iota_metrics::spawn_logged_monitored_task;
12use iota_types::{BRIDGE_PACKAGE_ID, Identifier, event::EventID};
13use tokio::{
14 task::JoinHandle,
15 time::{self, Duration},
16};
17
18use crate::{
19 error::BridgeResult,
20 iota_client::{IotaClient, IotaClientInner},
21 retry_with_max_elapsed_time,
22};
23
24const IOTA_EVENTS_CHANNEL_SIZE: usize = 1000;
25
26pub type IotaTargetModules = HashMap<Identifier, Option<EventID>>;
28
29pub struct IotaSyncer<C> {
30 iota_client: Arc<IotaClient<C>>,
31 cursors: IotaTargetModules,
34}
35
36impl<C> IotaSyncer<C>
37where
38 C: IotaClientInner + 'static,
39{
40 pub fn new(iota_client: Arc<IotaClient<C>>, cursors: IotaTargetModules) -> Self {
41 Self {
42 iota_client,
43 cursors,
44 }
45 }
46
47 pub async fn run(
48 self,
49 query_interval: Duration,
50 ) -> BridgeResult<(
51 Vec<JoinHandle<()>>,
52 iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
53 )> {
54 let (events_tx, events_rx) = iota_metrics::metered_channel::channel(
55 IOTA_EVENTS_CHANNEL_SIZE,
56 &iota_metrics::get_metrics()
57 .unwrap()
58 .channel_inflight
59 .with_label_values(&["iota_events_queue"]),
60 );
61
62 let mut task_handles = vec![];
63 for (module, cursor) in self.cursors {
64 let events_rx_clone: iota_metrics::metered_channel::Sender<(
65 Identifier,
66 Vec<IotaEvent>,
67 )> = events_tx.clone();
68 let iota_client_clone = self.iota_client.clone();
69 task_handles.push(spawn_logged_monitored_task!(
70 Self::run_event_listening_task(
71 module,
72 cursor,
73 events_rx_clone,
74 iota_client_clone,
75 query_interval
76 )
77 ));
78 }
79 Ok((task_handles, events_rx))
80 }
81
82 async fn run_event_listening_task(
83 module: Identifier,
86 mut cursor: Option<EventID>,
87 events_sender: iota_metrics::metered_channel::Sender<(Identifier, Vec<IotaEvent>)>,
88 iota_client: Arc<IotaClient<C>>,
89 query_interval: Duration,
90 ) {
91 tracing::info!(?module, ?cursor, "Starting iota events listening task");
92 let mut interval = time::interval(query_interval);
93 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
94 loop {
95 interval.tick().await;
96 let Ok(Ok(events)) = retry_with_max_elapsed_time!(
97 iota_client.query_events_by_module(BRIDGE_PACKAGE_ID, module.clone(), cursor),
98 Duration::from_secs(120)
99 ) else {
100 tracing::error!("Failed to query events from iota client after retry");
101 continue;
102 };
103
104 let len = events.data.len();
105 if len != 0 {
106 events_sender
107 .send((module.clone(), events.data))
108 .await
109 .expect("All IOTA event channel receivers are closed");
110 if let Some(next) = events.next_cursor {
111 cursor = Some(next);
112 }
113 tracing::info!(?module, ?cursor, "Observed {len} new IOTA events");
114 }
115 }
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use iota_json_rpc_types::EventPage;
122 use iota_types::{Identifier, digests::TransactionDigest, event::EventID};
123 use prometheus::Registry;
124 use tokio::time::timeout;
125
126 use super::*;
127 use crate::{iota_client::IotaClient, iota_mock_client::IotaMockClient};
128
129 #[tokio::test]
130 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
131 async fn test_iota_syncer_basic() -> anyhow::Result<()> {
132 telemetry_subscribers::init_for_testing();
133 let registry = Registry::new();
134 iota_metrics::init_metrics(®istry);
135
136 let mock = IotaMockClient::default();
137 let client = Arc::new(IotaClient::new_for_testing(mock.clone()));
138 let module_foo = Identifier::new("Foo").unwrap();
139 let module_bar = Identifier::new("Bar").unwrap();
140 let empty_events = EventPage::empty();
141 let cursor = EventID {
142 tx_digest: TransactionDigest::random(),
143 event_seq: 0,
144 };
145 add_event_response(&mock, module_foo.clone(), cursor, empty_events.clone());
146 add_event_response(&mock, module_bar.clone(), cursor, empty_events.clone());
147
148 let target_modules = HashMap::from_iter(vec![
149 (module_foo.clone(), Some(cursor)),
150 (module_bar.clone(), Some(cursor)),
151 ]);
152 let interval = Duration::from_millis(200);
153 let (_handles, mut events_rx) = IotaSyncer::new(client, target_modules)
154 .run(interval)
155 .await
156 .unwrap();
157
158 assert_no_more_events(interval, &mut events_rx).await;
160
161 let mut event_1: IotaEvent = IotaEvent::random_for_testing();
163 let package_id = BRIDGE_PACKAGE_ID;
164 event_1.type_.address = package_id.into();
165 event_1.type_.module = module_foo.clone();
166 let module_foo_events_1: iota_json_rpc_types::Page<IotaEvent, EventID> = EventPage {
167 data: vec![event_1.clone(), event_1.clone()],
168 next_cursor: Some(event_1.id),
169 has_next_page: false,
170 };
171 add_event_response(&mock, module_foo.clone(), event_1.id, empty_events.clone());
172 add_event_response(
173 &mock,
174 module_foo.clone(),
175 cursor,
176 module_foo_events_1.clone(),
177 );
178
179 let (identifier, received_events) = events_rx.recv().await.unwrap();
180 assert_eq!(identifier, module_foo);
181 assert_eq!(received_events.len(), 2);
182 assert_eq!(received_events[0].id, event_1.id);
183 assert_eq!(received_events[1].id, event_1.id);
184 assert_no_more_events(interval, &mut events_rx).await;
186
187 let mut event_2: IotaEvent = IotaEvent::random_for_testing();
189 event_2.type_.address = package_id.into();
190 event_2.type_.module = module_bar.clone();
191 let module_bar_events_1 = EventPage {
192 data: vec![event_2.clone()],
193 next_cursor: Some(event_2.id),
194 has_next_page: false,
195 };
196 add_event_response(&mock, module_bar.clone(), event_2.id, empty_events.clone());
197
198 add_event_response(&mock, module_bar.clone(), cursor, module_bar_events_1);
199
200 let (identifier, received_events) = events_rx.recv().await.unwrap();
201 assert_eq!(identifier, module_bar);
202 assert_eq!(received_events.len(), 1);
203 assert_eq!(received_events[0].id, event_2.id);
204 assert_no_more_events(interval, &mut events_rx).await;
206
207 Ok(())
208 }
209
210 async fn assert_no_more_events(
211 interval: Duration,
212 events_rx: &mut iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
213 ) {
214 match timeout(interval * 2, events_rx.recv()).await {
215 Err(_e) => (),
216 other => panic!("Should have timed out, but got: {:?}", other),
217 };
218 }
219
220 fn add_event_response(
221 mock: &IotaMockClient,
222 module: Identifier,
223 cursor: EventID,
224 events: EventPage,
225 ) {
226 mock.add_event_response(BRIDGE_PACKAGE_ID, module.clone(), cursor, events.clone());
227 }
228}