iota_bridge/
orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! `BridgeOrchestrator` is the component that:
6//! 1. monitors IOTA and Ethereum events with the help of `IotaSyncer` and
7//!    `EthSyncer`
8//! 2. updates WAL table and cursor tables
//! 3. hands actions to `BridgeExecutor` for execution
10
11use std::sync::Arc;
12
13use ethers::types::Address as EthAddress;
14use iota_json_rpc_types::IotaEvent;
15use iota_metrics::spawn_logged_monitored_task;
16use iota_types::Identifier;
17use tokio::task::JoinHandle;
18use tracing::{error, info};
19
20use crate::{
21    abi::EthBridgeEvent,
22    action_executor::{
23        BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
24    },
25    error::BridgeError,
26    events::IotaBridgeEvent,
27    iota_client::{IotaClient, IotaClientInner},
28    metrics::BridgeMetrics,
29    storage::BridgeOrchestratorTables,
30    types::EthLog,
31};
32
33pub struct BridgeOrchestrator<C> {
34    _iota_client: Arc<IotaClient<C>>,
35    iota_events_rx: iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
36    eth_events_rx: iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
37    store: Arc<BridgeOrchestratorTables>,
38    monitor_tx: iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
39    metrics: Arc<BridgeMetrics>,
40}
41
42impl<C> BridgeOrchestrator<C>
43where
44    C: IotaClientInner + 'static,
45{
46    pub fn new(
47        iota_client: Arc<IotaClient<C>>,
48        iota_events_rx: iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
49        eth_events_rx: iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
50        store: Arc<BridgeOrchestratorTables>,
51        monitor_tx: iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
52        metrics: Arc<BridgeMetrics>,
53    ) -> Self {
54        Self {
55            _iota_client: iota_client,
56            iota_events_rx,
57            eth_events_rx,
58            store,
59            monitor_tx,
60            metrics,
61        }
62    }
63
64    pub async fn run(
65        self,
66        bridge_action_executor: impl BridgeActionExecutorTrait,
67    ) -> Vec<JoinHandle<()>> {
68        tracing::info!("Starting BridgeOrchestrator");
69        let mut task_handles = vec![];
70        let store_clone = self.store.clone();
71
72        // Spawn BridgeActionExecutor
73        let (handles, executor_sender) = bridge_action_executor.run();
74        task_handles.extend(handles);
75        let executor_sender_clone = executor_sender.clone();
76        let metrics_clone = self.metrics.clone();
77        task_handles.push(spawn_logged_monitored_task!(Self::run_iota_watcher(
78            store_clone,
79            executor_sender_clone,
80            self.iota_events_rx,
81            self.monitor_tx,
82            metrics_clone,
83        )));
84        let store_clone = self.store.clone();
85
86        // Re-submit pending actions to executor
87        let actions = store_clone
88            .get_all_pending_actions()
89            .into_values()
90            .collect::<Vec<_>>();
91        for action in actions {
92            submit_to_executor(&executor_sender, action)
93                .await
94                .expect("Submit to executor should not fail");
95        }
96
97        let metrics_clone = self.metrics.clone();
98        task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
99            store_clone,
100            executor_sender,
101            self.eth_events_rx,
102            metrics_clone,
103        )));
104
105        task_handles
106    }
107
108    async fn run_iota_watcher(
109        store: Arc<BridgeOrchestratorTables>,
110        executor_tx: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
111        mut iota_events_rx: iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
112        monitor_tx: iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
113        metrics: Arc<BridgeMetrics>,
114    ) {
115        info!("Starting iota watcher task");
116        while let Some((identifier, events)) = iota_events_rx.recv().await {
117            if events.is_empty() {
118                continue;
119            }
120            info!("Received {} IOTA events: {:?}", events.len(), events);
121            metrics
122                .iota_watcher_received_events
123                .inc_by(events.len() as u64);
124            let bridge_events = events
125                .iter()
126                .filter_map(|iota_event| {
127                    match IotaBridgeEvent::try_from_iota_event(iota_event) {
128                        Ok(bridge_event) => Some(bridge_event),
129                        // On testnet some early bridge transactions could have zero value (before
130                        // we disallow it in Move)
131                        Err(BridgeError::ZeroValueBridgeTransfer(_)) => {
132                            error!("Zero value bridge transfer: {:?}", iota_event);
133                            None
134                        }
135                        Err(e) => {
136                            panic!(
137                                "IOTA Event could not be deserialized to IotaBridgeEvent: {:?}",
138                                e
139                            );
140                        }
141                    }
142                })
143                .collect::<Vec<_>>();
144
145            let mut actions = vec![];
146            for (iota_event, opt_bridge_event) in events.iter().zip(bridge_events) {
147                if opt_bridge_event.is_none() {
148                    // TODO: we probably should not miss any events, log for now.
149                    metrics.iota_watcher_unrecognized_events.inc();
150                    error!("IOTA event not recognized: {:?}", iota_event);
151                    continue;
152                }
153                // Unwrap safe: checked above
154                let bridge_event: IotaBridgeEvent = opt_bridge_event.unwrap();
155                info!("Observed IOTA bridge event: {:?}", bridge_event);
156
157                // Send event to monitor
158                monitor_tx
159                    .send(bridge_event.clone())
160                    .await
161                    .expect("Sending event to monitor channel should not fail");
162
163                if let Some(action) = bridge_event
164                    .try_into_bridge_action(iota_event.id.tx_digest, iota_event.id.event_seq as u16)
165                {
166                    actions.push(action);
167                }
168            }
169
170            if !actions.is_empty() {
171                info!(
172                    "Received {} actions from IOTA: {:?}",
173                    actions.len(),
174                    actions
175                );
176                metrics
177                    .iota_watcher_received_actions
178                    .inc_by(actions.len() as u64);
179                // Write action to pending WAL
180                store
181                    .insert_pending_actions(&actions)
182                    .expect("Store operation should not fail");
183                for action in actions {
184                    submit_to_executor(&executor_tx, action)
185                        .await
186                        .expect("Submit to executor should not fail");
187                }
188            }
189
190            // Unwrap safe: in the beginning of the loop we checked that events is not empty
191            let cursor = events.last().unwrap().id;
192            store
193                .update_iota_event_cursor(identifier, cursor)
194                .expect("Store operation should not fail");
195        }
196        panic!("IOTA event channel was closed unexpectedly");
197    }
198
199    async fn run_eth_watcher(
200        store: Arc<BridgeOrchestratorTables>,
201        executor_tx: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
202        mut eth_events_rx: iota_metrics::metered_channel::Receiver<(
203            ethers::types::Address,
204            u64,
205            Vec<EthLog>,
206        )>,
207        metrics: Arc<BridgeMetrics>,
208    ) {
209        info!("Starting eth watcher task");
210        while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
211            if logs.is_empty() {
212                store
213                    .update_eth_event_cursor(contract, end_block)
214                    .expect("Store operation should not fail");
215                continue;
216            }
217
218            info!("Received {} Eth events", logs.len());
219            metrics
220                .eth_watcher_received_events
221                .inc_by(logs.len() as u64);
222
223            let bridge_events = logs
224                .iter()
225                .map(EthBridgeEvent::try_from_eth_log)
226                .collect::<Vec<_>>();
227
228            let mut actions = vec![];
229            for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
230                if opt_bridge_event.is_none() {
231                    // TODO: we probably should not miss any events, log for now.
232                    metrics.eth_watcher_unrecognized_events.inc();
233                    error!("Eth event not recognized: {:?}", log);
234                    continue;
235                }
236                // Unwrap safe: checked above
237                let bridge_event = opt_bridge_event.unwrap();
238                info!("Observed Eth bridge event: {:?}", bridge_event);
239
240                match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
241                    Ok(Some(action)) => actions.push(action),
242                    Ok(None) => {}
243                    Err(e) => {
244                        error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
245                    }
246                }
247                // TODO: handle non Action events
248            }
249            if !actions.is_empty() {
250                info!("Received {} actions from Eth: {:?}", actions.len(), actions);
251                metrics
252                    .eth_watcher_received_actions
253                    .inc_by(actions.len() as u64);
254                // Write action to pending WAL
255                store
256                    .insert_pending_actions(&actions)
257                    .expect("Store operation should not fail");
258                // Execution will remove the pending actions from DB when the action is
259                // completed.
260                for action in actions {
261                    submit_to_executor(&executor_tx, action)
262                        .await
263                        .expect("Submit to executor should not fail");
264                }
265            }
266
267            store
268                .update_eth_event_cursor(contract, end_block)
269                .expect("Store operation should not fail");
270        }
271        panic!("Eth event channel was closed");
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use std::str::FromStr;
278
279    use ethers::types::{Address as EthAddress, TxHash};
280    use prometheus::Registry;
281
282    use super::*;
283    use crate::{
284        events::{init_all_struct_tags, tests::get_test_iota_event_and_action},
285        iota_mock_client::IotaMockClient,
286        test_utils::{
287            get_test_eth_to_iota_bridge_action, get_test_iota_to_eth_bridge_action,
288            get_test_log_and_action,
289        },
290        types::BridgeActionDigest,
291    };
292
293    #[tokio::test]
294    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
295    async fn test_iota_watcher_task() {
296        // Note: this test may fail because of the following reasons:
297        // the IotaEvent's struct tag does not match the ones in events.rs
298
299        let (
300            iota_events_tx,
301            iota_events_rx,
302            _eth_events_tx,
303            eth_events_rx,
304            monitor_tx,
305            _monitor_rx,
306            iota_client,
307            store,
308        ) = setup();
309        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
310        // start orchestrator
311        let registry = Registry::new();
312        let metrics = Arc::new(BridgeMetrics::new(&registry));
313        let _handles = BridgeOrchestrator::new(
314            Arc::new(iota_client),
315            iota_events_rx,
316            eth_events_rx,
317            store.clone(),
318            monitor_tx,
319            metrics,
320        )
321        .run(executor)
322        .await;
323
324        let identifier = Identifier::from_str("test_iota_watcher_task").unwrap();
325        let (iota_event, bridge_action) = get_test_iota_event_and_action(identifier.clone());
326        iota_events_tx
327            .send((identifier.clone(), vec![iota_event.clone()]))
328            .await
329            .unwrap();
330
331        let start = std::time::Instant::now();
332        // Executor should have received the action
333        assert_eq!(
334            executor_requested_action_rx.recv().await.unwrap(),
335            bridge_action.digest()
336        );
337        loop {
338            let actions = store.get_all_pending_actions();
339            if actions.is_empty() {
340                if start.elapsed().as_secs() > 5 {
341                    panic!("Timed out waiting for action to be written to WAL");
342                }
343                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
344                continue;
345            }
346            assert_eq!(actions.len(), 1);
347            let action = actions.get(&bridge_action.digest()).unwrap();
348            assert_eq!(action, &bridge_action);
349            assert_eq!(
350                store.get_iota_event_cursors(&[identifier]).unwrap()[0].unwrap(),
351                iota_event.id,
352            );
353            break;
354        }
355    }
356
357    #[tokio::test]
358    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
359    async fn test_eth_watcher_task() {
360        // Note: this test may fail because of the following reasons:
361        // 1. Log and BridgeAction returned from `get_test_log_and_action` are not in
362        //    sync
363        // 2. Log returned from `get_test_log_and_action` is not parseable log (not
364        //    abigen!, check abi.rs)
365
366        let (
367            _iota_events_tx,
368            iota_events_rx,
369            eth_events_tx,
370            eth_events_rx,
371            monitor_tx,
372            _monitor_rx,
373            iota_client,
374            store,
375        ) = setup();
376        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
377        // start orchestrator
378        let registry = Registry::new();
379        let metrics = Arc::new(BridgeMetrics::new(&registry));
380        let _handles = BridgeOrchestrator::new(
381            Arc::new(iota_client),
382            iota_events_rx,
383            eth_events_rx,
384            store.clone(),
385            monitor_tx,
386            metrics,
387        )
388        .run(executor)
389        .await;
390        let address = EthAddress::random();
391        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
392        let log_index_in_tx = 10;
393        let log_block_num = log.block_number.unwrap().as_u64();
394        let eth_log = EthLog {
395            log: log.clone(),
396            tx_hash: log.transaction_hash.unwrap(),
397            block_number: log_block_num,
398            log_index_in_tx,
399        };
400        let end_block_num = log_block_num + 15;
401
402        eth_events_tx
403            .send((address, end_block_num, vec![eth_log.clone()]))
404            .await
405            .unwrap();
406
407        // Executor should have received the action
408        assert_eq!(
409            executor_requested_action_rx.recv().await.unwrap(),
410            bridge_action.digest()
411        );
412        let start = std::time::Instant::now();
413        loop {
414            let actions = store.get_all_pending_actions();
415            if actions.is_empty() {
416                if start.elapsed().as_secs() > 5 {
417                    panic!("Timed out waiting for action to be written to WAL");
418                }
419                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
420                continue;
421            }
422            assert_eq!(actions.len(), 1);
423            let action = actions.get(&bridge_action.digest()).unwrap();
424            assert_eq!(action, &bridge_action);
425            assert_eq!(
426                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
427                end_block_num,
428            );
429            break;
430        }
431    }
432
433    #[tokio::test]
434    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
435    /// Test that when orchestrator starts, all pending actions are sent to
436    /// executor
437    async fn test_resume_actions_in_pending_logs() {
438        let (
439            _iota_events_tx,
440            iota_events_rx,
441            _eth_events_tx,
442            eth_events_rx,
443            monitor_tx,
444            _monitor_rx,
445            iota_client,
446            store,
447        ) = setup();
448        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
449
450        let action1 = get_test_iota_to_eth_bridge_action(
451            None,
452            Some(0),
453            Some(99),
454            Some(10000),
455            None,
456            None,
457            None,
458        );
459
460        let action2 = get_test_eth_to_iota_bridge_action(None, None, None, None);
461        store
462            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
463            .unwrap();
464
465        // start orchestrator
466        let registry = Registry::new();
467        let metrics = Arc::new(BridgeMetrics::new(&registry));
468        let _handles = BridgeOrchestrator::new(
469            Arc::new(iota_client),
470            iota_events_rx,
471            eth_events_rx,
472            store.clone(),
473            monitor_tx,
474            metrics,
475        )
476        .run(executor)
477        .await;
478
479        // Executor should have received the action
480        let mut digests = std::collections::HashSet::new();
481        digests.insert(executor_requested_action_rx.recv().await.unwrap());
482        digests.insert(executor_requested_action_rx.recv().await.unwrap());
483        assert!(digests.contains(&action1.digest()));
484        assert!(digests.contains(&action2.digest()));
485        assert_eq!(digests.len(), 2);
486    }
487
488    #[expect(clippy::type_complexity)]
489    fn setup() -> (
490        iota_metrics::metered_channel::Sender<(Identifier, Vec<IotaEvent>)>,
491        iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
492        iota_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
493        iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
494        iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
495        iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
496        IotaClient<IotaMockClient>,
497        Arc<BridgeOrchestratorTables>,
498    ) {
499        telemetry_subscribers::init_for_testing();
500        let registry = Registry::new();
501        iota_metrics::init_metrics(&registry);
502
503        init_all_struct_tags();
504
505        let temp_dir = tempfile::tempdir().unwrap();
506        let store = BridgeOrchestratorTables::new(temp_dir.path());
507
508        let mock_client = IotaMockClient::default();
509        let iota_client = IotaClient::new_for_testing(mock_client.clone());
510
511        let (eth_events_tx, eth_events_rx) = iota_metrics::metered_channel::channel(
512            100,
513            &iota_metrics::get_metrics()
514                .unwrap()
515                .channel_inflight
516                .with_label_values(&["unit_test_eth_events_queue"]),
517        );
518
519        let (iota_events_tx, iota_events_rx) = iota_metrics::metered_channel::channel(
520            100,
521            &iota_metrics::get_metrics()
522                .unwrap()
523                .channel_inflight
524                .with_label_values(&["unit_test_iota_events_queue"]),
525        );
526        let (monitor_tx, monitor_rx) = iota_metrics::metered_channel::channel(
527            10000,
528            &iota_metrics::get_metrics()
529                .unwrap()
530                .channel_inflight
531                .with_label_values(&["monitor_queue"]),
532        );
533        (
534            iota_events_tx,
535            iota_events_rx,
536            eth_events_tx,
537            eth_events_rx,
538            monitor_tx,
539            monitor_rx,
540            iota_client,
541            store,
542        )
543    }
544
545    /// A `BridgeActionExecutorTrait` implementation that only tracks the
546    /// submitted actions.
547    struct MockExecutor {
548        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
549    }
550
551    impl MockExecutor {
552        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
553            let (tx, rx) = tokio::sync::broadcast::channel(100);
554            (
555                Self {
556                    requested_transactions_tx: tx,
557                },
558                rx,
559            )
560        }
561    }
562
563    impl BridgeActionExecutorTrait for MockExecutor {
564        fn run(
565            self,
566        ) -> (
567            Vec<tokio::task::JoinHandle<()>>,
568            iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
569        ) {
570            let (tx, mut rx) = iota_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
571                100,
572                &iota_metrics::get_metrics()
573                    .unwrap()
574                    .channel_inflight
575                    .with_label_values(&["unit_test_mock_executor"]),
576            );
577
578            let handles = tokio::spawn(async move {
579                while let Some(action) = rx.recv().await {
580                    self.requested_transactions_tx
581                        .send(action.0.digest())
582                        .unwrap();
583                }
584            });
585            (vec![handles], tx)
586        }
587    }
588}