iota_bridge/
action_executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
//! collects bridge authority signatures and submits the signatures on chain.
7
8use std::{collections::HashMap, sync::Arc};
9
10use arc_swap::ArcSwap;
11use iota_json_rpc_types::{
12    IotaExecutionStatus, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
13};
14use iota_metrics::spawn_logged_monitored_task;
15use iota_types::{
16    TypeTag,
17    base_types::{IotaAddress, ObjectID, ObjectRef},
18    crypto::{IotaKeyPair, Signature},
19    digests::TransactionDigest,
20    gas_coin::GasCoin,
21    object::Owner,
22    transaction::{ObjectArg, Transaction},
23};
24use shared_crypto::intent::{Intent, IntentMessage};
25use tokio::{sync::Semaphore, time::Duration};
26use tracing::{Instrument, error, info, instrument, warn};
27
28use crate::{
29    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
30    error::BridgeError,
31    events::{
32        TokenTransferAlreadyApproved, TokenTransferAlreadyClaimed, TokenTransferApproved,
33        TokenTransferClaimed,
34    },
35    iota_client::{IotaClient, IotaClientInner},
36    iota_transaction_builder::build_iota_transaction,
37    metrics::BridgeMetrics,
38    retry_with_max_elapsed_time,
39    storage::BridgeOrchestratorTables,
40    types::{BridgeAction, BridgeActionStatus, IsBridgePaused, VerifiedCertifiedBridgeAction},
41};
42
/// Capacity used for each of the executor's metered queues (the signing queue
/// and the execution queue).
pub const CHANNEL_SIZE: usize = 1_000;
/// Number of permits in the semaphore that bounds concurrent signature
/// requests.
pub const SIGNING_CONCURRENCY: usize = 10;

// Retry budgets. Each queued action carries an attempt counter that starts at
// 0; once the counter has reached the limit below, the action is no longer
// retried and an error asking for manual intervention is logged.
//
// Intended backoff between retries, doubling from 0.1s:
// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s,
// 204.8s, 409.6s, 819.2s, 1638.4s
pub const MAX_SIGNING_ATTEMPTS: u64 = 16;
pub const MAX_EXECUTION_ATTEMPTS: u64 = 16;
51
52async fn delay(attempt_times: u64) {
53    let delay_ms = 100 * (2 ^ attempt_times);
54    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
55}
56
57#[derive(Debug)]
58pub struct BridgeActionExecutionWrapper(pub BridgeAction, pub u64);
59
60#[derive(Debug)]
61pub struct CertifiedBridgeActionExecutionWrapper(pub VerifiedCertifiedBridgeAction, pub u64);
62
63pub trait BridgeActionExecutorTrait {
64    fn run(
65        self,
66    ) -> (
67        Vec<tokio::task::JoinHandle<()>>,
68        iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
69    );
70}
71
72pub struct BridgeActionExecutor<C> {
73    iota_client: Arc<IotaClient<C>>,
74    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
75    key: IotaKeyPair,
76    iota_address: IotaAddress,
77    gas_object_id: ObjectID,
78    store: Arc<BridgeOrchestratorTables>,
79    bridge_object_arg: ObjectArg,
80    iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
81    bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
82    metrics: Arc<BridgeMetrics>,
83}
84
85impl<C> BridgeActionExecutorTrait for BridgeActionExecutor<C>
86where
87    C: IotaClientInner + 'static,
88{
89    fn run(
90        self,
91    ) -> (
92        Vec<tokio::task::JoinHandle<()>>,
93        iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
94    ) {
95        let (tasks, sender, _) = self.run_inner();
96        (tasks, sender)
97    }
98}
99
100impl<C> BridgeActionExecutor<C>
101where
102    C: IotaClientInner + 'static,
103{
104    pub async fn new(
105        iota_client: Arc<IotaClient<C>>,
106        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
107        store: Arc<BridgeOrchestratorTables>,
108        key: IotaKeyPair,
109        iota_address: IotaAddress,
110        gas_object_id: ObjectID,
111        iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
112        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
113        metrics: Arc<BridgeMetrics>,
114    ) -> Self {
115        let bridge_object_arg = iota_client
116            .get_mutable_bridge_object_arg_must_succeed()
117            .await;
118        Self {
119            iota_client,
120            bridge_auth_agg,
121            store,
122            key,
123            gas_object_id,
124            iota_address,
125            bridge_object_arg,
126            iota_token_type_tags,
127            bridge_pause_rx,
128            metrics,
129        }
130    }
131
132    fn run_inner(
133        self,
134    ) -> (
135        Vec<tokio::task::JoinHandle<()>>,
136        iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
137        iota_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
138    ) {
139        let key = self.key;
140
141        let (sender, receiver) = iota_metrics::metered_channel::channel(
142            CHANNEL_SIZE,
143            &iota_metrics::get_metrics()
144                .unwrap()
145                .channel_inflight
146                .with_label_values(&["executor_signing_queue"]),
147        );
148
149        let (execution_tx, execution_rx) = iota_metrics::metered_channel::channel(
150            CHANNEL_SIZE,
151            &iota_metrics::get_metrics()
152                .unwrap()
153                .channel_inflight
154                .with_label_values(&["executor_execution_queue"]),
155        );
156        let execution_tx_clone = execution_tx.clone();
157        let sender_clone = sender.clone();
158        let store_clone = self.store.clone();
159        let client_clone = self.iota_client.clone();
160        let mut tasks = vec![];
161        let metrics = self.metrics.clone();
162        tasks.push(spawn_logged_monitored_task!(
163            Self::run_signature_aggregation_loop(
164                client_clone,
165                self.bridge_auth_agg,
166                store_clone,
167                sender_clone,
168                receiver,
169                execution_tx_clone,
170                metrics,
171            )
172        ));
173
174        let metrics = self.metrics.clone();
175        let execution_tx_clone = execution_tx.clone();
176        tasks.push(spawn_logged_monitored_task!(
177            Self::run_onchain_execution_loop(
178                self.iota_client.clone(),
179                key,
180                self.iota_address,
181                self.gas_object_id,
182                self.store.clone(),
183                execution_tx_clone,
184                execution_rx,
185                self.bridge_object_arg,
186                self.iota_token_type_tags,
187                self.bridge_pause_rx,
188                metrics,
189            )
190        ));
191        (tasks, sender, execution_tx)
192    }
193
194    async fn run_signature_aggregation_loop(
195        iota_client: Arc<IotaClient<C>>,
196        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
197        store: Arc<BridgeOrchestratorTables>,
198        signing_queue_sender: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
199        mut signing_queue_receiver: iota_metrics::metered_channel::Receiver<
200            BridgeActionExecutionWrapper,
201        >,
202        execution_queue_sender: iota_metrics::metered_channel::Sender<
203            CertifiedBridgeActionExecutionWrapper,
204        >,
205        metrics: Arc<BridgeMetrics>,
206    ) {
207        info!("Starting run_signature_aggregation_loop");
208        let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
209        while let Some(action) = signing_queue_receiver.recv().await {
210            Self::handle_signing_task(
211                &semaphore,
212                &auth_agg,
213                &signing_queue_sender,
214                &execution_queue_sender,
215                &iota_client,
216                &store,
217                action,
218                &metrics,
219            )
220            .await;
221        }
222    }
223
224    async fn should_proceed_signing(iota_client: &Arc<IotaClient<C>>) -> bool {
225        let Ok(Ok(is_paused)) =
226            retry_with_max_elapsed_time!(iota_client.is_bridge_paused(), Duration::from_secs(600))
227        else {
228            error!("Failed to get bridge status after retry");
229            return false;
230        };
231        !is_paused
232    }
233
234    #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
235    async fn handle_signing_task(
236        semaphore: &Arc<Semaphore>,
237        auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
238        signing_queue_sender: &iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
239        execution_queue_sender: &iota_metrics::metered_channel::Sender<
240            CertifiedBridgeActionExecutionWrapper,
241        >,
242        iota_client: &Arc<IotaClient<C>>,
243        store: &Arc<BridgeOrchestratorTables>,
244        action: BridgeActionExecutionWrapper,
245        metrics: &Arc<BridgeMetrics>,
246    ) {
247        metrics.action_executor_signing_queue_received_actions.inc();
248        let action_key = action.0.key();
249        info!("Received action for signing: {:?}", action.0);
250
251        // TODO: this is a temporary fix to avoid signing when the bridge is paused.
252        // but the way is implemented is not ideal:
253        // 1. it should check the direction
254        // 2. should use a better mechanism to check the bridge status instead of
255        //    polling for each action
256        let should_proceed = Self::should_proceed_signing(iota_client).await;
257        if !should_proceed {
258            metrics.action_executor_signing_queue_skipped_actions.inc();
259            warn!("skipping signing task: {:?}", action_key);
260            return;
261        }
262
263        let auth_agg_clone = auth_agg.clone();
264        let signing_queue_sender_clone = signing_queue_sender.clone();
265        let execution_queue_sender_clone = execution_queue_sender.clone();
266        let iota_client_clone = iota_client.clone();
267        let store_clone = store.clone();
268        let metrics_clone = metrics.clone();
269        let semaphore_clone = semaphore.clone();
270        spawn_logged_monitored_task!(
271            Self::request_signatures(
272                semaphore_clone,
273                iota_client_clone,
274                auth_agg_clone,
275                action,
276                store_clone,
277                signing_queue_sender_clone,
278                execution_queue_sender_clone,
279                metrics_clone,
280            )
281            .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
282            "request_signatures"
283        );
284    }
285
286    // Checks if the action is already processed on chain.
287    // If yes, skip this action and remove it from the pending log.
288    // Returns true if the action is already processed.
289    async fn handle_already_processed_token_transfer_action_maybe(
290        iota_client: &Arc<IotaClient<C>>,
291        action: &BridgeAction,
292        store: &Arc<BridgeOrchestratorTables>,
293        metrics: &Arc<BridgeMetrics>,
294    ) -> bool {
295        let status = iota_client
296            .get_token_transfer_action_onchain_status_until_success(
297                action.chain_id() as u8,
298                action.seq_number(),
299            )
300            .await;
301        match status {
302            BridgeActionStatus::Approved | BridgeActionStatus::Claimed => {
303                info!(
304                    "Action already approved or claimed, removing action from pending logs: {:?}",
305                    action
306                );
307                metrics.action_executor_already_processed_actions.inc();
308                store
309                    .remove_pending_actions(&[action.digest()])
310                    .unwrap_or_else(|e| {
311                        panic!("Write to DB should not fail: {:?}", e);
312                    });
313                true
314            }
315            // Although theoretically a legit IotaToEthBridgeAction should not have
316            // status `NotFound`
317            BridgeActionStatus::Pending | BridgeActionStatus::NotFound => false,
318        }
319    }
320
321    // TODO: introduce a way to properly stagger the handling
322    // for various validators.
323    async fn request_signatures(
324        semaphore: Arc<Semaphore>,
325        iota_client: Arc<IotaClient<C>>,
326        auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
327        action: BridgeActionExecutionWrapper,
328        store: Arc<BridgeOrchestratorTables>,
329        signing_queue_sender: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
330        execution_queue_sender: iota_metrics::metered_channel::Sender<
331            CertifiedBridgeActionExecutionWrapper,
332        >,
333        metrics: Arc<BridgeMetrics>,
334    ) {
335        let _permit = semaphore
336            .acquire()
337            .await
338            .expect("semaphore should not be closed");
339        info!("requesting signatures");
340        let BridgeActionExecutionWrapper(action, attempt_times) = action;
341
342        // Only token transfer action should reach here
343        match &action {
344            BridgeAction::IotaToEthBridgeAction(_) | BridgeAction::EthToIotaBridgeAction(_) => (),
345            _ => unreachable!("Non token transfer action should not reach here"),
346        };
347
348        // If the action is already processed, skip it.
349        if Self::handle_already_processed_token_transfer_action_maybe(
350            &iota_client,
351            &action,
352            &store,
353            &metrics,
354        )
355        .await
356        {
357            return;
358        }
359        match auth_agg
360            .load()
361            .request_committee_signatures(action.clone())
362            .await
363        {
364            Ok(certificate) => {
365                info!("Sending certificate to execution");
366                execution_queue_sender
367                    .send(CertifiedBridgeActionExecutionWrapper(certificate, 0))
368                    .await
369                    .unwrap_or_else(|e| {
370                        panic!("Sending to execution queue should not fail: {:?}", e);
371                    });
372            }
373            Err(e) => {
374                warn!("Failed to collect sigs for bridge action: {:?}", e);
375                metrics.err_signature_aggregation.inc();
376
377                // TODO: spawn a task for this
378                if attempt_times >= MAX_SIGNING_ATTEMPTS {
379                    error!(
380                        "Manual intervention is required. Failed to collect sigs for bridge action after {MAX_SIGNING_ATTEMPTS} attempts: {:?}",
381                        e
382                    );
383                    return;
384                }
385                delay(attempt_times).await;
386                signing_queue_sender
387                    .send(BridgeActionExecutionWrapper(action, attempt_times + 1))
388                    .await
389                    .unwrap_or_else(|e| {
390                        panic!("Sending to signing queue should not fail: {:?}", e);
391                    });
392            }
393        }
394    }
395
396    // Before calling this function, `key` and `iota_address` need to be
397    // verified to match.
398    async fn run_onchain_execution_loop(
399        iota_client: Arc<IotaClient<C>>,
400        iota_key: IotaKeyPair,
401        iota_address: IotaAddress,
402        gas_object_id: ObjectID,
403        store: Arc<BridgeOrchestratorTables>,
404        execution_queue_sender: iota_metrics::metered_channel::Sender<
405            CertifiedBridgeActionExecutionWrapper,
406        >,
407        mut execution_queue_receiver: iota_metrics::metered_channel::Receiver<
408            CertifiedBridgeActionExecutionWrapper,
409        >,
410        bridge_object_arg: ObjectArg,
411        iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
412        bridge_pause_rx: tokio::sync::watch::Receiver<IsBridgePaused>,
413        metrics: Arc<BridgeMetrics>,
414    ) {
415        info!("Starting run_onchain_execution_loop");
416        while let Some(certificate_wrapper) = execution_queue_receiver.recv().await {
417            // When bridge is paused, skip execution.
418            // Skipped actions will be picked up upon node restarting
419            // if bridge is unpaused.
420            if *bridge_pause_rx.borrow() {
421                warn!("Bridge is paused, skipping execution");
422                metrics
423                    .action_executor_execution_queue_skipped_actions_due_to_pausing
424                    .inc();
425                continue;
426            }
427            Self::handle_execution_task(
428                certificate_wrapper,
429                &iota_client,
430                &iota_key,
431                &iota_address,
432                gas_object_id,
433                &store,
434                &execution_queue_sender,
435                &bridge_object_arg,
436                &iota_token_type_tags,
437                &metrics,
438            )
439            .await;
440        }
441        panic!("Execution queue closed unexpectedly");
442    }
443
444    #[instrument(level = "error", skip_all, fields(action_key=?certificate_wrapper.0.data().key(), attempt_times=?certificate_wrapper.1))]
445    async fn handle_execution_task(
446        certificate_wrapper: CertifiedBridgeActionExecutionWrapper,
447        iota_client: &Arc<IotaClient<C>>,
448        iota_key: &IotaKeyPair,
449        iota_address: &IotaAddress,
450        gas_object_id: ObjectID,
451        store: &Arc<BridgeOrchestratorTables>,
452        execution_queue_sender: &iota_metrics::metered_channel::Sender<
453            CertifiedBridgeActionExecutionWrapper,
454        >,
455        bridge_object_arg: &ObjectArg,
456        iota_token_type_tags: &ArcSwap<HashMap<u8, TypeTag>>,
457        metrics: &Arc<BridgeMetrics>,
458    ) {
459        metrics
460            .action_executor_execution_queue_received_actions
461            .inc();
462        let CertifiedBridgeActionExecutionWrapper(certificate, attempt_times) = certificate_wrapper;
463        let action = certificate.data();
464        let action_key = action.key();
465
466        info!("Received certified action for execution: {:?}", action);
467
468        // TODO check gas coin balance here. If gas balance too low, do not proceed.
469        let (gas_coin, gas_object_ref) =
470            Self::get_gas_data_assert_ownership(*iota_address, gas_object_id, iota_client).await;
471        metrics.gas_coin_balance.set(gas_coin.value() as i64);
472
473        let ceriticate_clone = certificate.clone();
474
475        // Check once: if the action is already processed, skip it.
476        if Self::handle_already_processed_token_transfer_action_maybe(
477            iota_client,
478            action,
479            store,
480            metrics,
481        )
482        .await
483        {
484            info!("Action already processed, skipping");
485            return;
486        }
487
488        info!("Building IOTA transaction");
489        let rgp = iota_client.get_reference_gas_price_until_success().await;
490        let tx_data = match build_iota_transaction(
491            *iota_address,
492            &gas_object_ref,
493            ceriticate_clone,
494            *bridge_object_arg,
495            iota_token_type_tags.load().as_ref(),
496            rgp,
497        ) {
498            Ok(tx_data) => tx_data,
499            Err(err) => {
500                metrics.err_build_iota_transaction.inc();
501                error!(
502                    "Manual intervention is required. Failed to build transaction for action {:?}: {:?}",
503                    action, err
504                );
505                // This should not happen, but in case it does, we do not want to
506                // panic, instead we log here for manual intervention.
507                return;
508            }
509        };
510        let sig = Signature::new_secure(
511            &IntentMessage::new(Intent::iota_transaction(), &tx_data),
512            iota_key,
513        );
514        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
515        let tx_digest = *signed_tx.digest();
516
517        // Check twice: If the action is already processed, skip it.
518        if Self::handle_already_processed_token_transfer_action_maybe(
519            iota_client,
520            action,
521            store,
522            metrics,
523        )
524        .await
525        {
526            info!("Action already processed, skipping");
527            return;
528        }
529
530        info!(?tx_digest, ?gas_object_ref, "Sending transaction to IOTA");
531        match iota_client
532            .execute_transaction_block_with_effects(signed_tx)
533            .await
534        {
535            Ok(resp) => {
536                Self::handle_execution_effects(tx_digest, resp, store, action, metrics).await
537            }
538
539            // If the transaction did not go through, retry up to a certain times.
540            Err(err) => {
541                error!(
542                    ?action_key,
543                    ?tx_digest,
544                    "IOTA transaction failed at signing: {err:?}"
545                );
546                metrics.err_iota_transaction_submission.inc();
547                let metrics_clone = metrics.clone();
548                // Do this in a separate task so we won't deadlock here
549                let sender_clone = execution_queue_sender.clone();
550                spawn_logged_monitored_task!(async move {
551                    // If it fails for too many times, log and ask for manual intervention.
552                    metrics_clone
553                        .err_iota_transaction_submission_too_many_failures
554                        .inc();
555                    if attempt_times >= MAX_EXECUTION_ATTEMPTS {
556                        error!("Manual intervention is required. Failed to collect execute transaction for bridge action after {MAX_EXECUTION_ATTEMPTS} attempts: {:?}", err);
557                        return;
558                    }
559                    delay(attempt_times).await;
560                    sender_clone
561                        .send(CertifiedBridgeActionExecutionWrapper(
562                            certificate,
563                            attempt_times + 1,
564                        ))
565                        .await
566                        .unwrap_or_else(|e| {
567                            panic!("Sending to execution queue should not fail: {:?}", e);
568                        });
569                    info!("Re-enqueued certificate for execution");
570                }.instrument(tracing::debug_span!("reenqueue_execution_task", action_key=?action_key)));
571            }
572        }
573    }
574
575    // TODO: do we need a mechanism to periodically read pending actions from DB?
576    async fn handle_execution_effects(
577        tx_digest: TransactionDigest,
578        response: IotaTransactionBlockResponse,
579        store: &Arc<BridgeOrchestratorTables>,
580        action: &BridgeAction,
581        metrics: &Arc<BridgeMetrics>,
582    ) {
583        let effects = response
584            .effects
585            .clone()
586            .expect("We requested effects but got None.");
587        let status = effects.status();
588        match status {
589            IotaExecutionStatus::Success => {
590                let events = response.events.expect("We requested events but got None.");
591                // If the transaction is successful, there must be either
592                // TokenTransferAlreadyClaimed or TokenTransferClaimed event.
593                assert!(
594                    events.data.iter().any(|e| e.type_
595                        == *TokenTransferAlreadyClaimed.get().unwrap()
596                        || e.type_ == *TokenTransferClaimed.get().unwrap()
597                        || e.type_ == *TokenTransferApproved.get().unwrap()
598                        || e.type_ == *TokenTransferAlreadyApproved.get().unwrap()),
599                    "Expected TokenTransferAlreadyClaimed, TokenTransferClaimed, TokenTransferApproved or TokenTransferAlreadyApproved event but got: {:?}",
600                    events,
601                );
602                info!(?tx_digest, "IOTA transaction executed successfully");
603                store
604                    .remove_pending_actions(&[action.digest()])
605                    .unwrap_or_else(|e| {
606                        panic!("Write to DB should not fail: {:?}", e);
607                    })
608            }
609            IotaExecutionStatus::Failure { error } => {
610                // In practice the transaction could fail because of running out of gas, but
611                // really should not be due to other reasons.
612                // This means manual intervention is needed. So we do not push them back to
613                // the execution queue because retries are mostly likely going to fail anyway.
614                // After human examination, the node should be restarted and fetch them from
615                // WAL.
616
617                metrics.err_iota_transaction_execution.inc();
618                error!(
619                    ?tx_digest,
620                    "Manual intervention is needed. IOTA transaction executed and failed with error: {error:?}"
621                );
622            }
623        }
624    }
625
626    /// Panics if the gas object is not owned by the address.
627    async fn get_gas_data_assert_ownership(
628        iota_address: IotaAddress,
629        gas_object_id: ObjectID,
630        iota_client: &IotaClient<C>,
631    ) -> (GasCoin, ObjectRef) {
632        let (gas_coin, gas_obj_ref, owner) = iota_client
633            .get_gas_data_panic_if_not_gas(gas_object_id)
634            .await;
635
636        // TODO: when we add multiple gas support in the future we could discard
637        // transferred gas object instead.
638        assert_eq!(
639            owner,
640            Owner::AddressOwner(iota_address),
641            "Gas object {:?} is no longer owned by address {}",
642            gas_object_id,
643            iota_address
644        );
645        (gas_coin, gas_obj_ref)
646    }
647}
648
649pub async fn submit_to_executor(
650    tx: &iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
651    action: BridgeAction,
652) -> Result<(), BridgeError> {
653    tx.send(BridgeActionExecutionWrapper(action, 0))
654        .await
655        .map_err(|e| BridgeError::Generic(e.to_string()))
656}
657
658#[cfg(test)]
659mod tests {
660    use std::{
661        collections::{BTreeMap, HashMap},
662        str::FromStr,
663    };
664
665    use fastcrypto::traits::KeyPair;
666    use iota_json_rpc_types::{
667        IotaEvent, IotaTransactionBlockEffects, IotaTransactionBlockEvents,
668        IotaTransactionBlockResponse,
669    };
670    use iota_types::{
671        TypeTag, base_types::random_object_ref, crypto::get_key_pair, gas_coin::GasCoin,
672        transaction::TransactionData,
673    };
674    use prometheus::Registry;
675
676    use super::*;
677    use crate::{
678        crypto::{
679            BridgeAuthorityKeyPair, BridgeAuthorityPublicKeyBytes,
680            BridgeAuthorityRecoverableSignature,
681        },
682        events::init_all_struct_tags,
683        iota_mock_client::IotaMockClient,
684        server::mock_handler::BridgeRequestMockHandler,
685        test_utils::{
686            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG, get_test_authorities_and_run_mock_bridge_server,
687            get_test_eth_to_iota_bridge_action, get_test_iota_to_eth_bridge_action,
688            sign_action_with_key,
689        },
690        types::{
691            BRIDGE_PAUSED, BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction,
692        },
693    };
694
695    #[tokio::test]
696    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
697    async fn test_onchain_execution_loop() {
698        let SetupData {
699            signing_tx,
700            iota_client_mock,
701            mut tx_subscription,
702            store,
703            secrets,
704            dummy_iota_key,
705            mock0,
706            mock1,
707            mock2,
708            mock3,
709            gas_object_ref,
710            iota_address,
711            iota_token_type_tags,
712            ..
713        } = setup().await;
714        let (action_certificate, _, _) = get_bridge_authority_approved_action(
715            vec![&mock0, &mock1, &mock2, &mock3],
716            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
717            None,
718            true,
719        );
720        let action = action_certificate.data().clone();
721        let id_token_map = (*iota_token_type_tags.load().clone()).clone();
722        let tx_data = build_iota_transaction(
723            iota_address,
724            &gas_object_ref,
725            action_certificate,
726            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
727            &id_token_map,
728            1000,
729        )
730        .unwrap();
731
732        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
733
734        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
735        iota_client_mock.add_gas_object_info(
736            gas_coin.clone(),
737            gas_object_ref,
738            Owner::AddressOwner(iota_address),
739        );
740
741        // Mock the transaction to be successfully executed
742        let mut event = IotaEvent::random_for_testing();
743        event.type_ = TokenTransferClaimed.get().unwrap().clone();
744        let events = vec![event];
745        mock_transaction_response(
746            &iota_client_mock,
747            tx_digest,
748            IotaExecutionStatus::Success,
749            Some(events),
750            true,
751        );
752
753        store.insert_pending_actions(&[action.clone()]).unwrap();
754        assert_eq!(
755            store.get_all_pending_actions()[&action.digest()],
756            action.clone()
757        );
758
759        // Kick it
760        submit_to_executor(&signing_tx, action.clone())
761            .await
762            .unwrap();
763
764        // Expect to see the transaction to be requested and successfully executed hence
765        // removed from WAL
766        tx_subscription.recv().await.unwrap();
767        assert!(store.get_all_pending_actions().is_empty());
768
769        /////////////////////////////////////////////////////////////////////////////////////////////////
770        ////////////////////////////////////// Test execution failure
771        ////////////////////////////////////// ///////////////////////////////////
772        ////////////////////////////////////// /////////////////////////////////////////
773        ////////////////////////////////////// //////////////////
774
775        let (action_certificate, _, _) = get_bridge_authority_approved_action(
776            vec![&mock0, &mock1, &mock2, &mock3],
777            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
778            None,
779            true,
780        );
781
782        let action = action_certificate.data().clone();
783
784        let tx_data = build_iota_transaction(
785            iota_address,
786            &gas_object_ref,
787            action_certificate,
788            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
789            &id_token_map,
790            1000,
791        )
792        .unwrap();
793        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
794
795        // Mock the transaction to fail
796        mock_transaction_response(
797            &iota_client_mock,
798            tx_digest,
799            IotaExecutionStatus::Failure {
800                error: "failure is mother of success".to_string(),
801            },
802            None,
803            true,
804        );
805
806        store.insert_pending_actions(&[action.clone()]).unwrap();
807        assert_eq!(
808            store.get_all_pending_actions()[&action.digest()],
809            action.clone()
810        );
811
812        // Kick it
813        submit_to_executor(&signing_tx, action.clone())
814            .await
815            .unwrap();
816
        // Expect to see the transaction being requested, but failing
818        tx_subscription.recv().await.unwrap();
819        // The action is not removed from WAL because the transaction failed
820        assert_eq!(
821            store.get_all_pending_actions()[&action.digest()],
822            action.clone()
823        );
824
        /////////////////////////////////////////////////////////////////////////////////////////////////
        // Test transaction failed at signing stage
        /////////////////////////////////////////////////////////////////////////////////////////////////
830
831        let (action_certificate, _, _) = get_bridge_authority_approved_action(
832            vec![&mock0, &mock1, &mock2, &mock3],
833            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
834            None,
835            true,
836        );
837
838        let action = action_certificate.data().clone();
839
840        let tx_data = build_iota_transaction(
841            iota_address,
842            &gas_object_ref,
843            action_certificate,
844            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
845            &id_token_map,
846            1000,
847        )
848        .unwrap();
849        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
850        mock_transaction_error(
851            &iota_client_mock,
852            tx_digest,
853            BridgeError::Generic("some random error".to_string()),
854            true,
855        );
856
857        store.insert_pending_actions(&[action.clone()]).unwrap();
858        assert_eq!(
859            store.get_all_pending_actions()[&action.digest()],
860            action.clone()
861        );
862
863        // Kick it
864        submit_to_executor(&signing_tx, action.clone())
865            .await
866            .unwrap();
867
868        // Failure will trigger retry, we wait for 2 requests before checking WAL log
869        let tx_digest = tx_subscription.recv().await.unwrap();
870        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
871
872        // The retry is still going on, action still in WAL
873        assert!(
874            store
875                .get_all_pending_actions()
876                .contains_key(&action.digest())
877        );
878
879        // Now let it succeed
880        let mut event = IotaEvent::random_for_testing();
881        event.type_ = TokenTransferClaimed.get().unwrap().clone();
882        let events = vec![event];
883        mock_transaction_response(
884            &iota_client_mock,
885            tx_digest,
886            IotaExecutionStatus::Success,
887            Some(events),
888            true,
889        );
890
891        // Give it 1 second to retry and succeed
892        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
893        // The action is successful and should be removed from WAL now
894        assert!(
895            !store
896                .get_all_pending_actions()
897                .contains_key(&action.digest())
898        );
899    }
900
901    #[tokio::test]
902    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
903    async fn test_signature_aggregation_loop() {
904        let SetupData {
905            signing_tx,
906            iota_client_mock,
907            mut tx_subscription,
908            store,
909            secrets,
910            dummy_iota_key,
911            mock0,
912            mock1,
913            mock2,
914            mock3,
915            gas_object_ref,
916            iota_address,
917            iota_token_type_tags,
918            ..
919        } = setup().await;
920        let id_token_map = (*iota_token_type_tags.load().clone()).clone();
921        let (action_certificate, iota_tx_digest, iota_tx_event_index) =
922            get_bridge_authority_approved_action(
923                vec![&mock0, &mock1, &mock2, &mock3],
924                vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
925                None,
926                true,
927            );
928        let action = action_certificate.data().clone();
929        mock_bridge_authority_signing_errors(
930            vec![&mock0, &mock1, &mock2],
931            iota_tx_digest,
932            iota_tx_event_index,
933        );
934        let mut sigs = mock_bridge_authority_sigs(
935            vec![&mock3],
936            &action,
937            vec![&secrets[3]],
938            iota_tx_digest,
939            iota_tx_event_index,
940        );
941
942        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
943        iota_client_mock.add_gas_object_info(
944            gas_coin,
945            gas_object_ref,
946            Owner::AddressOwner(iota_address),
947        );
948        store.insert_pending_actions(&[action.clone()]).unwrap();
949        assert_eq!(
950            store.get_all_pending_actions()[&action.digest()],
951            action.clone()
952        );
953
954        // Kick it
955        submit_to_executor(&signing_tx, action.clone())
956            .await
957            .unwrap();
958
959        // Wait until the transaction is retried at least once (instead of deing
960        // dropped)
961        loop {
962            let requested_times =
963                mock0.get_iota_token_events_requested(iota_tx_digest, iota_tx_event_index);
964            if requested_times >= 2 {
965                break;
966            }
967            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
968        }
969        // Nothing is sent to execute yet
970        assert_eq!(
971            tx_subscription.try_recv().unwrap_err(),
972            tokio::sync::broadcast::error::TryRecvError::Empty
973        );
974        // Still in WAL
975        assert_eq!(
976            store.get_all_pending_actions()[&action.digest()],
977            action.clone()
978        );
979
980        // Let authorities sign the action too. Now we are above the threshold
981        let sig_from_2 = mock_bridge_authority_sigs(
982            vec![&mock2],
983            &action,
984            vec![&secrets[2]],
985            iota_tx_digest,
986            iota_tx_event_index,
987        );
988        sigs.extend(sig_from_2);
989        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
990            action.clone(),
991            BridgeCommitteeValiditySignInfo { signatures: sigs },
992        );
993        let action_certificate = VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
994        let tx_data = build_iota_transaction(
995            iota_address,
996            &gas_object_ref,
997            action_certificate,
998            DUMMY_MUTABLE_BRIDGE_OBJECT_ARG,
999            &id_token_map,
1000            1000,
1001        )
1002        .unwrap();
1003        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
1004
1005        let mut event = IotaEvent::random_for_testing();
1006        event.type_ = TokenTransferClaimed.get().unwrap().clone();
1007        let events = vec![event];
1008        mock_transaction_response(
1009            &iota_client_mock,
1010            tx_digest,
1011            IotaExecutionStatus::Success,
1012            Some(events),
1013            true,
1014        );
1015
1016        // Expect to see the transaction to be requested and succeed
1017        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
1018        // The action is removed from WAL
1019        assert!(
1020            !store
1021                .get_all_pending_actions()
1022                .contains_key(&action.digest())
1023        );
1024    }
1025
1026    #[tokio::test]
1027    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
1028    async fn test_skip_request_signature_if_already_processed_on_chain() {
1029        let SetupData {
1030            signing_tx,
1031            iota_client_mock,
1032            mut tx_subscription,
1033            store,
1034            mock0,
1035            mock1,
1036            mock2,
1037            mock3,
1038            ..
1039        } = setup().await;
1040
1041        let iota_tx_digest = TransactionDigest::random();
1042        let iota_tx_event_index = 0;
1043        let action = get_test_iota_to_eth_bridge_action(
1044            Some(iota_tx_digest),
1045            Some(iota_tx_event_index),
1046            None,
1047            None,
1048            None,
1049            None,
1050            None,
1051        );
1052        mock_bridge_authority_signing_errors(
1053            vec![&mock0, &mock1, &mock2, &mock3],
1054            iota_tx_digest,
1055            iota_tx_event_index,
1056        );
1057        store.insert_pending_actions(&[action.clone()]).unwrap();
1058        assert_eq!(
1059            store.get_all_pending_actions()[&action.digest()],
1060            action.clone()
1061        );
1062
1063        // Kick it
1064        submit_to_executor(&signing_tx, action.clone())
1065            .await
1066            .unwrap();
1067        let action_digest = action.digest();
1068
1069        // Wait for 1 second. It should still in the process of retrying requesting sigs
1070        // because we mock errors above.
1071        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1072        tx_subscription.try_recv().unwrap_err();
1073        // And the action is still in WAL
1074        assert!(store.get_all_pending_actions().contains_key(&action_digest));
1075
1076        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);
1077
1078        // The next retry will see the action is already processed on chain and remove
1079        // it from WAL
1080        let now = std::time::Instant::now();
1081        while store.get_all_pending_actions().contains_key(&action_digest) {
1082            if now.elapsed().as_secs() > 10 {
1083                panic!("Timeout waiting for action to be removed from WAL");
1084            }
1085            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1086        }
1087        tx_subscription.try_recv().unwrap_err();
1088    }
1089
    /// Checks that the execution loop stops submitting a certified action, and
    /// removes it from the WAL, once the action is reported as approved on
    /// chain.
    ///
    /// The certificate is sent straight to the execution queue. Every
    /// transaction request is mocked to fail, so the action can only leave the
    /// WAL after its on-chain status is switched to
    /// `BridgeActionStatus::Approved`.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_skip_tx_submission_if_already_processed_on_chain() {
        let SetupData {
            execution_tx,
            iota_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            ..
        } = setup().await;
        let id_token_map = (*iota_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTABLE_BRIDGE_OBJECT_ARG;
        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
        // Every transaction request fails (wildcard), regardless of its digest
        mock_transaction_error(
            &iota_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
        iota_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(iota_address),
        );

        // The action starts out as not yet processed on chain
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Kick it (send to the execution queue, skipping the signing queue)
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
            .await
            .unwrap();

        // Some requests come in and will fail.
        tx_subscription.recv().await.unwrap();

        // Set the action to be already approved on chain
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Approved);

        // The next retry will see the action is already processed on chain and remove
        // it from WAL. Poll for at most ~10 seconds before giving up.
        let now = std::time::Instant::now();
        let action_digest = action.digest();
        while store.get_all_pending_actions().contains_key(&action_digest) {
            if now.elapsed().as_secs() > 10 {
                panic!("Timeout waiting for action to be removed from WAL");
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    }
1174
    /// Checks that no transaction is submitted while the bridge is paused.
    ///
    /// A certified action is first sent to the execution queue with the bridge
    /// unpaused, which results in a (failing) transaction request. The bridge
    /// is then paused through `bridge_pause_tx` and the same certificate is
    /// sent again: within one second no further transaction may be requested
    /// and the action must remain in the WAL.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_skip_tx_submission_if_bridge_is_paused() {
        let SetupData {
            execution_tx,
            iota_client_mock,
            mut tx_subscription,
            store,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            bridge_pause_tx,
            ..
        } = setup().await;
        let id_token_map: HashMap<u8, TypeTag> = (*iota_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            None,
            true,
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTABLE_BRIDGE_OBJECT_ARG;
        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &id_token_map,
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
        // Every transaction request fails (wildcard), regardless of its digest
        mock_transaction_error(
            &iota_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
        iota_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(iota_address),
        );
        let action_digest = action.digest();
        // The action is not yet processed on chain
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        // assert bridge is unpaused now
        assert!(!*bridge_pause_tx.borrow());

        store.insert_pending_actions(&[action.clone()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions()[&action.digest()],
            action.clone()
        );

        // Kick it (send to the execution queue, skipping the signing queue)
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(
                action_certificate.clone(),
                0,
            ))
            .await
            .unwrap();

        // Some requests come in
        tx_subscription.recv().await.unwrap();

        // Pause the bridge
        bridge_pause_tx.send(BRIDGE_PAUSED).unwrap();

        // Kick it again
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(action_certificate, 0))
            .await
            .unwrap();

        // Give the executor one second to (not) react to the second submission
        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
        // Nothing is sent to execute
        assert_eq!(
            tx_subscription.try_recv().unwrap_err(),
            tokio::sync::broadcast::error::TryRecvError::Empty
        );
        // Still in WAL
        assert_eq!(
            store.get_all_pending_actions()[&action_digest],
            action.clone()
        );
    }
1273
    /// Checks that an action referencing an unknown token id is not executed
    /// until that token id appears in the shared token type-tag map.
    ///
    /// The expected transaction digest is computed with a map that contains
    /// only the new token. The first submission happens while the shared map
    /// lacks the token, so nothing may be requested. After the shared map is
    /// swapped for one that includes it, the second submission must produce a
    /// transaction request with the expected digest.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_action_executor_handle_new_token() {
        let new_token_id = 255u8; // token id that does not exist
        let new_type_tag = TypeTag::from_str("0xbeef::beef::BEEF").unwrap();
        let SetupData {
            execution_tx,
            iota_client_mock,
            mut tx_subscription,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            ..
        } = setup().await;
        // Local copy of the shared map; extended and stored back further below
        let mut id_token_map: HashMap<u8, TypeTag> = (*iota_token_type_tags.load().clone()).clone();
        let (action_certificate, _, _) = get_bridge_authority_approved_action(
            vec![&mock0, &mock1, &mock2, &mock3],
            vec![&secrets[0], &secrets[1], &secrets[2], &secrets[3]],
            Some(new_token_id),
            false, /* we need an eth -> iota action that entails the new token type tag in
                    * transaction building */
        );

        let action = action_certificate.data().clone();
        let arg = DUMMY_MUTABLE_BRIDGE_OBJECT_ARG;
        // Build the expected transaction with a map that knows only the new token
        let tx_data = build_iota_transaction(
            iota_address,
            &gas_object_ref,
            action_certificate.clone(),
            arg,
            &maplit::hashmap! {
                new_token_id => new_type_tag.clone()
            },
            1000,
        )
        .unwrap();
        let tx_digest = get_tx_digest(tx_data, &dummy_iota_key);
        // Every transaction request fails (wildcard); the test only observes
        // whether a request is made
        mock_transaction_error(
            &iota_client_mock,
            tx_digest,
            BridgeError::Generic("some random error".to_string()),
            true,
        );

        let gas_coin = GasCoin::new_for_testing(1_000_000_000_000); // dummy gas coin
        iota_client_mock.add_gas_object_info(
            gas_coin.clone(),
            gas_object_ref,
            Owner::AddressOwner(iota_address),
        );
        iota_client_mock.set_action_onchain_status(&action, BridgeActionStatus::Pending);

        // Kick it (send to the execution queue, skipping the signing queue)
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(
                action_certificate.clone(),
                0,
            ))
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
        // Nothing is sent to execute, because the token id does not exist yet
        assert_eq!(
            tx_subscription.try_recv().unwrap_err(),
            tokio::sync::broadcast::error::TryRecvError::Empty
        );

        // Now insert the new token id and publish the updated map through the
        // shared `ArcSwap`
        id_token_map.insert(new_token_id, new_type_tag);
        iota_token_type_tags.store(Arc::new(id_token_map));

        // Kick it again
        execution_tx
            .send(CertifiedBridgeActionExecutionWrapper(
                action_certificate.clone(),
                0,
            ))
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
        // The action is sent to execution
        assert_eq!(tx_subscription.recv().await.unwrap(), tx_digest);
    }
1365
1366    fn mock_bridge_authority_sigs(
1367        mocks: Vec<&BridgeRequestMockHandler>,
1368        action: &BridgeAction,
1369        secrets: Vec<&BridgeAuthorityKeyPair>,
1370        iota_tx_digest: TransactionDigest,
1371        iota_tx_event_index: u16,
1372    ) -> BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthorityRecoverableSignature> {
1373        assert_eq!(mocks.len(), secrets.len());
1374        let mut signed_actions = BTreeMap::new();
1375        for (mock, secret) in mocks.iter().zip(secrets.iter()) {
1376            let signed_action = sign_action_with_key(action, secret);
1377            mock.add_iota_event_response(
1378                iota_tx_digest,
1379                iota_tx_event_index,
1380                Ok(signed_action.clone()),
1381            );
1382            signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
1383        }
1384        signed_actions
1385    }
1386
1387    fn mock_bridge_authority_signing_errors(
1388        mocks: Vec<&BridgeRequestMockHandler>,
1389        iota_tx_digest: TransactionDigest,
1390        iota_tx_event_index: u16,
1391    ) {
1392        for mock in mocks {
1393            mock.add_iota_event_response(
1394                iota_tx_digest,
1395                iota_tx_event_index,
1396                Err(BridgeError::RestAPI("small issue".into())),
1397            );
1398        }
1399    }
1400
1401    /// Create a BridgeAction and mock authorities to return signatures
1402    fn get_bridge_authority_approved_action(
1403        mocks: Vec<&BridgeRequestMockHandler>,
1404        secrets: Vec<&BridgeAuthorityKeyPair>,
1405        token_id: Option<u8>,
1406        iota_to_eth: bool,
1407    ) -> (VerifiedCertifiedBridgeAction, TransactionDigest, u16) {
1408        let iota_tx_digest = TransactionDigest::random();
1409        let iota_tx_event_index = 1;
1410        let action = if iota_to_eth {
1411            get_test_iota_to_eth_bridge_action(
1412                Some(iota_tx_digest),
1413                Some(iota_tx_event_index),
1414                None,
1415                None,
1416                None,
1417                None,
1418                token_id,
1419            )
1420        } else {
1421            get_test_eth_to_iota_bridge_action(None, None, None, token_id)
1422        };
1423
1424        let sigs = mock_bridge_authority_sigs(
1425            mocks,
1426            &action,
1427            secrets,
1428            iota_tx_digest,
1429            iota_tx_event_index,
1430        );
1431        let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1432            action,
1433            BridgeCommitteeValiditySignInfo { signatures: sigs },
1434        );
1435        (
1436            VerifiedCertifiedBridgeAction::new_from_verified(certified_action),
1437            iota_tx_digest,
1438            iota_tx_event_index,
1439        )
1440    }
1441
1442    fn get_tx_digest(tx_data: TransactionData, dummy_iota_key: &IotaKeyPair) -> TransactionDigest {
1443        let sig = Signature::new_secure(
1444            &IntentMessage::new(Intent::iota_transaction(), &tx_data),
1445            dummy_iota_key,
1446        );
1447        let signed_tx = Transaction::from_data(tx_data, vec![sig]);
1448        *signed_tx.digest()
1449    }
1450
1451    /// Why is `wildcard` needed? This is because authority signatures
1452    /// are part of transaction data. Depending on whose signatures
1453    /// are included in what order, this may change the tx digest.
1454    fn mock_transaction_response(
1455        iota_client_mock: &IotaMockClient,
1456        tx_digest: TransactionDigest,
1457        status: IotaExecutionStatus,
1458        events: Option<Vec<IotaEvent>>,
1459        wildcard: bool,
1460    ) {
1461        let mut response = IotaTransactionBlockResponse::new(tx_digest);
1462        let effects = IotaTransactionBlockEffects::new_for_testing(tx_digest, status);
1463        if let Some(events) = events {
1464            response.events = Some(IotaTransactionBlockEvents { data: events });
1465        }
1466        response.effects = Some(effects);
1467        if wildcard {
1468            iota_client_mock.set_wildcard_transaction_response(Ok(response));
1469        } else {
1470            iota_client_mock.add_transaction_response(tx_digest, Ok(response));
1471        }
1472    }
1473
1474    fn mock_transaction_error(
1475        iota_client_mock: &IotaMockClient,
1476        tx_digest: TransactionDigest,
1477        error: BridgeError,
1478        wildcard: bool,
1479    ) {
1480        if wildcard {
1481            iota_client_mock.set_wildcard_transaction_response(Err(error));
1482        } else {
1483            iota_client_mock.add_transaction_response(tx_digest, Err(error));
1484        }
1485    }
1486
    /// Everything produced by `setup()` that the tests need to drive and
    /// observe a running `BridgeActionExecutor`.
    struct SetupData {
        /// Sender of the executor's signing queue, as returned by `run_inner`.
        signing_tx: iota_metrics::metered_channel::Sender<BridgeActionExecutionWrapper>,
        /// Sender of the executor's execution queue, as returned by `run_inner`.
        execution_tx: iota_metrics::metered_channel::Sender<CertifiedBridgeActionExecutionWrapper>,
        /// Mock client backing the `IotaClient` handed to the executor.
        iota_client_mock: IotaMockClient,
        /// Subscription to the digests of transactions requested from
        /// `iota_client_mock`.
        tx_subscription: tokio::sync::broadcast::Receiver<TransactionDigest>,
        /// Orchestrator tables shared with the executor (the pending-action WAL).
        store: Arc<BridgeOrchestratorTables>,
        /// Key pairs of the four test authorities.
        secrets: Vec<BridgeAuthorityKeyPair>,
        /// Key used by tests only to compute expected transaction digests.
        dummy_iota_key: IotaKeyPair,
        /// Request handler of the first mock bridge server.
        mock0: BridgeRequestMockHandler,
        /// Request handler of the second mock bridge server.
        mock1: BridgeRequestMockHandler,
        /// Request handler of the third mock bridge server.
        mock2: BridgeRequestMockHandler,
        /// Request handler of the fourth mock bridge server.
        mock3: BridgeRequestMockHandler,
        /// Join handles of the mock bridge servers and the executor tasks; never
        /// read by the tests.
        #[expect(unused)]
        handles: Vec<tokio::task::JoinHandle<()>>,
        /// Reference of the gas object whose id was given to the executor.
        gas_object_ref: ObjectRef,
        /// Address the executor was created with.
        iota_address: IotaAddress,
        /// Token id -> type tag map shared with the executor.
        iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
        /// Sender side of the pause flag watched by the executor; starts as
        /// `false`.
        bridge_pause_tx: tokio::sync::watch::Sender<IsBridgePaused>,
    }
1506
    /// Builds the complete test fixture: a mock IOTA client, four mock bridge
    /// authorities with their servers, a store in a temporary directory, and a
    /// `BridgeActionExecutor` started via `run_inner`.
    ///
    /// Returns the handles and channels the tests use to feed actions into the
    /// executor and to observe what it does.
    async fn setup() -> SetupData {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);
        init_all_struct_tags();

        // Key and address the executor acts with
        let (iota_address, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
        let iota_key = IotaKeyPair::from(kp);
        let gas_object_ref = random_object_ref();
        // NOTE(review): `temp_dir` is a local and is dropped when `setup` returns;
        // `tempfile::TempDir` removes its directory on drop, so the directory
        // backing `store` may disappear while the store is still in use - confirm
        // this is acceptable.
        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());
        let iota_client_mock = IotaMockClient::default();
        // Subscribe before the executor starts so no requested transaction is missed
        let tx_subscription = iota_client_mock.subscribe_to_requested_transactions();
        let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));

        // The dummy key is used to sign transaction so we can get TransactionDigest
        // easily. User signature is not part of the transaction so it does not
        // matter which key it is.
        let (_, dummy_kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
        let dummy_iota_key = IotaKeyPair::from(dummy_kp);

        let mock0 = BridgeRequestMockHandler::new();
        let mock1 = BridgeRequestMockHandler::new();
        let mock2 = BridgeRequestMockHandler::new();
        let mock3 = BridgeRequestMockHandler::new();

        // Four authorities, one per mock handler; the `2500` values are presumably
        // the per-authority voting power - confirm against the helper.
        let (mut handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
            vec![2500, 2500, 2500, 2500],
            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
        );

        let committee = BridgeCommittee::new(authorities).unwrap();

        let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
            Arc::new(committee),
        ))));
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        // Token map as reported by the (mock-backed) client, shared via `ArcSwap`
        // so tests can replace it later
        let iota_token_type_tags = iota_client.get_token_id_map().await.unwrap();
        let iota_token_type_tags = Arc::new(ArcSwap::new(Arc::new(iota_token_type_tags)));
        // The bridge starts unpaused
        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
        let executor = BridgeActionExecutor::new(
            iota_client.clone(),
            agg.clone(),
            store.clone(),
            iota_key,
            iota_address,
            gas_object_ref.0,
            iota_token_type_tags.clone(),
            bridge_pause_rx,
            metrics,
        )
        .await;

        // Start the executor and collect its task handles next to the server ones
        let (executor_handle, signing_tx, execution_tx) = executor.run_inner();
        handles.extend(executor_handle);

        SetupData {
            signing_tx,
            execution_tx,
            iota_client_mock,
            tx_subscription,
            store,
            secrets,
            dummy_iota_key,
            mock0,
            mock1,
            mock2,
            mock3,
            handles,
            gas_object_ref,
            iota_address,
            iota_token_type_tags,
            bridge_pause_tx,
        }
    }
1582}