iota_bridge/
iota_syncer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! The IotaSyncer module is responsible for synchronizing events emitted on
//! the IOTA blockchain by the modules of interest in the bridge package (0x9).
7
8use std::{collections::HashMap, sync::Arc};
9
10use iota_json_rpc_types::IotaEvent;
11use iota_metrics::spawn_logged_monitored_task;
12use iota_types::{BRIDGE_PACKAGE_ID, Identifier, event::EventID};
13use tokio::{
14    task::JoinHandle,
15    time::{self, Duration},
16};
17
18use crate::{
19    error::BridgeResult,
20    iota_client::{IotaClient, IotaClientInner},
21    retry_with_max_elapsed_time,
22};
23
/// Capacity handed to the metered channel that carries observed events from
/// the per-module listening tasks to the receiver returned by `IotaSyncer::run`.
const IOTA_EVENTS_CHANNEL_SIZE: usize = 1000;

/// Map from each watched bridge module to its start cursor (exclusive).
pub type IotaTargetModules = HashMap<Identifier, Option<EventID>>;
28
/// Polls the bridge package for events, running one listening task per
/// target module and forwarding what it finds over a shared channel.
pub struct IotaSyncer<C> {
    // Client used by every listening task to query events.
    iota_client: Arc<IotaClient<C>>,
    // Per module, the last event that the syncer has fully processed.
    // Syncer will resume after this event (i.e. exclusive), when it starts.
    cursors: IotaTargetModules,
}
35
36impl<C> IotaSyncer<C>
37where
38    C: IotaClientInner + 'static,
39{
40    pub fn new(iota_client: Arc<IotaClient<C>>, cursors: IotaTargetModules) -> Self {
41        Self {
42            iota_client,
43            cursors,
44        }
45    }
46
47    pub async fn run(
48        self,
49        query_interval: Duration,
50    ) -> BridgeResult<(
51        Vec<JoinHandle<()>>,
52        iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
53    )> {
54        let (events_tx, events_rx) = iota_metrics::metered_channel::channel(
55            IOTA_EVENTS_CHANNEL_SIZE,
56            &iota_metrics::get_metrics()
57                .unwrap()
58                .channel_inflight
59                .with_label_values(&["iota_events_queue"]),
60        );
61
62        let mut task_handles = vec![];
63        for (module, cursor) in self.cursors {
64            let events_rx_clone: iota_metrics::metered_channel::Sender<(
65                Identifier,
66                Vec<IotaEvent>,
67            )> = events_tx.clone();
68            let iota_client_clone = self.iota_client.clone();
69            task_handles.push(spawn_logged_monitored_task!(
70                Self::run_event_listening_task(
71                    module,
72                    cursor,
73                    events_rx_clone,
74                    iota_client_clone,
75                    query_interval
76                )
77            ));
78        }
79        Ok((task_handles, events_rx))
80    }
81
82    async fn run_event_listening_task(
83        // The module where interested events are defined.
84        // Module is always of bridge package 0x9.
85        module: Identifier,
86        mut cursor: Option<EventID>,
87        events_sender: iota_metrics::metered_channel::Sender<(Identifier, Vec<IotaEvent>)>,
88        iota_client: Arc<IotaClient<C>>,
89        query_interval: Duration,
90    ) {
91        tracing::info!(?module, ?cursor, "Starting iota events listening task");
92        let mut interval = time::interval(query_interval);
93        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
94        loop {
95            interval.tick().await;
96            let Ok(Ok(events)) = retry_with_max_elapsed_time!(
97                iota_client.query_events_by_module(BRIDGE_PACKAGE_ID, module.clone(), cursor),
98                Duration::from_secs(120)
99            ) else {
100                tracing::error!("Failed to query events from iota client after retry");
101                continue;
102            };
103
104            let len = events.data.len();
105            if len != 0 {
106                events_sender
107                    .send((module.clone(), events.data))
108                    .await
109                    .expect("All IOTA event channel receivers are closed");
110                if let Some(next) = events.next_cursor {
111                    cursor = Some(next);
112                }
113                tracing::info!(?module, ?cursor, "Observed {len} new IOTA events");
114            }
115        }
116    }
117}
118
#[cfg(test)]
mod tests {
    use iota_json_rpc_types::EventPage;
    use iota_types::{Identifier, digests::TransactionDigest, event::EventID};
    use prometheus::Registry;
    use tokio::time::timeout;

    use super::*;
    use crate::{iota_client::IotaClient, iota_mock_client::IotaMockClient};

    /// Drives a syncer watching two modules against a mock client and checks
    /// that events are delivered per module and only once.
    ///
    /// Currently ignored; see the linked issue.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_iota_syncer_basic() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        // `IotaSyncer::run` reads the global metrics, so set them up first.
        iota_metrics::init_metrics(&registry);

        let mock = IotaMockClient::default();
        let client = Arc::new(IotaClient::new_for_testing(mock.clone()));
        let module_foo = Identifier::new("Foo").unwrap();
        let module_bar = Identifier::new("Bar").unwrap();
        let empty_events = EventPage::empty();
        let cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 0,
        };
        add_event_response(&mock, module_foo.clone(), cursor, empty_events.clone());
        add_event_response(&mock, module_bar.clone(), cursor, empty_events.clone());

        let target_modules = IotaTargetModules::from([
            (module_foo.clone(), Some(cursor)),
            (module_bar.clone(), Some(cursor)),
        ]);
        let interval = Duration::from_millis(200);
        let (_handles, mut events_rx) = IotaSyncer::new(client, target_modules)
            .run(interval)
            .await
            .unwrap();

        // Initially there are no events
        assert_no_more_events(interval, &mut events_rx).await;

        // Module Foo has new events
        let mut event_1 = IotaEvent::random_for_testing();
        let package_id = BRIDGE_PACKAGE_ID;
        event_1.type_.address = package_id.into();
        event_1.type_.module = module_foo.clone();
        let module_foo_events_1 = EventPage {
            data: vec![event_1.clone(), event_1.clone()],
            next_cursor: Some(event_1.id),
            has_next_page: false,
        };
        // The response for the follow-up cursor is registered before the page
        // that advances to it -- presumably so the syncer's next poll always
        // finds a response; keep this order.
        add_event_response(&mock, module_foo.clone(), event_1.id, empty_events.clone());
        add_event_response(&mock, module_foo.clone(), cursor, module_foo_events_1);

        let (identifier, received_events) = events_rx.recv().await.unwrap();
        assert_eq!(identifier, module_foo);
        assert_eq!(received_events.len(), 2);
        assert_eq!(received_events[0].id, event_1.id);
        assert_eq!(received_events[1].id, event_1.id);
        // No more
        assert_no_more_events(interval, &mut events_rx).await;

        // Module Bar has new events
        let mut event_2 = IotaEvent::random_for_testing();
        event_2.type_.address = package_id.into();
        event_2.type_.module = module_bar.clone();
        let module_bar_events_1 = EventPage {
            data: vec![event_2.clone()],
            next_cursor: Some(event_2.id),
            has_next_page: false,
        };
        // Same ordering as for Foo: follow-up cursor first, then the new page.
        add_event_response(&mock, module_bar.clone(), event_2.id, empty_events);
        add_event_response(&mock, module_bar.clone(), cursor, module_bar_events_1);

        let (identifier, received_events) = events_rx.recv().await.unwrap();
        assert_eq!(identifier, module_bar);
        assert_eq!(received_events.len(), 1);
        assert_eq!(received_events[0].id, event_2.id);
        // No more
        assert_no_more_events(interval, &mut events_rx).await;

        Ok(())
    }

    /// Asserts that nothing arrives on `events_rx` within two polling
    /// intervals; panics with the received value otherwise.
    async fn assert_no_more_events(
        interval: Duration,
        events_rx: &mut iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
    ) {
        let outcome = timeout(interval * 2, events_rx.recv()).await;
        assert!(
            outcome.is_err(),
            "Should have timed out, but got: {:?}",
            outcome
        );
    }

    /// Registers `events` on the mock as the response to a query for `module`
    /// of the bridge package at `cursor`.
    fn add_event_response(
        mock: &IotaMockClient,
        module: Identifier,
        cursor: EventID,
        events: EventPage,
    ) {
        // Arguments are already owned; hand them over without cloning.
        mock.add_event_response(BRIDGE_PACKAGE_ID, module, cursor, events);
    }
}