1use std::{collections::HashMap, sync::Arc};
9
10use arc_swap::ArcSwap;
11use iota_json_rpc_types::{
12 IotaExecutionStatus, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
13};
14use iota_metrics::spawn_logged_monitored_task;
15use iota_types::{
16 TypeTag,
17 base_types::{IotaAddress, ObjectID, ObjectRef},
18 crypto::{IotaKeyPair, Signature},
19 digests::TransactionDigest,
20 gas_coin::GasCoin,
21 object::Owner,
22 transaction::{ObjectArg, Transaction},
23};
24use shared_crypto::intent::{Intent, IntentMessage};
25use tokio::{sync::Semaphore, time::Duration};
26use tracing::{Instrument, error, info, instrument, warn};
27
28use crate::{
29 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
30 error::BridgeError,
31 events::{
32 TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
33 TokenTransferClaimed,
34 },
35 iota_client::{IotaClient, IotaClientInner},
36 iota_transaction_builder::build_iota_transaction,
37 metrics::BridgeMetrics,
38 retry_with_max_elapsed_time,
39 storage::BridgeOrchestratorTables,
40 types::{BridgeAction, BridgeActionStatus, IsBridgePaused, VerifiedCertifiedBridgeAction},
41};
42
/// Capacity of each metered channel created by the executor (the signing
/// queue and the execution queue).
pub const CHANNEL_SIZE: usize = 1000;
/// Number of permits in the semaphore that bounds how many signature
/// requests run concurrently.
pub const SIGNING_CONCURRENCY: usize = 10;

/// Attempt count at which a failed signature aggregation is no longer
/// re-enqueued and is only logged for manual intervention.
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
/// Attempt count at which a failed transaction submission is no longer
/// re-enqueued and is only logged for manual intervention.
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
51
52async fn delay(attempt_times: u64) {
53 let delay_ms = 100 * (2 ^ attempt_times);
54 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
55}
56
/// A [`BridgeAction`] queued for signing, paired with the number of signing
/// attempts already made for it (second field; starts at 0 and is
/// incremented each time the action is re-enqueued).
#[derive(Debug)]
pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
59
/// A verified, certified bridge action queued for on-chain execution, paired
/// with the number of execution attempts already made for it (second field;
/// starts at 0 and is incremented each time the certificate is re-enqueued).
#[derive(Debug)]
pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
62
/// Abstraction over something that can be started as a bridge action
/// executor.
pub trait BridgeActionExecutorTrait {
    /// Consumes the executor and starts it.
    ///
    /// Returns the join handles of the tasks that were started, together with
    /// the sender through which callers submit [`BridgeActionExecutionWrapper`]
    /// items for processing.
    fn run(
        self,
    ) -> (
        Vec<tokio::task::JoinHandle<()>>,
        iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
    );
}
71
/// Drives bridge actions through two stages: collecting committee signatures
/// and then submitting the certified action as an IOTA transaction.
pub struct BridgeActionExecutor<C> {
    // Client used for on-chain status queries and transaction submission.
    iota_client: Arc<IotaClient<C>>,
    // Swappable aggregator used to request committee signatures.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    // Key used to sign the transactions built for certified actions.
    key: IotaKeyPair,
    // Sender address of the built transactions; must own the gas object.
    iota_address: IotaAddress,
    // Gas object looked up before every execution attempt.
    gas_object_id: ObjectID,
    // Store holding the pending actions; entries are removed once an action
    // is found to be processed on chain.
    store: Arc<BridgeOrchestratorTables>,
    // Bridge object argument passed to the transaction builder.
    bridge_object_arg: ObjectArg,
    // Swappable map of type tags keyed by `u8`, passed to the transaction
    // builder (presumably token id -> type tag; confirm against the builder).
    iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    // Watch channel checked before each execution; while it reads `true`
    // certified actions are skipped.
    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
    metrics: Arc<BridgeMetrics>,
}
84
85impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
86where
87 C: IotaClientInner + 'static,
88{
89 fn run(
90 self,
91 ) -> (
92 Vec<tokio::task::JoinHandle<()>>,
93 iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
94 ) {
95 let (tasks, sender, _) = self.run_inner();
96 (tasks, sender)
97 }
98}
99
100impl<C> BridgeActionExecutor<C>
101where
102 C: IotaClientInner + 'static,
103{
104 pub async fn new(
105 iota_client: Arc<IotaClient<C>>,
106 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
107 store: Arc<BridgeOrchestratorTables>,
108 key: IotaKeyPair,
109 iota_address: IotaAddress,
110 gas_object_id: ObjectID,
111 iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
112 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
113 metrics: Arc<BridgeMetrics>,
114 ) -> Self {
115 let bridge_object_arg = iota_client
116 .get_mutable_bridge_object_arg_must_succeed()
117 .await;
118 Self {
119 iota_client,
120 bridge_auth_agg,
121 store,
122 key,
123 gas_object_id,
124 iota_address,
125 bridge_object_arg,
126 iota_token_type_tags,
127 bridge_pause_rx,
128 metrics,
129 }
130 }
131
132 fn run_inner(
133 self,
134 ) -> (
135 Vec<tokio::task::JoinHandle<()>>,
136 iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
137 iota_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
138 ) {
139 let key = self.key;
140
141 let (sender, receiver) = iota_metrics::metered_channel::channel(
142 CHANNEL_SIZE,
143 &iota_metrics::get_metrics()
144 .unwrap()
145 .channel_inflight
146 .with_label_values(&["executor_signing_queue"]),
147 );
148
149 let (execution_tx, execution_rx) = iota_metrics::metered_channel::channel(
150 CHANNEL_SIZE,
151 &iota_metrics::get_metrics()
152 .unwrap()
153 .channel_inflight
154 .with_label_values(&["executor_execution_queue"]),
155 );
156 let execution_tx_clone = execution_tx.clone();
157 let sender_clone = sender.clone();
158 let store_clone = self.store.clone();
159 let client_clone = self.iota_client.clone();
160 let mut tasks = vec![];
161 let metrics = self.metrics.clone();
162 tasks.push(spawn_logged_monitored_task!(
163 Self::run_signature_aggregation_loop(
164 client_clone,
165 self.bridge_auth_agg,
166 store_clone,
167 sender_clone,
168 receiver,
169 execution_tx_clone,
170 metrics,
171 )
172 ));
173
174 let metrics = self.metrics.clone();
175 let execution_tx_clone = execution_tx.clone();
176 tasks.push(spawn_logged_monitored_task!(
177 Self::run_onchain_execution_loop(
178 self.iota_client.clone(),
179 key,
180 self.iota_address,
181 self.gas_object_id,
182 self.store.clone(),
183 execution_tx_clone,
184 execution_rx,
185 self.bridge_object_arg,
186 self.iota_token_type_tags,
187 self.bridge_pause_rx,
188 metrics,
189 )
190 ));
191 (tasks, sender, execution_tx)
192 }
193
194 async fn run_signature_aggregation_loop(
195 iota_client: Arc<IotaClient<C>>,
196 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
197 store: Arc<BridgeOrchestratorTables>,
198 signing_queue_sender: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
199 mut signing_queue_receiver: iota_metrics::metered_channel::Receiver<
200 BridgeActionExecutionWrapper,
201 >,
202 execution_queue_sender: iota_metrics::metered_channel::Sender<
203 CertifiedBridgeActionExecutionWrapper,
204 >,
205 metrics: Arc<BridgeMetrics>,
206 ) {
207 info!("Starting run_signature_aggregation_loop");
208 let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
209 while let Some(action) = signing_queue_receiver.recv().await {
210 Self::handle_signing_task(
211 &semaphore,
212 &auth_agg,
213 &signing_queue_sender,
214 &execution_queue_sender,
215 &iota_client,
216 &store,
217 action,
218 &metrics,
219 )
220 .await;
221 }
222 }
223
224 async fn should_proceed_signing(iota_client: &Arc<IotaClient<C>>) -> bool {
225 let Ok(Ok(is_paused)) =
226 retry_with_max_elapsed_time!(iota_client.is_bridge_paused(), Duration::from_secs(600))
227 else {
228 error!("Failed to get bridge status after retry");
229 return false;
230 };
231 !is_paused
232 }
233
234 #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
235 async fn handle_signing_task(
236 semaphore: &Arc<Semaphore>,
237 auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
238 signing_queue_sender: &iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
239 execution_queue_sender: &iota_metrics::metered_channel::Sender<
240 CertifiedBridgeActionExecutionWrapper,
241 >,
242 iota_client: &Arc<IotaClient<C>>,
243 store: &Arc<BridgeOrchestratorTables>,
244 action: BridgeActionExecutionWrapper,
245 metrics: &Arc<BridgeMetrics>,
246 ) {
247 metrics.action_executor_signing_queue_received_actions.inc();
248 let action_key = action.0.key();
249 info!("Received action for signing: {:?}", action.0);
250
251 let should_proceed = Self::should_proceed_signing(iota_client).await;
257 if !should_proceed {
258 metrics.action_executor_signing_queue_skipped_actions.inc();
259 warn!("skipping signing task: {:?}", action_key);
260 return;
261 }
262
263 let auth_agg_clone = auth_agg.clone();
264 let signing_queue_sender_clone = signing_queue_sender.clone();
265 let execution_queue_sender_clone = execution_queue_sender.clone();
266 let iota_client_clone = iota_client.clone();
267 let store_clone = store.clone();
268 let metrics_clone = metrics.clone();
269 let semaphore_clone = semaphore.clone();
270 spawn_logged_monitored_task!(
271 Self::request_signatures(
272 semaphore_clone,
273 iota_client_clone,
274 auth_agg_clone,
275 action,
276 store_clone,
277 signing_queue_sender_clone,
278 execution_queue_sender_clone,
279 metrics_clone,
280 )
281 .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282 "request_signatures"
283 );
284 }
285
286 async fn handle_already_processed_token_transfer_action_maybe(
290 iota_client: &Arc<IotaClient<C>>,
291 action: &BridgeAction,
292 store: &Arc<BridgeOrchestratorTables>,
293 metrics: &Arc<BridgeMetrics>,
294 ) -> bool {
295 let status = iota_client
296 .get_token_transfer_action_onchain_status_until_success(
297 action.chain_id() as u8,
298 action.seq_number(),
299 )
300 .await;
301 match status {
302 BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303 info!(
304 "Action already approved or claimed, removing action from pending logs: {:?}",
305 action
306 );
307 metrics.action_executor_already_processed_actions.inc();
308 store
309 .remove_pending_actions(&[action.digest()])
310 .unwrap_or_else(|e| {
311 panic!("Write to DB should not fail: {:?}", e);
312 });
313 true
314 }
315 BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318 }
319 }
320
321 async fn request_signatures(
324 semaphore: Arc<Semaphore>,
325 iota_client: Arc<IotaClient<C>>,
326 auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327 action: BridgeActionExecutionWrapper,
328 store: Arc<BridgeOrchestratorTables>,
329 signing_queue_sender: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330 execution_queue_sender: iota_metrics::metered_channel::Sender<
331 CertifiedBridgeActionExecutionWrapper,
332 >,
333 metrics: Arc<BridgeMetrics>,
334 ) {
335 let _permit = semaphore
336 .acquire()
337 .await
338 .expect("semaphore should not be closed");
339 info!("requesting signatures");
340 let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342 match &action {
344 BridgeAction::IotaToEthBridgeAction(_) | BridgeAction::EthToIotaBridgeAction(_) => (),
345 _ => unreachable!("Non token transfer action should not reach here"),
346 };
347
348 if Self::handle_already_processed_token_transfer_action_maybe(
350 &iota_client,
351 &action,
352 &store,
353 &metrics,
354 )
355 .await
356 {
357 return;
358 }
359 match auth_agg
360 .load()
361 .request_committee_signatures(action.clone())
362 .await
363 {
364 Ok(certificate) => {
365 info!("Sending certificate to execution");
366 execution_queue_sender
367 .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
368 .await
369 .unwrap_or_else(|e| {
370 panic!("Sending to execution queue should not fail: {:?}", e);
371 });
372 }
373 Err(e) => {
374 warn!("Failed to collect sigs for bridge action: {:?}", e);
375 metrics.err_signature_aggregation.inc();
376
377 if attempt_times >= MAX_SIGNING_ATTEMPTS {
379 error!(
380 "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
381 e
382 );
383 return;
384 }
385 delay(attempt_times).await;
386 signing_queue_sender
387 .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
388 .await
389 .unwrap_or_else(|e| {
390 panic!("Sending to signing queue should not fail: {:?}", e);
391 });
392 }
393 }
394 }
395
396 async fn run_onchain_execution_loop(
399 iota_client: Arc<IotaClient<C>>,
400 iota_key: IotaKeyPair,
401 iota_address: IotaAddress,
402 gas_object_id: ObjectID,
403 store: Arc<BridgeOrchestratorTables>,
404 execution_queue_sender: iota_metrics::metered_channel::Sender<
405 CertifiedBridgeActionExecutionWrapper,
406 >,
407 mut execution_queue_receiver: iota_metrics::metered_channel::Receiver<
408 CertifiedBridgeActionExecutionWrapper,
409 >,
410 bridge_object_arg: ObjectArg,
411 iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
412 bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
413 metrics: Arc<BridgeMetrics>,
414 ) {
415 info!("Starting run_onchain_execution_loop");
416 while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
417 if *bridge_pause_rx.borrow() {
421 warn!("Bridge is paused, skipping execution");
422 metrics
423 .action_executor_execution_queue_skipped_actions_due_to_pausing
424 .inc();
425 continue;
426 }
427 Self::handle_execution_task(
428 certificate_wrapper,
429 &iota_client,
430 &iota_key,
431 &iota_address,
432 gas_object_id,
433 &store,
434 &execution_queue_sender,
435 &bridge_object_arg,
436 &iota_token_type_tags,
437 &metrics,
438 )
439 .await;
440 }
441 panic!("Execution queue closed unexpectedly");
442 }
443
444 #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
445 async fn handle_execution_task(
446 certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
447 iota_client: &Arc<IotaClient<C>>,
448 iota_key: &IotaKeyPair,
449 iota_address: &IotaAddress,
450 gas_object_id: ObjectID,
451 store: &Arc<BridgeOrchestratorTables>,
452 execution_queue_sender: &iota_metrics::metered_channel::Sender<
453 CertifiedBridgeActionExecutionWrapper,
454 >,
455 bridge_object_arg: &ObjectArg,
456 iota_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
457 metrics: &Arc<BridgeMetrics>,
458 ) {
459 metrics
460 .action_executor_execution_queue_received_actions
461 .inc();
462 let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
463 let action = certificate.data();
464 let action_key = action.key();
465
466 info!("Received certified action for execution: {:?}", action);
467
468 let (gas_coin, gas_object_ref) =
470 Self::get_gas_data_assert_ownership(*iota_address, gas_object_id, iota_client).await;
471 metrics.gas_coin_balance.set(gas_coin.value() as i64);
472
473 let ceriticate_clone = certificate.clone();
474
475 if Self::handle_already_processed_token_transfer_action_maybe(
477 iota_client,
478 action,
479 store,
480 metrics,
481 )
482 .await
483 {
484 info!("Action already processed, skipping");
485 return;
486 }
487
488 info!("Building IOTA transaction");
489 let rgp = iota_client.get_reference_gas_price_until_success().await;
490 let tx_data = match build_iota_transaction(
491 *iota_address,
492 &gas_object_ref,
493 ceriticate_clone,
494 *bridge_object_arg,
495 iota_token_type_tags.load().as_ref(),
496 rgp,
497 ) {
498 Ok(tx_data) => tx_data,
499 Err(err) => {
500 metrics.err_build_iota_transaction.inc();
501 error!(
502 "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
503 action, err
504 );
505 return;
508 }
509 };
510 let sig = Signature::new_secure(
511 &IntentMessage::new(Intent::iota_transaction(), &tx_data),
512 iota_key,
513 );
514 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
515 let tx_digest = *signed_tx.digest();
516
517 if Self::handle_already_processed_token_transfer_action_maybe(
519 iota_client,
520 action,
521 store,
522 metrics,
523 )
524 .await
525 {
526 info!("Action already processed, skipping");
527 return;
528 }
529
530 info!(?tx_digest, ?gas_object_ref, "Sending transaction to IOTA");
531 match iota_client
532 .execute_transaction_block_with_effects(signed_tx)
533 .await
534 {
535 Ok(resp) => {
536 Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
537 }
538
539 Err(err) => {
541 error!(
542 ?action_key,
543 ?tx_digest,
544 "IOTA transaction failed at signing: {err:?}"
545 );
546 metrics.err_iota_transaction_submission.inc();
547 let metrics_clone = metrics.clone();
548 let sender_clone = execution_queue_sender.clone();
550 spawn_logged_monitored_task!(async move {
551 metrics_clone
553 .err_iota_transaction_submission_too_many_failures
554 .inc();
555 if attempt_times >= MAX_EXECUTION_ATTEMPTS {
556 error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
557 return;
558 }
559 delay(attempt_times).await;
560 sender_clone
561 .send(CertifiedBridgeActionExecutionWrapper(
562 certificate,
563 attempt_times + 1,
564 ))
565 .await
566 .unwrap_or_else(|e| {
567 panic!("Sending to execution queue should not fail: {:?}", e);
568 });
569 info!("Re-enqueued certificate for execution");
570 }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
571 }
572 }
573 }
574
575 async fn handle_execution_effects(
577 tx_digest: TransactionDigest,
578 response: IotaTransactionBlockResponse,
579 store: &Arc<BridgeOrchestratorTables>,
580 action: &BridgeAction,
581 metrics: &Arc<BridgeMetrics>,
582 ) {
583 let effects = response
584 .effects
585 .clone()
586 .expect("We requested effects but got None.");
587 let status = effects.status();
588 match status {
589 IotaExecutionStatus::Success => {
590 let events = response.events.expect("We requested events but got None.");
591 assert!(
594 events.data.iter().any(|e| e.type_
595 == *TokenTransferAlreadyClaimed.get().unwrap()
596 || e.type_ == *TokenTransferClaimed.get().unwrap()
597 || e.type_ == *TokenTransferApproved.get().unwrap()
598 || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()),
599 "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved or TokenTransferAlreadyApproved event but got: {:?}",
600 events,
601 );
602 info!(?tx_digest, "IOTA transaction executed successfully");
603 store
604 .remove_pending_actions(&[action.digest()])
605 .unwrap_or_else(|e| {
606 panic!("Write to DB should not fail: {:?}", e);
607 })
608 }
609 IotaExecutionStatus::Failure { error } => {
610 metrics.err_iota_transaction_execution.inc();
618 error!(
619 ?tx_digest,
620 "Manual intervention is needed. IOTA transaction executed and failed with error: {error:?}"
621 );
622 }
623 }
624 }
625
626 async fn get_gas_data_assert_ownership(
628 iota_address: IotaAddress,
629 gas_object_id: ObjectID,
630 iota_client: &IotaClient<C>,
631 ) -> (GasCoin, ObjectRef) {
632 let (gas_coin, gas_obj_ref, owner) = iota_client
633 .get_gas_data_panic_if_not_gas(gas_object_id)
634 .await;
635
636 assert_eq!(
639 owner,
640 Owner::AddressOwner(iota_address),
641 "Gas object {:?} is no longer owned by address {}",
642 gas_object_id,
643 iota_address
644 );
645 (gas_coin, gas_obj_ref)
646 }
647}
648
649pub async fn submit_to_executor(
650 tx: &iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
651 action: BridgeAction,
652) -> Result<(), BridgeError> {
653 tx.send(BridgeActionExecutionWrapper(action, 0))
654 .await
655 .map_err(|e| BridgeError::Generic(e.to_string()))
656}
657
658#[cfg(test)]
659mod tests {
660 use std::{
661 collections::{BTreeMap, HashMap},
662 str::FromStr,
663 };
664
665 use fastcrypto::traits::KeyPair;
666 use iota_json_rpc_types::{
667 IotaEvent, IotaTransactionBlockEffects, IotaTransactionBlockEvents,
668 IotaTransactionBlockResponse,
669 };
670 use iota_types::{
671 TypeTag, base_types::random_object_ref, crypto::get_key_pair, gas_coin::GasCoin,
672 transaction::TransactionData,
673 };
674 use prometheus::Registry;
675
676 use super::*;
677 use crate::{
678 crypto::{
679 BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
680 BridgeAuthorityRecoverableSignature,
681 },
682 events::init_all_struct_tags,
683 iota_mock_client::IotaMockClient,
684 server::mock_handler::BridgeRequestMockHandler,
685 test_utils::{
686 DUMMY_MUTABLE_BRIDGE_OBJECT_ARG, get_test_authorities_and_run_mock_bridge_server,
687 get_test_eth_to_iota_bridge_action, get_test_iota_to_eth_bridge_action,
688 sign_action_with_key,
689 },
690 types::{
691 BRIDGE_PAUSED, BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction,
692 },
693 };
694
    /// Drives three actions through the executor, one per scenario:
    /// 1. the mocked transaction succeeds, so the action is removed from the
    ///    pending actions;
    /// 2. the mocked transaction executes with a failure status, so the
    ///    action stays pending;
    /// 3. the mocked submission returns an error, so the same transaction is
    ///    submitted again, and the action is removed once the mock is
    ///    switched to a successful response.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_onchain_execution_loop() {
        let SetupData {
            signing_tx,
            iota_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            ..
        } = setup().await;

        // --- Scenario 1: transaction succeeds ---
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );
        let action = action_certificate.data().clone();
        let id_token_map = (*iota_token_type_tags.load().clone()).clone();
        // Build the transaction the same way the executor does, to learn the
        // digest the mock client has to answer for.
        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();

        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);

        // Register a gas coin owned by the executor's address with the mock.
        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        iota_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(iota_address),
        );

        // A successful response must carry one of the expected bridge events.
        let mut event = IotaEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &iota_client_mock,
            tx_digest,
            IotaExecutionStatus::Success,
            Some(events),
            true,
        );

        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // Wait for a digest on the subscription (presumably published by the
        // mock client when a transaction is submitted; confirm in `setup`).
        tx_subscription.recv().await.unwrap();
        // The action has been removed from the pending actions.
        assert!(store.get_all_pending_actions().is_empty());

        // --- Scenario 2: transaction executes but fails ---
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();

        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);

        mock_transaction_response(
            &iota_client_mock,
            tx_digest,
            IotaExecutionStatus::Failure {
                error: "failure is mother of success".to_string(),
            },
            None,
            true,
        );

        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        tx_subscription.recv().await.unwrap();
        // A failed execution leaves the action in the pending actions.
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // --- Scenario 3: submission errors, then succeeds on retry ---
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();

        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
        mock_transaction_error(
            &iota_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // The same digest must show up twice: the first submission and the
        // executor's retry.
        let tx_digest = tx_subscription.recv().await.unwrap();
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);

        // While submissions keep erroring the action stays pending.
        assert!(
            store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );

        // Switch the mock to a successful response for the same digest.
        let mut event = IotaEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &iota_client_mock,
            tx_digest,
            IotaExecutionStatus::Success,
            Some(events),
            true,
        );

        // Give the executor time for another retry.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        // The retry succeeded, so the action is no longer pending.
        assert!(
            !store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );
    }
900
    /// Checks that signature aggregation keeps retrying while too few
    /// authorities sign, and that the action is executed and removed from the
    /// pending actions once one more authority starts signing.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_signature_aggregation_loop() {
        let SetupData {
            signing_tx,
            iota_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            ..
        } = setup().await;
        let id_token_map = (*iota_token_type_tags.load().clone()).clone();
        let (action_certificate, iota_tx_digest, iota_tx_event_index) =
            get_bridge_authority_approved_action(
                vec![&mock0, &mock1, &mock2, &mock3],
                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
                None,
                true,
            );
        let action = action_certificate.data().clone();
        // Authorities 0, 1 and 2 return errors; only authority 3 signs
        // (presumably not enough for a certificate; confirm against the
        // committee set up in `setup`).
        mock_bridge_authority_signing_errors(
            vec![&mock0, &mock1, &mock2],
            iota_tx_digest,
            iota_tx_event_index,
        );
        let mut sigs = mock_bridge_authority_sigs(
            vec![&mock3],
            &action,
            vec![&secrets[3]],
            iota_tx_digest,
            iota_tx_event_index,
        );

        // Register a gas coin owned by the executor's address with the mock.
        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        iota_client_mock.add_gas_object_info(
            gas_coin,
            gas_object_ref,
            Owner::AddressOwner(iota_address),
        );
        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();

        // Poll until authority 0 has been asked at least twice, i.e. the
        // executor has retried the signature request at least once.
        loop {
            let requested_times =
                mock0.get_iota_token_events_requested(iota_tx_digest, iota_tx_event_index);
            if requested_times >= 2 {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
        // No transaction has been submitted yet.
        assert_eq!(
            tx_subscription.try_recv().unwrap_err(),
            tokio::sync::broadcast::error::TryRecvError::Empty
        );
        // The action is still pending.
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Let authority 2 sign as well.
        let sig_from_2 = mock_bridge_authority_sigs(
            vec![&mock2],
            &action,
            vec![&secrets[2]],
            iota_tx_digest,
            iota_tx_event_index,
        );
        sigs.extend(sig_from_2);
        // Rebuild the certificate from the two collected signatures to
        // compute the digest of the transaction the executor is expected to
        // submit.
        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
            action.clone(),
            BridgeCommitteeValiditySignInfo { signatures: sigs },
        );
        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate,
            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);

        let mut event = IotaEvent::random_for_testing();
        event.type_ = TokenTransferClaimed.get().unwrap().clone();
        let events = vec![event];
        mock_transaction_response(
            &iota_client_mock,
            tx_digest,
            IotaExecutionStatus::Success,
            Some(events),
            true,
        );

        // The expected transaction is submitted ...
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
        // ... and the action is removed from the pending actions.
        assert!(
            !store
                .get_all_pending_actions()
                .contains_key(&action.digest())
        );
    }
1025
    /// Checks that an action stuck in signature aggregation is dropped from
    /// the pending actions, without any transaction being submitted, once its
    /// on-chain status becomes `Approved`.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_skip_request_signature_if_already_processed_on_chain() {
        let SetupData {
            signing_tx,
            iota_client_mock,
            mut tx_subscription,
            store,
            mock0,
            mock1,
            mock2,
            mock3,
            ..
        } = setup().await;

        let iota_tx_digest = TransactionDigest::random();
        let iota_tx_event_index = 0;
        let action = get_test_iota_to_eth_bridge_action(
            Some(iota_tx_digest),
            Some(iota_tx_event_index),
            None,
            None,
            None,
            None,
            None,
        );
        // Every authority returns an error, so no certificate can be formed.
        mock_bridge_authority_signing_errors(
            vec![&mock0, &mock1, &mock2, &mock3],
            iota_tx_digest,
            iota_tx_event_index,
        );
        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        submit_to_executor(&signing_tx, action.clone())
            .await
            .unwrap();
        let action_digest = action.digest();

        // After a second: nothing was submitted and the action is still
        // pending.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        tx_subscription.try_recv().unwrap_err();
        assert!(store.get_all_pending_actions().contains_key(&action_digest));

        // Mark the action as already approved on chain.
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

        // Wait (at most ~10s) for the executor to notice and remove the
        // action from the pending actions.
        let now = std::time::Instant::now();
        while store.get_all_pending_actions().contains_key(&action_digest) {
            if now.elapsed().as_secs() > 10 {
                panic!("Timeout waiting for action to be removed from WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
        // Still no transaction was submitted.
        tx_subscription.try_recv().unwrap_err();
    }
1089
    /// Checks that a certificate whose submission keeps erroring is dropped
    /// from the pending actions once its on-chain status becomes `Approved`.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_skip_tx_submission_if_already_processed_on_chain() {
        let SetupData {
            execution_tx,
            iota_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            ..
        } = setup().await;
        let id_token_map = (*iota_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTABLE_BRIDGE_OBJECT_ARG;
        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
        // Submitting this transaction returns an error.
        mock_transaction_error(
            &iota_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        // Register a gas coin owned by the executor's address with the mock.
        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        iota_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(iota_address),
        );

        // The action starts out as pending on chain.
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Bypass signing: send the certificate straight to the execution
        // queue.
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
            .await
            .unwrap();

        // Wait for one submission attempt.
        tx_subscription.recv().await.unwrap();

        // Mark the action as already approved on chain.
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

        // Wait (at most ~10s) for the executor to notice and remove the
        // action from the pending actions.
        let now = std::time::Instant::now();
        let action_digest = action.digest();
        while store.get_all_pending_actions().contains_key(&action_digest) {
            if now.elapsed().as_secs() > 10 {
                panic!("Timeout waiting for action to be removed from WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    }
1174
    /// Checks that once the pause signal is sent, a certificate placed on the
    /// execution queue is not submitted and its action stays pending.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_skip_tx_submission_if_bridge_is_paused() {
        let SetupData {
            execution_tx,
            iota_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            bridge_pause_tx,
            ..
        } = setup().await;
        let id_token_map: HashMap<u8, TypeTag> = (*iota_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTABLE_BRIDGE_OBJECT_ARG;
        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
        // Submitting this transaction returns an error.
        mock_transaction_error(
            &iota_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        // Register a gas coin owned by the executor's address with the mock.
        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000);
        iota_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(iota_address),
        );
        let action_digest = action.digest();
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        // The bridge starts out unpaused.
        assert!(!*bridge_pause_tx.borrow());

        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Bypass signing: send the certificate straight to the execution
        // queue.
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(
                action_certificate.clone(),
                0,
            ))
            .await
            .unwrap();

        // While unpaused, a submission attempt is made.
        tx_subscription.recv().await.unwrap();

        // Pause the bridge.
        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();

        // Queue the same certificate again.
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
        // No further transaction was submitted while paused.
        assert_eq!(
            tx_subscription.try_recv().unwrap_err(),
            tokio::sync::broadcast::error::TryRecvError::Empty
        );
        // The action is still pending.
        assert_eq!(
            store.get_all_pending_actions()[&action_digest],
            action.clone()
        );
    }
1273
1274 #[tokio::test]
1275 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
1276 async fn test_action_executor_handle_new_token() {
1277 let new_token_id = 255u8; let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
1279 let SetupData {
1280 execution_tx,
1281 iota_client_mock,
1282 mut tx_subscription,
1283 secrets,
1284 dummy_iota_key,
1285 mock0,
1286 mock1,
1287 mock2,
1288 mock3,
1289 gas_object_ref,
1290 iota_address,
1291 iota_token_type_tags,
1292 ..
1293 } = setup().await;
1294 let mut id_token_map: HashMap<u8, TypeTag> = (*iota_token_type_tags.load().clone()).clone();
1295 let (action_certificate, _, _) = get_bridge_authority_approved_action(
1296 vec![&mock0, &mock1, &mock2, &mock3],
1297 vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
1298 Some(new_token_id),
1299 false, );
1302
1303 let action = action_certificate.data().clone();
1304 let arg = DUMMY_MUTABLE_BRIDGE_OBJECT_ARG;
1305 let tx_data = build_iota_transaction(
1306 iota_address,
1307 &gas_object_ref,
1308 action_certificate.clone(),
1309 arg,
1310 &maplit::hashmap! {
1311 new_token_id => new_type_tag.clone()
1312 },
1313 1000,
1314 )
1315 .unwrap();
1316 let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
1317 mock_transaction_error(
1318 &iota_client_mock,
1319 tx_digest,
1320 BridgeError::Generic("some random error".to_string()),
1321 true,
1322 );
1323
1324 let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); iota_client_mock.add_gas_object_info(
1326 gas_coin.clone(),
1327 gas_object_ref,
1328 Owner::AddressOwner(iota_address),
1329 );
1330 iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);
1331
1332 execution_tx
1334 .send(CertifiedBridgeActionExecutionWrapper(
1335 action_certificate.clone(),
1336 0,
1337 ))
1338 .await
1339 .unwrap();
1340
1341 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1342 assert_eq!(
1344 tx_subscription.try_recv().unwrap_err(),
1345 tokio::sync::broadcast::error::TryRecvError::Empty
1346 );
1347
1348 id_token_map.insert(new_token_id, new_type_tag);
1350 iota_token_type_tags.store(Arc::new(id_token_map));
1351
1352 execution_tx
1354 .send(CertifiedBridgeActionExecutionWrapper(
1355 action_certificate.clone(),
1356 0,
1357 ))
1358 .await
1359 .unwrap();
1360
1361 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
1362 assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1364 }
1365
1366 fn mock_bridge_authority_sigs(
1367 mocks: Vec<&BridgeRequestMockHandler>,
1368 action: &BridgeAction,
1369 secrets: Vec<&BridgeAuthorityKeyPair>,
1370 iota_tx_digest: TransactionDigest,
1371 iota_tx_event_index: u16,
1372 ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1373 assert_eq!(mocks.len(), secrets.len());
1374 let mut signed_actions = BTreeMap::new();
1375 for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1376 let signed_action = sign_action_with_key(action, secret);
1377 mock.add_iota_event_response(
1378 iota_tx_digest,
1379 iota_tx_event_index,
1380 Ok(signed_action.clone()),
1381 );
1382 signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1383 }
1384 signed_actions
1385 }
1386
1387 fn mock_bridge_authority_signing_errors(
1388 mocks: Vec<&BridgeRequestMockHandler>,
1389 iota_tx_digest: TransactionDigest,
1390 iota_tx_event_index: u16,
1391 ) {
1392 for mock in mocks {
1393 mock.add_iota_event_response(
1394 iota_tx_digest,
1395 iota_tx_event_index,
1396 Err(BridgeError::RestAPI("small issue".into())),
1397 );
1398 }
1399 }
1400
1401 fn get_bridge_authority_approved_action(
1403 mocks: Vec<&BridgeRequestMockHandler>,
1404 secrets: Vec<&BridgeAuthorityKeyPair>,
1405 token_id: Option<u8>,
1406 iota_to_eth: bool,
1407 ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1408 let iota_tx_digest = TransactionDigest::random();
1409 let iota_tx_event_index = 1;
1410 let action = if iota_to_eth {
1411 get_test_iota_to_eth_bridge_action(
1412 Some(iota_tx_digest),
1413 Some(iota_tx_event_index),
1414 None,
1415 None,
1416 None,
1417 None,
1418 token_id,
1419 )
1420 } else {
1421 get_test_eth_to_iota_bridge_action(None, None, None, token_id)
1422 };
1423
1424 let sigs = mock_bridge_authority_sigs(
1425 mocks,
1426 &action,
1427 secrets,
1428 iota_tx_digest,
1429 iota_tx_event_index,
1430 );
1431 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1432 action,
1433 BridgeCommitteeValiditySignInfo { signatures: sigs },
1434 );
1435 (
1436 VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1437 iota_tx_digest,
1438 iota_tx_event_index,
1439 )
1440 }
1441
1442 fn get_tx_digest(tx_data: TransactionData, dummy_iota_key: &IotaKeyPair) -> TransactionDigest {
1443 let sig = Signature::new_secure(
1444 &IntentMessage::new(Intent::iota_transaction(), &tx_data),
1445 dummy_iota_key,
1446 );
1447 let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1448 *signed_tx.digest()
1449 }
1450
1451 fn mock_transaction_response(
1455 iota_client_mock: &IotaMockClient,
1456 tx_digest: TransactionDigest,
1457 status: IotaExecutionStatus,
1458 events: Option<Vec<IotaEvent>>,
1459 wildcard: bool,
1460 ) {
1461 let mut response = IotaTransactionBlockResponse::new(tx_digest);
1462 let effects = IotaTransactionBlockEffects::new_for_testing(tx_digest, status);
1463 if let Some(events) = events {
1464 response.events = Some(IotaTransactionBlockEvents { data: events });
1465 }
1466 response.effects = Some(effects);
1467 if wildcard {
1468 iota_client_mock.set_wildcard_transaction_response(Ok(response));
1469 } else {
1470 iota_client_mock.add_transaction_response(tx_digest, Ok(response));
1471 }
1472 }
1473
1474 fn mock_transaction_error(
1475 iota_client_mock: &IotaMockClient,
1476 tx_digest: TransactionDigest,
1477 error: BridgeError,
1478 wildcard: bool,
1479 ) {
1480 if wildcard {
1481 iota_client_mock.set_wildcard_transaction_response(Err(error));
1482 } else {
1483 iota_client_mock.add_transaction_response(tx_digest, Err(error));
1484 }
1485 }
1486
1487 struct SetupData {
1488 signing_tx: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
1489 execution_tx: iota_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
1490 iota_client_mock: IotaMockClient,
1491 tx_subscription: tokio::sync::broadcast::Receiver<TransactionDigest>,
1492 store: Arc<BridgeOrchestratorTables>,
1493 secrets: Vec<BridgeAuthorityKeyPair>,
1494 dummy_iota_key: IotaKeyPair,
1495 mock0: BridgeRequestMockHandler,
1496 mock1: BridgeRequestMockHandler,
1497 mock2: BridgeRequestMockHandler,
1498 mock3: BridgeRequestMockHandler,
1499 #[expect(unused)]
1500 handles: Vec<tokio::task::JoinHandle<()>>,
1501 gas_object_ref: ObjectRef,
1502 iota_address: IotaAddress,
1503 iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
1504 bridge_pause_tx: tokio::sync::watch::Sender<IsBridgePaused>,
1505 }
1506
1507 async fn setup() -> SetupData {
1508 telemetry_subscribers::init_for_testing();
1509 let registry = Registry::new();
1510 iota_metrics::init_metrics(®istry);
1511 init_all_struct_tags();
1512
1513 let (iota_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1514 let iota_key = IotaKeyPair::from(kp);
1515 let gas_object_ref = random_object_ref();
1516 let temp_dir = tempfile::tempdir().unwrap();
1517 let store = BridgeOrchestratorTables::new(temp_dir.path());
1518 let iota_client_mock = IotaMockClient::default();
1519 let tx_subscription = iota_client_mock.subscribe_to_requested_transactions();
1520 let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
1521
1522 let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
1526 let dummy_iota_key = IotaKeyPair::from(dummy_kp);
1527
1528 let mock0 = BridgeRequestMockHandler::new();
1529 let mock1 = BridgeRequestMockHandler::new();
1530 let mock2 = BridgeRequestMockHandler::new();
1531 let mock3 = BridgeRequestMockHandler::new();
1532
1533 let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
1534 vec![2500, 2500, 2500, 2500],
1535 vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
1536 );
1537
1538 let committee = BridgeCommittee::new(authorities).unwrap();
1539
1540 let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
1541 Arc::new(committee),
1542 ))));
1543 let metrics = Arc::new(BridgeMetrics::new(®istry));
1544 let iota_token_type_tags = iota_client.get_token_id_map().await.unwrap();
1545 let iota_token_type_tags = Arc::new(ArcSwap::new(Arc::new(iota_token_type_tags)));
1546 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1547 let executor = BridgeActionExecutor::new(
1548 iota_client.clone(),
1549 agg.clone(),
1550 store.clone(),
1551 iota_key,
1552 iota_address,
1553 gas_object_ref.0,
1554 iota_token_type_tags.clone(),
1555 bridge_pause_rx,
1556 metrics,
1557 )
1558 .await;
1559
1560 let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
1561 handles.extend(executor_handle);
1562
1563 SetupData {
1564 signing_tx,
1565 execution_tx,
1566 iota_client_mock,
1567 tx_subscription,
1568 store,
1569 secrets,
1570 dummy_iota_key,
1571 mock0,
1572 mock1,
1573 mock2,
1574 mock3,
1575 handles,
1576 gas_object_ref,
1577 iota_address,
1578 iota_token_type_tags,
1579 bridge_pause_tx,
1580 }
1581 }
1582}