1use std::sync::Arc;
12
13use ethers::types::Address as EthAddress;
14use iota_json_rpc_types::IotaEvent;
15use iota_metrics::spawn_logged_monitored_task;
16use iota_types::Identifier;
17use tokio::task::JoinHandle;
18use tracing::{error, info};
19
20use crate::{
21 abi::EthBridgeEvent,
22 action_executor::{
23 BridgeActionExecutionWrapper, BridgeActionExecutorTrait, submit_to_executor,
24 },
25 error::BridgeError,
26 events::IotaBridgeEvent,
27 iota_client::{IotaClient, IotaClientInner},
28 metrics::BridgeMetrics,
29 storage::BridgeOrchestratorTables,
30 types::EthLog,
31};
32
33pub struct BridgeOrchestrator<C> {
34 _iota_client: Arc<IotaClient<C>>,
35 iota_events_rx: iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
36 eth_events_rx: iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
37 store: Arc<BridgeOrchestratorTables>,
38 monitor_tx: iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
39 metrics: Arc<BridgeMetrics>,
40}
41
42impl<C> BridgeOrchestrator<C>
43where
44 C: IotaClientInner + 'static,
45{
46 pub fn new(
47 iota_client: Arc<IotaClient<C>>,
48 iota_events_rx: iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
49 eth_events_rx: iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
50 store: Arc<BridgeOrchestratorTables>,
51 monitor_tx: iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
52 metrics: Arc<BridgeMetrics>,
53 ) -> Self {
54 Self {
55 _iota_client: iota_client,
56 iota_events_rx,
57 eth_events_rx,
58 store,
59 monitor_tx,
60 metrics,
61 }
62 }
63
64 pub async fn run(
65 self,
66 bridge_action_executor: impl BridgeActionExecutorTrait,
67 ) -> Vec<JoinHandle<()>> {
68 tracing::info!("Starting BridgeOrchestrator");
69 let mut task_handles = vec![];
70 let store_clone = self.store.clone();
71
72 let (handles, executor_sender) = bridge_action_executor.run();
74 task_handles.extend(handles);
75 let executor_sender_clone = executor_sender.clone();
76 let metrics_clone = self.metrics.clone();
77 task_handles.push(spawn_logged_monitored_task!(Self::run_iota_watcher(
78 store_clone,
79 executor_sender_clone,
80 self.iota_events_rx,
81 self.monitor_tx,
82 metrics_clone,
83 )));
84 let store_clone = self.store.clone();
85
86 let actions = store_clone
88 .get_all_pending_actions()
89 .into_values()
90 .collect::<Vec<_>>();
91 for action in actions {
92 submit_to_executor(&executor_sender, action)
93 .await
94 .expect("Submit to executor should not fail");
95 }
96
97 let metrics_clone = self.metrics.clone();
98 task_handles.push(spawn_logged_monitored_task!(Self::run_eth_watcher(
99 store_clone,
100 executor_sender,
101 self.eth_events_rx,
102 metrics_clone,
103 )));
104
105 task_handles
106 }
107
108 async fn run_iota_watcher(
109 store: Arc<BridgeOrchestratorTables>,
110 executor_tx: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
111 mut iota_events_rx: iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
112 monitor_tx: iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
113 metrics: Arc<BridgeMetrics>,
114 ) {
115 info!("Starting iota watcher task");
116 while let Some((identifier, events)) = iota_events_rx.recv().await {
117 if events.is_empty() {
118 continue;
119 }
120 info!("Received {} IOTA events: {:?}", events.len(), events);
121 metrics
122 .iota_watcher_received_events
123 .inc_by(events.len() as u64);
124 let bridge_events = events
125 .iter()
126 .filter_map(|iota_event| {
127 match IotaBridgeEvent::try_from_iota_event(iota_event) {
128 Ok(bridge_event) => Some(bridge_event),
129 Err(BridgeError::ZeroValueBridgeTransfer(_)) => {
132 error!("Zero value bridge transfer: {:?}", iota_event);
133 None
134 }
135 Err(e) => {
136 panic!(
137 "IOTA Event could not be deserialized to IotaBridgeEvent: {:?}",
138 e
139 );
140 }
141 }
142 })
143 .collect::<Vec<_>>();
144
145 let mut actions = vec![];
146 for (iota_event, opt_bridge_event) in events.iter().zip(bridge_events) {
147 if opt_bridge_event.is_none() {
148 metrics.iota_watcher_unrecognized_events.inc();
150 error!("IOTA event not recognized: {:?}", iota_event);
151 continue;
152 }
153 let bridge_event: IotaBridgeEvent = opt_bridge_event.unwrap();
155 info!("Observed IOTA bridge event: {:?}", bridge_event);
156
157 monitor_tx
159 .send(bridge_event.clone())
160 .await
161 .expect("Sending event to monitor channel should not fail");
162
163 if let Some(action) = bridge_event
164 .try_into_bridge_action(iota_event.id.tx_digest, iota_event.id.event_seq as u16)
165 {
166 actions.push(action);
167 }
168 }
169
170 if !actions.is_empty() {
171 info!(
172 "Received {} actions from IOTA: {:?}",
173 actions.len(),
174 actions
175 );
176 metrics
177 .iota_watcher_received_actions
178 .inc_by(actions.len() as u64);
179 store
181 .insert_pending_actions(&actions)
182 .expect("Store operation should not fail");
183 for action in actions {
184 submit_to_executor(&executor_tx, action)
185 .await
186 .expect("Submit to executor should not fail");
187 }
188 }
189
190 let cursor = events.last().unwrap().id;
192 store
193 .update_iota_event_cursor(identifier, cursor)
194 .expect("Store operation should not fail");
195 }
196 panic!("IOTA event channel was closed unexpectedly");
197 }
198
199 async fn run_eth_watcher(
200 store: Arc<BridgeOrchestratorTables>,
201 executor_tx: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
202 mut eth_events_rx: iota_metrics::metered_channel::Receiver<(
203 ethers::types::Address,
204 u64,
205 Vec<EthLog>,
206 )>,
207 metrics: Arc<BridgeMetrics>,
208 ) {
209 info!("Starting eth watcher task");
210 while let Some((contract, end_block, logs)) = eth_events_rx.recv().await {
211 if logs.is_empty() {
212 store
213 .update_eth_event_cursor(contract, end_block)
214 .expect("Store operation should not fail");
215 continue;
216 }
217
218 info!("Received {} Eth events", logs.len());
219 metrics
220 .eth_watcher_received_events
221 .inc_by(logs.len() as u64);
222
223 let bridge_events = logs
224 .iter()
225 .map(EthBridgeEvent::try_from_eth_log)
226 .collect::<Vec<_>>();
227
228 let mut actions = vec![];
229 for (log, opt_bridge_event) in logs.iter().zip(bridge_events) {
230 if opt_bridge_event.is_none() {
231 metrics.eth_watcher_unrecognized_events.inc();
233 error!("Eth event not recognized: {:?}", log);
234 continue;
235 }
236 let bridge_event = opt_bridge_event.unwrap();
238 info!("Observed Eth bridge event: {:?}", bridge_event);
239
240 match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) {
241 Ok(Some(action)) => actions.push(action),
242 Ok(None) => {}
243 Err(e) => {
244 error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e);
245 }
246 }
247 }
249 if !actions.is_empty() {
250 info!("Received {} actions from Eth: {:?}", actions.len(), actions);
251 metrics
252 .eth_watcher_received_actions
253 .inc_by(actions.len() as u64);
254 store
256 .insert_pending_actions(&actions)
257 .expect("Store operation should not fail");
258 for action in actions {
261 submit_to_executor(&executor_tx, action)
262 .await
263 .expect("Submit to executor should not fail");
264 }
265 }
266
267 store
268 .update_eth_event_cursor(contract, end_block)
269 .expect("Store operation should not fail");
270 }
271 panic!("Eth event channel was closed");
272 }
273}
274
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use ethers::types::{Address as EthAddress, TxHash};
    use prometheus::Registry;

    use super::*;
    use crate::{
        events::{init_all_struct_tags, tests::get_test_iota_event_and_action},
        iota_mock_client::IotaMockClient,
        test_utils::{
            get_test_eth_to_iota_bridge_action, get_test_iota_to_eth_bridge_action,
            get_test_log_and_action,
        },
        types::BridgeActionDigest,
    };

    /// Sends one IOTA event through the orchestrator and checks that the
    /// resulting action reaches the executor, is stored as pending, and that
    /// the IOTA event cursor points at the event's id.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_iota_watcher_task() {
        let (
            iota_events_tx,
            iota_events_rx,
            _eth_events_tx,
            eth_events_rx,
            monitor_tx,
            _monitor_rx,
            iota_client,
            store,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(iota_client),
            iota_events_rx,
            eth_events_rx,
            store.clone(),
            monitor_tx,
            metrics,
        )
        .run(executor)
        .await;

        let identifier = Identifier::from_str("test_iota_watcher_task").unwrap();
        let (iota_event, bridge_action) = get_test_iota_event_and_action(identifier.clone());
        iota_events_tx
            .send((identifier.clone(), vec![iota_event.clone()]))
            .await
            .unwrap();

        let start = std::time::Instant::now();
        // The executor must be asked to run exactly the expected action.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );
        // Poll the store until the pending action shows up, for about 5 seconds at most.
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_iota_event_cursors(&[identifier]).unwrap()[0].unwrap(),
                iota_event.id,
            );
            break;
        }
    }

    /// Sends one Eth log through the orchestrator and checks that the
    /// resulting action reaches the executor, is stored as pending, and that
    /// the Eth event cursor equals the batch's end block.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_eth_watcher_task() {
        let (
            _iota_events_tx,
            iota_events_rx,
            eth_events_tx,
            eth_events_rx,
            monitor_tx,
            _monitor_rx,
            iota_client,
            store,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();
        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(iota_client),
            iota_events_rx,
            eth_events_rx,
            store.clone(),
            monitor_tx,
            metrics,
        )
        .run(executor)
        .await;
        let address = EthAddress::random();
        let (log, bridge_action) = get_test_log_and_action(address, TxHash::random(), 10);
        let log_index_in_tx = 10;
        let log_block_num = log.block_number.unwrap().as_u64();
        let eth_log = EthLog {
            log: log.clone(),
            tx_hash: log.transaction_hash.unwrap(),
            block_number: log_block_num,
            log_index_in_tx,
        };
        let end_block_num = log_block_num + 15;

        eth_events_tx
            .send((address, end_block_num, vec![eth_log.clone()]))
            .await
            .unwrap();

        // The executor must be asked to run exactly the expected action.
        assert_eq!(
            executor_requested_action_rx.recv().await.unwrap(),
            bridge_action.digest()
        );
        let start = std::time::Instant::now();
        // Poll the store until the pending action shows up, for about 5 seconds at most.
        loop {
            let actions = store.get_all_pending_actions();
            if actions.is_empty() {
                if start.elapsed().as_secs() > 5 {
                    panic!("Timed out waiting for action to be written to WAL");
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
                continue;
            }
            assert_eq!(actions.len(), 1);
            let action = actions.get(&bridge_action.digest()).unwrap();
            assert_eq!(action, &bridge_action);
            assert_eq!(
                store.get_eth_event_cursors(&[address]).unwrap()[0].unwrap(),
                end_block_num,
            );
            break;
        }
    }

    /// Stores two pending actions before starting the orchestrator and checks
    /// that both are submitted to the executor on startup.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_resume_actions_in_pending_logs() {
        let (
            _iota_events_tx,
            iota_events_rx,
            _eth_events_tx,
            eth_events_rx,
            monitor_tx,
            _monitor_rx,
            iota_client,
            store,
        ) = setup();
        let (executor, mut executor_requested_action_rx) = MockExecutor::new();

        let action1 = get_test_iota_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );

        let action2 = get_test_eth_to_iota_bridge_action(None, None, None, None);
        store
            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
            .unwrap();

        let registry = Registry::new();
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let _handles = BridgeOrchestrator::new(
            Arc::new(iota_client),
            iota_events_rx,
            eth_events_rx,
            store.clone(),
            monitor_tx,
            metrics,
        )
        .run(executor)
        .await;

        // The two digests are collected in a set, so their arrival order does not matter.
        let mut digests = std::collections::HashSet::new();
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        digests.insert(executor_requested_action_rx.recv().await.unwrap());
        assert!(digests.contains(&action1.digest()));
        assert!(digests.contains(&action2.digest()));
        assert_eq!(digests.len(), 2);
    }

    /// Builds the fixtures shared by the tests.
    ///
    /// Returns, in order: the IOTA event sender and receiver, the Eth event
    /// sender and receiver, the monitor sender and receiver, a test IOTA
    /// client backed by `IotaMockClient`, and a store created under a
    /// temporary directory.
    #[expect(clippy::type_complexity)]
    fn setup() -> (
        iota_metrics::metered_channel::Sender<(Identifier, Vec<IotaEvent>)>,
        iota_metrics::metered_channel::Receiver<(Identifier, Vec<IotaEvent>)>,
        iota_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
        iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
        iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
        iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
        IotaClient<IotaMockClient>,
        Arc<BridgeOrchestratorTables>,
    ) {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);

        init_all_struct_tags();

        // NOTE(review): `temp_dir` is dropped when `setup` returns while the
        // store is still in use; presumably the directory is removed at that
        // point — confirm this is intended.
        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let iota_client = IotaClient::new_for_testing(IotaMockClient::default());

        let (eth_events_tx, eth_events_rx) = iota_metrics::metered_channel::channel(
            100,
            &iota_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_eth_events_queue"]),
        );

        let (iota_events_tx, iota_events_rx) = iota_metrics::metered_channel::channel(
            100,
            &iota_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["unit_test_iota_events_queue"]),
        );
        let (monitor_tx, monitor_rx) = iota_metrics::metered_channel::channel(
            10000,
            &iota_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&["monitor_queue"]),
        );
        (
            iota_events_tx,
            iota_events_rx,
            eth_events_tx,
            eth_events_rx,
            monitor_tx,
            monitor_rx,
            iota_client,
            store,
        )
    }

    /// Executor stand-in that reports the digest of every action it receives
    /// on a broadcast channel instead of executing it.
    struct MockExecutor {
        /// Broadcast side on which received action digests are published.
        requested_transactions_tx: tokio::sync::broadcast::Sender<BridgeActionDigest>,
    }

    impl MockExecutor {
        /// Creates the mock together with the receiver on which tests observe
        /// the digests of submitted actions.
        fn new() -> (Self, tokio::sync::broadcast::Receiver<BridgeActionDigest>) {
            let (tx, rx) = tokio::sync::broadcast::channel(100);
            (
                Self {
                    requested_transactions_tx: tx,
                },
                rx,
            )
        }
    }

    impl BridgeActionExecutorTrait for MockExecutor {
        /// Spawns a task that drains the returned sender's channel and
        /// publishes each action's digest on the broadcast channel.
        ///
        /// The spawned task panics if publishing a digest fails.
        fn run(
            self,
        ) -> (
            Vec<tokio::task::JoinHandle<()>>,
            iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
        ) {
            let (tx, mut rx) = iota_metrics::metered_channel::channel::<BridgeActionExecutionWrapper>(
                100,
                &iota_metrics::get_metrics()
                    .unwrap()
                    .channel_inflight
                    .with_label_values(&["unit_test_mock_executor"]),
            );

            let handles = tokio::spawn(async move {
                while let Some(action) = rx.recv().await {
                    self.requested_transactions_tx
                        .send(action.0.digest())
                        .unwrap();
                }
            });
            (vec![handles], tx)
        }
    }
}