test_cluster/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    net::SocketAddr,
8    num::NonZeroUsize,
9    path::{Path, PathBuf},
10    sync::{Arc, Mutex},
11    time::Duration,
12};
13
14use futures::{Future, StreamExt, future::join_all};
15use iota_bridge::{
16    crypto::{BridgeAuthorityKeyPair, BridgeAuthoritySignInfo},
17    iota_transaction_builder::{
18        build_add_tokens_on_iota_transaction, build_committee_register_transaction,
19    },
20    types::{
21        BridgeCommitteeValiditySignInfo, CertifiedBridgeAction, VerifiedCertifiedBridgeAction,
22    },
23    utils::{publish_and_register_coins_return_add_coins_on_iota_action, wait_for_server_to_be_up},
24};
25use iota_config::{
26    Config, IOTA_CLIENT_CONFIG, IOTA_KEYSTORE_FILENAME, IOTA_NETWORK_CONFIG, NodeConfig,
27    PersistedConfig,
28    genesis::Genesis,
29    local_ip_utils::get_available_port,
30    node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
31};
32use iota_core::{
33    authority_aggregator::AuthorityAggregator, authority_client::NetworkAuthorityClient,
34};
35use iota_genesis_builder::SnapshotSource;
36use iota_json_rpc_api::{
37    BridgeReadApiClient, IndexerApiClient, TransactionBuilderClient, WriteApiClient,
38    error_object_from_rpc,
39};
40use iota_json_rpc_types::{
41    IotaExecutionStatus, IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
42    IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
43    IotaTransactionBlockResponseOptions, TransactionFilter,
44};
45use iota_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
46use iota_node::IotaNodeHandle;
47use iota_protocol_config::ProtocolVersion;
48use iota_sdk::{
49    IotaClient, IotaClientBuilder,
50    apis::QuorumDriverApi,
51    iota_client_config::{IotaClientConfig, IotaEnv},
52    wallet_context::WalletContext,
53};
54use iota_swarm::memory::{Swarm, SwarmBuilder};
55use iota_swarm_config::{
56    genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT, GenesisConfig, ValidatorGenesisConfig},
57    network_config::{NetworkConfig, NetworkConfigLight},
58    network_config_builder::{
59        ProtocolVersionsConfig, StateAccumulatorEnabledCallback, StateAccumulatorV1EnabledConfig,
60        SupportedProtocolVersionsCallback,
61    },
62    node_config_builder::{FullnodeConfigBuilder, ValidatorConfigBuilder},
63};
64use iota_test_transaction_builder::TestTransactionBuilder;
65use iota_types::{
66    IOTA_BRIDGE_OBJECT_ID,
67    base_types::{AuthorityName, ConciseableName, IotaAddress, ObjectID, ObjectRef},
68    bridge::{
69        BridgeSummary, BridgeTrait, TOKEN_ID_BTC, TOKEN_ID_ETH, TOKEN_ID_USDC, TOKEN_ID_USDT,
70        get_bridge, get_bridge_obj_initial_shared_version,
71    },
72    committee::{Committee, CommitteeTrait, EpochId},
73    crypto::{AccountKeyPair, IotaKeyPair, KeypairTraits, ToFromBytes, get_key_pair},
74    effects::{TransactionEffects, TransactionEvents},
75    error::IotaResult,
76    governance::MIN_VALIDATOR_JOINING_STAKE_NANOS,
77    iota_system_state::{
78        IotaSystemState, IotaSystemStateTrait,
79        epoch_start_iota_system_state::EpochStartSystemStateTrait,
80    },
81    message_envelope::Message,
82    messages_grpc::HandleCertificateRequestV1,
83    object::Object,
84    quorum_driver_types::ExecuteTransactionRequestType,
85    supported_protocol_versions::SupportedProtocolVersions,
86    traffic_control::{PolicyConfig, RemoteFirewallConfig},
87    transaction::{
88        CertifiedTransaction, ObjectArg, Transaction, TransactionData, TransactionDataAPI,
89        TransactionKind,
90    },
91    utils::to_sender_signed_transaction,
92};
93use jsonrpsee::{
94    core::RpcResult,
95    http_client::{HttpClient, HttpClientBuilder},
96};
97use rand::{distributions::*, rngs::OsRng, seq::SliceRandom};
98use tokio::{
99    task::JoinHandle,
100    time::{Instant, sleep, timeout},
101};
102use tracing::{error, info};
103
104const NUM_VALIDATOR: usize = 4;
105
106pub struct FullNodeHandle {
107    pub iota_node: IotaNodeHandle,
108    pub iota_client: IotaClient,
109    pub rpc_client: HttpClient,
110    pub rpc_url: String,
111}
112
113impl FullNodeHandle {
114    pub async fn new(iota_node: IotaNodeHandle, json_rpc_address: SocketAddr) -> Self {
115        let rpc_url = format!("http://{}", json_rpc_address);
116        let rpc_client = HttpClientBuilder::default().build(&rpc_url).unwrap();
117
118        let iota_client = IotaClientBuilder::default().build(&rpc_url).await.unwrap();
119
120        Self {
121            iota_node,
122            iota_client,
123            rpc_client,
124            rpc_url,
125        }
126    }
127}
128
129struct Faucet {
130    address: IotaAddress,
131    keypair: Arc<tokio::sync::Mutex<IotaKeyPair>>,
132}
133
134pub struct TestCluster {
135    pub swarm: Swarm,
136    pub wallet: WalletContext,
137    pub fullnode_handle: FullNodeHandle,
138    pub bridge_authority_keys: Option<Vec<BridgeAuthorityKeyPair>>,
139    pub bridge_server_ports: Option<Vec<u16>>,
140    faucet: Option<Faucet>,
141}
142
143impl TestCluster {
144    pub fn rpc_client(&self) -> &HttpClient {
145        &self.fullnode_handle.rpc_client
146    }
147
148    pub fn iota_client(&self) -> &IotaClient {
149        &self.fullnode_handle.iota_client
150    }
151
152    pub fn quorum_driver_api(&self) -> &QuorumDriverApi {
153        self.iota_client().quorum_driver_api()
154    }
155
156    pub fn rpc_url(&self) -> &str {
157        &self.fullnode_handle.rpc_url
158    }
159
160    pub fn wallet(&mut self) -> &WalletContext {
161        &self.wallet
162    }
163
164    pub fn wallet_mut(&mut self) -> &mut WalletContext {
165        &mut self.wallet
166    }
167
168    pub fn get_addresses(&self) -> Vec<IotaAddress> {
169        self.wallet.get_addresses()
170    }
171
172    // Helper function to get the 0th address in WalletContext
173    pub fn get_address_0(&self) -> IotaAddress {
174        self.get_addresses()[0]
175    }
176
177    // Helper function to get the 1st address in WalletContext
178    pub fn get_address_1(&self) -> IotaAddress {
179        self.get_addresses()[1]
180    }
181
182    // Helper function to get the 2nd address in WalletContext
183    pub fn get_address_2(&self) -> IotaAddress {
184        self.get_addresses()[2]
185    }
186
187    pub fn fullnode_config_builder(&self) -> FullnodeConfigBuilder {
188        self.swarm.get_fullnode_config_builder()
189    }
190
191    pub fn committee(&self) -> Arc<Committee> {
192        self.fullnode_handle
193            .iota_node
194            .with(|node| node.state().epoch_store_for_testing().committee().clone())
195    }
196
197    /// Convenience method to start a new fullnode in the test cluster.
198    pub async fn spawn_new_fullnode(&mut self) -> FullNodeHandle {
199        self.start_fullnode_from_config(
200            self.fullnode_config_builder()
201                .build(&mut OsRng, self.swarm.config()),
202        )
203        .await
204    }
205
206    pub async fn start_fullnode_from_config(&mut self, config: NodeConfig) -> FullNodeHandle {
207        let json_rpc_address = config.json_rpc_address;
208        let node = self.swarm.spawn_new_node(config).await;
209        FullNodeHandle::new(node, json_rpc_address).await
210    }
211
212    pub fn all_node_handles(&self) -> Vec<IotaNodeHandle> {
213        self.swarm
214            .all_nodes()
215            .flat_map(|n| n.get_node_handle())
216            .collect()
217    }
218
219    pub fn all_validator_handles(&self) -> Vec<IotaNodeHandle> {
220        self.swarm
221            .validator_nodes()
222            .map(|n| n.get_node_handle().unwrap())
223            .collect()
224    }
225
226    pub fn get_validator_pubkeys(&self) -> Vec<AuthorityName> {
227        self.swarm.active_validators().map(|v| v.name()).collect()
228    }
229
230    pub fn get_genesis(&self) -> Genesis {
231        self.swarm.config().genesis.clone()
232    }
233
234    pub fn stop_node(&self, name: &AuthorityName) {
235        self.swarm.node(name).unwrap().stop();
236    }
237
238    pub async fn stop_all_validators(&self) {
239        info!("Stopping all validators in the cluster");
240        self.swarm.active_validators().for_each(|v| v.stop());
241        tokio::time::sleep(Duration::from_secs(3)).await;
242    }
243
244    pub async fn start_all_validators(&self) {
245        info!("Starting all validators in the cluster");
246        for v in self.swarm.validator_nodes() {
247            if v.is_running() {
248                continue;
249            }
250            v.start().await.unwrap();
251        }
252        tokio::time::sleep(Duration::from_secs(3)).await;
253    }
254
255    pub async fn start_node(&self, name: &AuthorityName) {
256        let node = self.swarm.node(name).unwrap();
257        if node.is_running() {
258            return;
259        }
260        node.start().await.unwrap();
261    }
262
263    pub async fn spawn_new_validator(
264        &mut self,
265        genesis_config: ValidatorGenesisConfig,
266    ) -> IotaNodeHandle {
267        let node_config = ValidatorConfigBuilder::new()
268            .build(genesis_config, self.swarm.config().genesis.clone());
269        self.swarm.spawn_new_node(node_config).await
270    }
271
272    pub fn random_node_restarter(self: &Arc<Self>) -> RandomNodeRestarter {
273        RandomNodeRestarter::new(self.clone())
274    }
275
276    pub async fn get_reference_gas_price(&self) -> u64 {
277        self.iota_client()
278            .governance_api()
279            .get_reference_gas_price()
280            .await
281            .expect("failed to get reference gas price")
282    }
283
284    pub async fn get_object_from_fullnode_store(&self, object_id: &ObjectID) -> Option<Object> {
285        self.fullnode_handle
286            .iota_node
287            .with_async(|node| async { node.state().get_object(object_id).await.unwrap() })
288            .await
289    }
290
291    pub async fn get_latest_object_ref(&self, object_id: &ObjectID) -> ObjectRef {
292        self.get_object_from_fullnode_store(object_id)
293            .await
294            .unwrap()
295            .compute_object_reference()
296    }
297
298    pub async fn get_bridge_summary(&self) -> RpcResult<BridgeSummary> {
299        self.iota_client()
300            .http()
301            .get_latest_bridge()
302            .await
303            .map_err(error_object_from_rpc)
304    }
305
306    pub async fn get_object_or_tombstone_from_fullnode_store(
307        &self,
308        object_id: ObjectID,
309    ) -> ObjectRef {
310        self.fullnode_handle
311            .iota_node
312            .state()
313            .get_object_cache_reader()
314            .get_latest_object_ref_or_tombstone(object_id)
315            .unwrap()
316            .unwrap()
317    }
318
319    pub async fn wait_for_run_with_range_shutdown_signal(&self) -> Option<RunWithRange> {
320        self.wait_for_run_with_range_shutdown_signal_with_timeout(Duration::from_secs(60))
321            .await
322    }
323
324    pub async fn wait_for_run_with_range_shutdown_signal_with_timeout(
325        &self,
326        timeout_dur: Duration,
327    ) -> Option<RunWithRange> {
328        let mut shutdown_channel_rx = self
329            .fullnode_handle
330            .iota_node
331            .with(|node| node.subscribe_to_shutdown_channel());
332
333        timeout(timeout_dur, async move {
334            tokio::select! {
335                msg = shutdown_channel_rx.recv() =>
336                {
337                    match msg {
338                        Ok(Some(run_with_range)) => Some(run_with_range),
339                        Ok(None) => None,
340                        Err(e) => {
341                            error!("failed recv from iota-node shutdown channel: {}", e);
342                            None
343                        },
344                    }
345                },
346            }
347        })
348            .await
349            .expect("Timed out waiting for cluster to hit target epoch and recv shutdown signal from iota-node")
350    }
351
352    pub async fn wait_for_protocol_version(
353        &self,
354        target_protocol_version: ProtocolVersion,
355    ) -> IotaSystemState {
356        self.wait_for_protocol_version_with_timeout(
357            target_protocol_version,
358            Duration::from_secs(60),
359        )
360        .await
361    }
362
363    pub async fn wait_for_protocol_version_with_timeout(
364        &self,
365        target_protocol_version: ProtocolVersion,
366        timeout_dur: Duration,
367    ) -> IotaSystemState {
368        timeout(timeout_dur, async move {
369            loop {
370                let system_state = self.wait_for_epoch(None).await;
371                if system_state.protocol_version() >= target_protocol_version.as_u64() {
372                    return system_state;
373                }
374            }
375        })
376        .await
377        .expect("Timed out waiting for cluster to reach the target protocol version")
378    }
379
380    /// Ask 2f+1 validators to actively close the epoch, and wait for the entire
381    /// network to reach the next epoch. This requires waiting for both the
382    /// fullnode and all validators to reach the next epoch.
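    /// A minimal usage sketch (assuming a running cluster bound to `cluster`):
    ///
    /// ```ignore
    /// let start_epoch = cluster.committee().epoch;
    /// cluster.force_new_epoch().await;
    /// assert_eq!(cluster.committee().epoch, start_epoch + 1);
    /// ```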
383    pub async fn force_new_epoch(&self) {
384        info!("Starting reconfiguration");
385        let start = Instant::now();
386
387        // Close epoch on 2f+1 validators.
388        let cur_committee = self
389            .fullnode_handle
390            .iota_node
391            .with(|node| node.state().clone_committee_for_testing());
392        let mut cur_stake = 0;
393        for node in self.swarm.active_validators() {
394            node.get_node_handle()
395                .unwrap()
396                .with_async(|node| async {
397                    node.close_epoch_for_testing().await.unwrap();
398                    cur_stake += cur_committee.weight(&node.state().name);
399                })
400                .await;
401            if cur_stake >= cur_committee.quorum_threshold() {
402                break;
403            }
404        }
405        info!("close_epoch complete after {:?}", start.elapsed());
406
407        self.wait_for_epoch(Some(cur_committee.epoch + 1)).await;
408        self.wait_for_epoch_all_nodes(cur_committee.epoch + 1).await;
409
410        info!("reconfiguration complete after {:?}", start.elapsed());
411    }
412
413    /// Wait for the cluster to reach the target epoch, using the fullnode as
414    /// the source of truth, since a fullnode only performs an epoch transition
415    /// once the network has done so.
416    /// If target_epoch is specified, wait until the cluster reaches that epoch.
417    /// If target_epoch is None, wait until the cluster reaches the next epoch.
418    /// Note that this function does not guarantee that every node is at the
419    /// target epoch.
420    pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> IotaSystemState {
421        self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
422            .await
423    }
424
425    pub async fn wait_for_epoch_on_node(
426        &self,
427        handle: &IotaNodeHandle,
428        target_epoch: Option<EpochId>,
429        timeout_dur: Duration,
430    ) -> IotaSystemState {
431        let mut epoch_rx = handle.with(|node| node.subscribe_to_epoch_change());
432
433        let mut state = None;
434        timeout(timeout_dur, async {
435            let epoch = handle.with(|node| node.state().epoch_store_for_testing().epoch());
436            if Some(epoch) == target_epoch {
437                return handle.with(|node| node.state().get_iota_system_state_object_for_testing().unwrap());
438            }
439            while let Ok(system_state) = epoch_rx.recv().await {
440                info!("received epoch {}", system_state.epoch());
441                state = Some(system_state.clone());
442                match target_epoch {
443                    Some(target_epoch) if system_state.epoch() >= target_epoch => {
444                        return system_state;
445                    }
446                    None => {
447                        return system_state;
448                    }
449                    _ => (),
450                }
451            }
452            unreachable!("Broken reconfig channel");
453        })
454            .await
455            .unwrap_or_else(|_| {
456                error!("Timed out waiting for cluster to reach epoch {target_epoch:?}");
457                if let Some(state) = state {
458                    panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
459                }
460                panic!("Timed out waiting for cluster to reach target epoch {target_epoch:?}")
461            })
462    }
463
464    pub async fn wait_for_epoch_with_timeout(
465        &self,
466        target_epoch: Option<EpochId>,
467        timeout_dur: Duration,
468    ) -> IotaSystemState {
469        self.wait_for_epoch_on_node(&self.fullnode_handle.iota_node, target_epoch, timeout_dur)
470            .await
471    }
472
473    pub async fn wait_for_epoch_all_nodes(&self, target_epoch: EpochId) {
474        let handles: Vec<_> = self
475            .swarm
476            .all_nodes()
477            .map(|node| node.get_node_handle().unwrap())
478            .collect();
479        let tasks: Vec<_> = handles
480            .iter()
481            .map(|handle| {
482                handle.with_async(|node| async {
483                    let mut retries = 0;
484                    loop {
485                        let epoch = node.state().epoch_store_for_testing().epoch();
486                        if epoch == target_epoch {
487                            if let Some(agg) = node.clone_authority_aggregator() {
488                                // This is a fullnode, we need to wait for its auth aggregator to reconfigure as well.
489                                if agg.committee.epoch() == target_epoch {
490                                    break;
491                                }
492                            } else {
493                                // This is a validator, we don't need to check the auth aggregator.
494                                break;
495                            }
496                        }
497                        tokio::time::sleep(Duration::from_secs(1)).await;
498                        retries += 1;
499                        if retries % 5 == 0 {
500                            tracing::warn!(validator=?node.state().name.concise(), "Waiting for {:?} seconds to reach epoch {:?}. Currently at epoch {:?}", retries, target_epoch, epoch);
501                        }
502                    }
503                })
504            })
505            .collect();
506
507        timeout(Duration::from_secs(40), join_all(tasks))
508            .await
509            .expect("timed out waiting for reconfiguration to complete");
510    }
511
512    /// Upgrade the network protocol version by restarting every validator with
513    /// a new set of supported versions.
514    /// Note that we don't restart the fullnode here; it is assumed that the
515    /// fullnode supports the entire version range.
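    /// A rough sketch of a protocol-upgrade test (hedged: the exact
    /// `SupportedProtocolVersions` constructor used here is an assumption):
    ///
    /// ```ignore
    /// // Allow validators to vote for protocol version 2 ...
    /// cluster
    ///     .update_validator_supported_versions(SupportedProtocolVersions::new_for_testing(1, 2))
    ///     .await;
    /// // ... then wait for every node to switch to it at the next reconfiguration.
    /// cluster.wait_for_all_nodes_upgrade_to(2).await;
    /// ```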
516    pub async fn update_validator_supported_versions(
517        &self,
518        new_supported_versions: SupportedProtocolVersions,
519    ) {
520        for authority in self.get_validator_pubkeys() {
521            self.stop_node(&authority);
522            tokio::time::sleep(Duration::from_millis(1000)).await;
523            self.swarm
524                .node(&authority)
525                .unwrap()
526                .config()
527                .supported_protocol_versions = Some(new_supported_versions);
528            self.start_node(&authority).await;
529            info!("Restarted validator {}", authority);
530        }
531    }
532
533    /// Wait for all nodes in the network to upgrade to `protocol_version`.
534    pub async fn wait_for_all_nodes_upgrade_to(&self, protocol_version: u64) {
535        for h in self.all_node_handles() {
536            h.with_async(|node| async {
537                while node
538                    .state()
539                    .epoch_store_for_testing()
540                    .epoch_start_state()
541                    .protocol_version()
542                    .as_u64()
543                    != protocol_version
544                {
545                    tokio::time::sleep(Duration::from_secs(1)).await;
546                }
547            })
548            .await;
549        }
550    }
551
552    pub async fn trigger_reconfiguration_if_not_yet_and_assert_bridge_committee_initialized(&self) {
553        let mut bridge =
554            get_bridge(self.fullnode_handle.iota_node.state().get_object_store()).unwrap();
555        if !bridge.committee().members.contents.is_empty() {
556            assert_eq!(
557                self.swarm.active_validators().count(),
558                bridge.committee().members.contents.len()
559            );
560            return;
561        }
562        // wait for next epoch
563        self.force_new_epoch().await;
564        bridge = get_bridge(self.fullnode_handle.iota_node.state().get_object_store()).unwrap();
565        // Committee should be initialized
566        assert!(bridge.committee().member_registrations.contents.is_empty());
567        assert_eq!(
568            self.swarm.active_validators().count(),
569            bridge.committee().members.contents.len()
570        );
571    }
572
573    // Wait for all bridge nodes in the cluster to be up and running.
574    pub async fn wait_for_bridge_cluster_to_be_up(&self, timeout_sec: u64) {
575        let bridge_ports = self.bridge_server_ports.as_ref().unwrap();
576        let mut tasks = vec![];
577        for port in bridge_ports.iter() {
578            let server_url = format!("http://127.0.0.1:{}", port);
579            tasks.push(wait_for_server_to_be_up(server_url, timeout_sec));
580        }
581        join_all(tasks)
582            .await
583            .into_iter()
584            .collect::<anyhow::Result<Vec<_>>>()
585            .unwrap();
586    }
587
588    pub async fn get_mut_bridge_arg(&self) -> Option<ObjectArg> {
589        get_bridge_obj_initial_shared_version(
590            self.fullnode_handle.iota_node.state().get_object_store(),
591        )
592        .unwrap()
593        .map(|seq| ObjectArg::SharedObject {
594            id: IOTA_BRIDGE_OBJECT_ID,
595            initial_shared_version: seq,
596            mutable: true,
597        })
598    }
599
600    pub async fn wait_for_authenticator_state_update(&self) {
601        timeout(
602            Duration::from_secs(60),
603            self.fullnode_handle
604                .iota_node
605                .with_async(|node| async move {
606                    let mut txns = node.state().subscription_handler.subscribe_transactions(
607                        TransactionFilter::ChangedObject(
608                            ObjectID::from_hex_literal("0x7").unwrap(),
609                        ),
610                    );
611                    let state = node.state();
612
613                    while let Some(tx) = txns.next().await {
614                        let digest = *tx.transaction_digest();
615                        let tx = state
616                            .get_transaction_cache_reader()
617                            .get_transaction_block(&digest)
618                            .unwrap()
619                            .unwrap();
620                        match &tx.data().intent_message().value.kind() {
621                            TransactionKind::EndOfEpochTransaction(_) => (),
622                            TransactionKind::AuthenticatorStateUpdateV1(_) => break,
623                            _ => panic!("{:?}", tx),
624                        }
625                    }
626                }),
627        )
628        .await
629        .expect("Timed out waiting for authenticator state update");
630    }
631
632    /// Return the highest observed protocol version in the test cluster.
633    pub fn highest_protocol_version(&self) -> ProtocolVersion {
634        self.all_node_handles()
635            .into_iter()
636            .map(|h| {
637                h.with(|node| {
638                    node.state()
639                        .epoch_store_for_testing()
640                        .epoch_start_state()
641                        .protocol_version()
642                })
643            })
644            .max()
645            .expect("at least one node must be up to get highest protocol version")
646    }
647
648    pub async fn test_transaction_builder(&self) -> TestTransactionBuilder {
649        let (sender, gas) = self.wallet.get_one_gas_object().await.unwrap().unwrap();
650        self.test_transaction_builder_with_gas_object(sender, gas)
651            .await
652    }
653
654    pub async fn test_transaction_builder_with_sender(
655        &self,
656        sender: IotaAddress,
657    ) -> TestTransactionBuilder {
658        let gas = self
659            .wallet
660            .get_one_gas_object_owned_by_address(sender)
661            .await
662            .unwrap()
663            .unwrap();
664        self.test_transaction_builder_with_gas_object(sender, gas)
665            .await
666    }
667
668    pub async fn test_transaction_builder_with_gas_object(
669        &self,
670        sender: IotaAddress,
671        gas: ObjectRef,
672    ) -> TestTransactionBuilder {
673        let rgp = self.get_reference_gas_price().await;
674        TestTransactionBuilder::new(sender, gas, rgp)
675    }
676
677    pub fn sign_transaction(&self, tx_data: &TransactionData) -> Transaction {
678        self.wallet.sign_transaction(tx_data)
679    }
680
681    pub async fn sign_and_execute_transaction(
682        &self,
683        tx_data: &TransactionData,
684    ) -> IotaTransactionBlockResponse {
685        let tx = self.wallet.sign_transaction(tx_data);
686        self.execute_transaction(tx).await
687    }
688
689    /// Execute a transaction on the network and wait for it to be executed on
690    /// the RPC fullnode. It also expects the effects status to be
691    /// ExecutionStatus::Success. This function is recommended for
692    /// transaction execution since it most closely resembles the production path.
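    /// A minimal sketch of the typical flow (assumes a funded cluster bound to
    /// `cluster` and a recipient address `receiver`):
    ///
    /// ```ignore
    /// let tx_data = cluster
    ///     .test_transaction_builder()
    ///     .await
    ///     .transfer_iota(Some(1_000_000), receiver)
    ///     .build();
    /// let signed = cluster.sign_transaction(&tx_data);
    /// let _response = cluster.execute_transaction(signed).await;
    /// ```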
693    pub async fn execute_transaction(&self, tx: Transaction) -> IotaTransactionBlockResponse {
694        self.wallet.execute_transaction_must_succeed(tx).await
695    }
696
697    /// Different from `execute_transaction`, which returns RPC effects types,
698    /// this function returns the raw effects, events and extra objects returned
699    /// by the validators, aggregated manually (without the authority
700    /// aggregator). It also does not check whether the transaction is
701    /// executed successfully. In order to keep the fullnode up-to-date so
702    /// that later queries can read consistent results, it calls
703    /// execute_transaction_may_fail again, which goes through the fullnode. This
704    /// is less efficient and more verbose, but can be used if more details are
705    /// needed from the execution results, or if the transaction is
706    /// expected to fail.
707    pub async fn execute_transaction_return_raw_effects(
708        &self,
709        tx: Transaction,
710    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
711        let results = self
712            .submit_transaction_to_validators(tx.clone(), &self.get_validator_pubkeys())
713            .await?;
714        self.wallet.execute_transaction_may_fail(tx).await.unwrap();
715        Ok(results)
716    }
717
718    pub fn authority_aggregator(&self) -> Arc<AuthorityAggregator<NetworkAuthorityClient>> {
719        self.fullnode_handle
720            .iota_node
721            .with(|node| node.clone_authority_aggregator().unwrap())
722    }
723
724    pub async fn create_certificate(
725        &self,
726        tx: Transaction,
727        client_addr: Option<SocketAddr>,
728    ) -> anyhow::Result<CertifiedTransaction> {
729        let agg = self.authority_aggregator();
730        Ok(agg
731            .process_transaction(tx, client_addr)
732            .await?
733            .into_cert_for_testing())
734    }
735
736    /// Execute a transaction on a specified list of validators, bypassing the
737    /// authority aggregator. This allows us to obtain the return value
738    /// directly from the validators, so that we can access more information
739    /// directly, such as the original effects, events and extra objects
740    /// returned. This also allows us to control which validators to send
741    /// the certificate to, which is useful in some tests.
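    /// A sketch of submitting to only a subset of validators (assumes
    /// `signed_tx` is an already-signed `Transaction`):
    ///
    /// ```ignore
    /// let validators = cluster.get_validator_pubkeys();
    /// let (effects, _events) = cluster
    ///     .submit_transaction_to_validators(signed_tx, &validators[..2])
    ///     .await
    ///     .unwrap();
    /// assert!(effects.status().is_ok());
    /// ```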
742    pub async fn submit_transaction_to_validators(
743        &self,
744        tx: Transaction,
745        pubkeys: &[AuthorityName],
746    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
747        let agg = self.authority_aggregator();
748        let certificate = agg
749            .process_transaction(tx, None)
750            .await?
751            .into_cert_for_testing();
752        let replies = loop {
753            let futures: Vec<_> = agg
754                .authority_clients
755                .iter()
756                .filter_map(|(name, client)| {
757                    if pubkeys.contains(name) {
758                        Some(client)
759                    } else {
760                        None
761                    }
762                })
763                .map(|client| {
764                    let cert = certificate.clone();
765                    async move {
766                        client
767                            .handle_certificate_v1(
768                                HandleCertificateRequestV1::new(cert).with_events(),
769                                None,
770                            )
771                            .await
772                    }
773                })
774                .collect();
775
776            let replies: Vec<_> = futures::future::join_all(futures)
777                .await
778                .into_iter()
779                .filter(|result| match result {
780                    Err(e) => !e.to_string().contains("deadline has elapsed"),
781                    _ => true,
782                })
783                .collect();
784
785            if !replies.is_empty() {
786                break replies;
787            }
788        };
789        let replies: IotaResult<Vec<_>> = replies.into_iter().collect();
790        let replies = replies?;
791        let mut all_effects = HashMap::new();
792        let mut all_events = HashMap::new();
793        for reply in replies {
794            let effects = reply.signed_effects.into_data();
795            let events = reply.events.unwrap_or_default();
796            all_effects.insert(effects.digest(), effects);
797            all_events.insert(events.digest(), events);
798        }
799        assert_eq!(all_effects.len(), 1);
800        assert_eq!(all_events.len(), 1);
801        Ok((
802            all_effects.into_values().next().unwrap(),
803            all_events.into_values().next().unwrap(),
804        ))
805    }
806
807    /// Sends the given amount of funds from the seeded faucet address to the
808    /// funding address and returns the resulting gas object ref. This
809    /// is useful for constructing transactions from the funding address.
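    /// A sketch of funding a fresh address before driving transactions from it
    /// (`fresh_address` and `receiver` are assumed to be defined by the test):
    ///
    /// ```ignore
    /// let rgp = cluster.get_reference_gas_price().await;
    /// let gas_ref = cluster
    ///     .fund_address_and_return_gas(rgp, Some(1_000_000_000), fresh_address)
    ///     .await;
    /// let tx_data = TestTransactionBuilder::new(fresh_address, gas_ref, rgp)
    ///     .transfer_iota(None, receiver)
    ///     .build();
    /// ```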
810    pub async fn fund_address_and_return_gas(
811        &self,
812        rgp: u64,
813        amount: Option<u64>,
814        funding_address: IotaAddress,
815    ) -> ObjectRef {
816        let Faucet { address, keypair } = &self
817            .faucet
818            .as_ref()
819            .expect("Faucet not initialized: incompatible with `NetworkConfig`.");
820
821        let keypair = &*keypair.lock().await;
822
823        let gas_ref = *self
824            .wallet
825            .get_gas_objects_owned_by_address(*address, None)
826            .await
827            .unwrap()
828            .first()
829            .unwrap();
830
831        let tx_data = TestTransactionBuilder::new(*address, gas_ref, rgp)
832            .transfer_iota(amount, funding_address)
833            .build();
834
835        let signed_transaction = to_sender_signed_transaction(tx_data, keypair);
836
837        let response = self
838            .iota_client()
839            .quorum_driver_api()
840            .execute_transaction_block(
841                signed_transaction,
842                IotaTransactionBlockResponseOptions::new().with_effects(),
843                Some(ExecuteTransactionRequestType::WaitForLocalExecution),
844            )
845            .await
846            .unwrap();
847
848        response
849            .effects
850            .unwrap()
851            .created()
852            .first()
853            .unwrap()
854            .reference
855            .to_object_ref()
856    }
857
858    pub async fn transfer_iota_must_exceed(
859        &self,
860        sender: IotaAddress,
861        receiver: IotaAddress,
862        amount: u64,
863    ) -> ObjectID {
864        let tx = self
865            .test_transaction_builder_with_sender(sender)
866            .await
867            .transfer_iota(Some(amount), receiver)
868            .build();
869        let effects = self
870            .sign_and_execute_transaction(&tx)
871            .await
872            .effects
873            .unwrap();
874        assert_eq!(&IotaExecutionStatus::Success, effects.status());
875        effects.created().first().unwrap().object_id()
876    }
877
878    /// Wait for the fullnode to catch up to the given checkpoint sequence
879    /// number, with a default timeout of 60 seconds.
880    pub async fn wait_for_checkpoint(
881        &self,
882        checkpoint_sequence_number: u64,
883        timeout: Option<Duration>,
884    ) {
885        let timeout = timeout.unwrap_or(Duration::from_secs(60));
886        tokio::time::timeout(timeout, async {
887            loop {
888                let fullnode_checkpoint = self
889                    .fullnode_handle
890                    .iota_node
891                    .with(|node| {
892                        node.state()
893                            .get_checkpoint_store()
894                            .get_highest_executed_checkpoint_seq_number()
895                    })
896                    .unwrap();
897
898                match fullnode_checkpoint {
899                    Some(c) if c >= checkpoint_sequence_number => break,
900                    _ => tokio::time::sleep(Duration::from_millis(100)).await,
901                }
902            }
903        })
904        .await
905        .expect("Timeout waiting for indexer to catchup to checkpoint");
906    }
907
908    /// Get all objects owned by an address
909    pub async fn get_owned_objects(
910        &self,
911        address: IotaAddress,
912        options: Option<IotaObjectDataOptions>,
913    ) -> anyhow::Result<Vec<IotaObjectResponse>> {
914        let page = self
915            .rpc_client()
916            .get_owned_objects(
917                address,
918                options.map(IotaObjectResponseQuery::new_with_options),
919                None,
920                None,
921            )
922            .await?;
923
924        Ok(page.data)
925    }
926
927    /// Create transactions based on the provided object IDs
928    /// by transferring them from one address to another.
929    pub async fn transfer_objects(
930        &self,
931        sender: IotaAddress,
932        receiver: IotaAddress,
933        object_ids: Vec<ObjectID>,
934        gas: ObjectID,
935        options: Option<IotaTransactionBlockResponseOptions>,
936    ) -> anyhow::Result<Vec<IotaTransactionBlockResponse>> {
937        let mut transaction_block_resp: Vec<IotaTransactionBlockResponse> = Vec::new();
938
939        for id in object_ids {
940            let response = self
941                .transfer_object(sender, receiver, id, gas, options.clone())
942                .await?;
943
944            transaction_block_resp.push(response);
945        }
946
947        Ok(transaction_block_resp)
948    }
949
950    /// Create a transaction to transfer an object from one address to another.
951    /// The object's type must allow public transfers.
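    /// A sketch (assumes the sender owns `object_id` and a separate `gas` coin):
    ///
    /// ```ignore
    /// let sender = cluster.get_address_0();
    /// let receiver = cluster.get_address_1();
    /// let response = cluster
    ///     .transfer_object(sender, receiver, object_id, gas, None)
    ///     .await
    ///     .unwrap();
    /// ```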
952    pub async fn transfer_object(
953        &self,
954        sender: IotaAddress,
955        receiver: IotaAddress,
956        object_id: ObjectID,
957        gas: ObjectID,
958        options: Option<IotaTransactionBlockResponseOptions>,
959    ) -> anyhow::Result<IotaTransactionBlockResponse> {
960        let http_client = self.rpc_client();
961        let transaction_bytes = http_client
962            .transfer_object(sender, object_id, Some(gas), 10_000_000.into(), receiver)
963            .await?;
964
965        let tx = self
966            .wallet
967            .sign_transaction(&transaction_bytes.to_data().unwrap());
968
969        let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
970
971        let response = http_client
972            .execute_transaction_block(
973                tx_bytes,
974                signatures,
975                options,
976                Some(ExecuteTransactionRequestType::WaitForLocalExecution),
977            )
978            .await?;
979
980        Ok(response)
981    }
982
983    #[cfg(msim)]
984    pub fn set_safe_mode_expected(&self, value: bool) {
985        for n in self.all_node_handles() {
986            n.with(|node| node.set_safe_mode_expected(value));
987        }
988    }
989}
990
991pub struct RandomNodeRestarter {
992    test_cluster: Arc<TestCluster>,
993
994    // How frequently we kill nodes.
995    kill_interval: Uniform<Duration>,
996    // How long we wait before restarting them.
997    restart_delay: Uniform<Duration>,
998
999    task_handle: Mutex<Option<JoinHandle<()>>>,
1000}
1001
1002impl RandomNodeRestarter {
1003    fn new(test_cluster: Arc<TestCluster>) -> Self {
1004        Self {
1005            test_cluster,
1006            kill_interval: Uniform::new(Duration::from_secs(10), Duration::from_secs(11)),
1007            restart_delay: Uniform::new(Duration::from_secs(1), Duration::from_secs(2)),
1008            task_handle: Default::default(),
1009        }
1010    }
1011
1012    pub fn with_kill_interval_secs(mut self, a: u64, b: u64) -> Self {
1013        self.kill_interval = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1014        self
1015    }
1016
1017    pub fn with_restart_delay_secs(mut self, a: u64, b: u64) -> Self {
1018        self.restart_delay = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1019        self
1020    }
1021
1022    pub fn run(&self) {
1023        let test_cluster = self.test_cluster.clone();
1024        let kill_interval = self.kill_interval;
1025        let restart_delay = self.restart_delay;
1026        let validators = self.test_cluster.get_validator_pubkeys();
1027        let mut task_handle = self.task_handle.lock().unwrap();
1028        assert!(task_handle.is_none());
1029        task_handle.replace(tokio::task::spawn(async move {
1030            loop {
1031                let delay = kill_interval.sample(&mut OsRng);
1032                info!("Sleeping {delay:?} before killing a validator");
1033                sleep(delay).await;
1034
1035                let validator = validators.choose(&mut OsRng).unwrap();
1036                info!("Killing validator {:?}", validator.concise());
1037                test_cluster.stop_node(validator);
1038
1039                let delay = restart_delay.sample(&mut OsRng);
1040                info!("Sleeping {delay:?} before restarting");
1041                sleep(delay).await;
1042                info!("Starting validator {:?}", validator.concise());
1043                test_cluster.start_node(validator).await;
1044            }
1045        }));
1046    }
1047}
1048
1049impl Drop for RandomNodeRestarter {
1050    fn drop(&mut self) {
1051        if let Some(handle) = self.task_handle.lock().unwrap().take() {
1052            handle.abort();
1053        }
1054    }
1055}
1056
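/// Builder for configuring and starting a [`TestCluster`].
///
/// A minimal usage sketch (the values below are illustrative assumptions, not
/// recommended defaults):
///
/// ```ignore
/// let cluster = TestClusterBuilder::new()
///     .with_num_validators(4)
///     .with_epoch_duration_ms(10_000)
///     .build()
///     .await;
/// let address = cluster.get_address_0();
/// ```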
1057pub struct TestClusterBuilder {
1058    genesis_config: Option<GenesisConfig>,
1059    network_config: Option<NetworkConfig>,
1060    additional_objects: Vec<Object>,
1061    num_validators: Option<usize>,
1062    fullnode_rpc_port: Option<u16>,
1063    fullnode_rpc_addr: Option<SocketAddr>,
1064    enable_fullnode_events: bool,
1065    validator_supported_protocol_versions_config: ProtocolVersionsConfig,
1066    // Defaults to validator_supported_protocol_versions_config, but can be overridden.
1067    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
1068    db_checkpoint_config_validators: DBCheckpointConfig,
1069    db_checkpoint_config_fullnodes: DBCheckpointConfig,
1070    num_unpruned_validators: Option<usize>,
1071    jwk_fetch_interval: Option<Duration>,
1072    config_dir: Option<PathBuf>,
1073    default_jwks: bool,
1074    authority_overload_config: Option<AuthorityOverloadConfig>,
1075    data_ingestion_dir: Option<PathBuf>,
1076    fullnode_run_with_range: Option<RunWithRange>,
1077    fullnode_policy_config: Option<PolicyConfig>,
1078    fullnode_fw_config: Option<RemoteFirewallConfig>,
1079
1080    max_submit_position: Option<usize>,
1081    submit_delay_step_override_millis: Option<u64>,
1082    validator_state_accumulator_config: StateAccumulatorV1EnabledConfig,
1083}
1084
1085impl TestClusterBuilder {
1086    pub fn new() -> Self {
1087        TestClusterBuilder {
1088            genesis_config: None,
1089            network_config: None,
1090            additional_objects: vec![],
1091            fullnode_rpc_port: None,
1092            fullnode_rpc_addr: None,
1093            num_validators: None,
1094            enable_fullnode_events: false,
1095            validator_supported_protocol_versions_config: ProtocolVersionsConfig::Default,
1096            fullnode_supported_protocol_versions_config: None,
1097            db_checkpoint_config_validators: DBCheckpointConfig::default(),
1098            db_checkpoint_config_fullnodes: DBCheckpointConfig::default(),
1099            num_unpruned_validators: None,
1100            jwk_fetch_interval: None,
1101            config_dir: None,
1102            default_jwks: false,
1103            authority_overload_config: None,
1104            data_ingestion_dir: None,
1105            fullnode_run_with_range: None,
1106            fullnode_policy_config: None,
1107            fullnode_fw_config: None,
1108            max_submit_position: None,
1109            submit_delay_step_override_millis: None,
1110            validator_state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
1111        }
1112    }
1113
1114    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
1115        if let Some(run_with_range) = run_with_range {
1116            self.fullnode_run_with_range = Some(run_with_range);
1117        }
1118        self
1119    }
1120
1121    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
1122        self.fullnode_policy_config = config;
1123        self
1124    }
1125
1126    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
1127        self.fullnode_fw_config = config;
1128        self
1129    }
1130
1131    pub fn with_fullnode_rpc_port(mut self, rpc_port: u16) -> Self {
1132        self.fullnode_rpc_port = Some(rpc_port);
1133        self
1134    }
1135
1136    pub fn with_fullnode_rpc_addr(mut self, addr: SocketAddr) -> Self {
1137        self.fullnode_rpc_addr = Some(addr);
1138        self
1139    }
1140
1141    pub fn set_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
1142        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1143        self.genesis_config = Some(genesis_config);
1144        self
1145    }
1146
1147    pub fn set_network_config(mut self, network_config: NetworkConfig) -> Self {
1148        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1149        self.network_config = Some(network_config);
1150        self
1151    }
1152
1153    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
1154        self.additional_objects.extend(objects);
1155        self
1156    }
1157
1158    pub fn with_num_validators(mut self, num: usize) -> Self {
1159        self.num_validators = Some(num);
1160        self
1161    }
1162
1163    pub fn enable_fullnode_events(mut self) -> Self {
1164        self.enable_fullnode_events = true;
1165        self
1166    }
1167
1168    pub fn with_enable_db_checkpoints_validators(mut self) -> Self {
1169        self.db_checkpoint_config_validators = DBCheckpointConfig {
1170            perform_db_checkpoints_at_epoch_end: true,
1171            checkpoint_path: None,
1172            object_store_config: None,
1173            perform_index_db_checkpoints_at_epoch_end: None,
1174            prune_and_compact_before_upload: None,
1175        };
1176        self
1177    }
1178
1179    pub fn with_enable_db_checkpoints_fullnodes(mut self) -> Self {
1180        self.db_checkpoint_config_fullnodes = DBCheckpointConfig {
1181            perform_db_checkpoints_at_epoch_end: true,
1182            checkpoint_path: None,
1183            object_store_config: None,
1184            perform_index_db_checkpoints_at_epoch_end: None,
1185            prune_and_compact_before_upload: Some(true),
1186        };
1187        self
1188    }
1189
1190    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
1191        self.get_or_init_genesis_config()
1192            .parameters
1193            .epoch_duration_ms = epoch_duration_ms;
1194        self
1195    }
1196
1197    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
1198        self.validator_supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
1199        self
1200    }
1201
1202    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
1203        self.jwk_fetch_interval = Some(i);
1204        self
1205    }
1206
1207    pub fn with_fullnode_supported_protocol_versions_config(
1208        mut self,
1209        c: SupportedProtocolVersions,
1210    ) -> Self {
1211        self.fullnode_supported_protocol_versions_config = Some(ProtocolVersionsConfig::Global(c));
1212        self
1213    }
1214
1215    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
1216        self.get_or_init_genesis_config()
1217            .parameters
1218            .protocol_version = v;
1219        self
1220    }
1221
1222    pub fn with_supported_protocol_version_callback(
1223        mut self,
1224        func: SupportedProtocolVersionsCallback,
1225    ) -> Self {
1226        self.validator_supported_protocol_versions_config =
1227            ProtocolVersionsConfig::PerValidator(func);
1228        self
1229    }
1230
1231    pub fn with_state_accumulator_callback(
1232        mut self,
1233        func: StateAccumulatorEnabledCallback,
1234    ) -> Self {
1235        self.validator_state_accumulator_config =
1236            StateAccumulatorV1EnabledConfig::PerValidator(func);
1237        self
1238    }
1239
1240    pub fn with_validator_candidates(
1241        mut self,
1242        addresses: impl IntoIterator<Item = IotaAddress>,
1243    ) -> Self {
1244        self.get_or_init_genesis_config()
1245            .accounts
1246            .extend(addresses.into_iter().map(|address| AccountConfig {
1247                address: Some(address),
1248                gas_amounts: vec![DEFAULT_GAS_AMOUNT, MIN_VALIDATOR_JOINING_STAKE_NANOS],
1249            }));
1250        self
1251    }
1252
1253    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
1254        self.num_unpruned_validators = Some(n);
1255        self
1256    }
1257
1258    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1259        self.get_or_init_genesis_config().accounts = accounts;
1260        self
1261    }
1262
1263    pub fn with_migration_data(mut self, migration_sources: Vec<SnapshotSource>) -> Self {
1264        self.get_or_init_genesis_config().migration_sources = migration_sources;
1265        self
1266    }
1267
1268    pub fn with_additional_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1269        self.get_or_init_genesis_config().accounts.extend(accounts);
1270        self
1271    }
1272
1273    pub fn with_delegator(mut self, delegator: IotaAddress) -> Self {
1274        self.get_or_init_genesis_config().delegator = Some(delegator);
1275        self
1276    }
1277
1278    pub fn with_config_dir(mut self, config_dir: PathBuf) -> Self {
1279        self.config_dir = Some(config_dir);
1280        self
1281    }
1282
1283    pub fn with_default_jwks(mut self) -> Self {
1284        self.default_jwks = true;
1285        self
1286    }
1287
1288    pub fn with_authority_overload_config(mut self, config: AuthorityOverloadConfig) -> Self {
1289        assert!(self.network_config.is_none());
1290        self.authority_overload_config = Some(config);
1291        self
1292    }
1293
1294    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
1295        self.data_ingestion_dir = Some(path);
1296        self
1297    }
1298
1299    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
1300        self.max_submit_position = Some(max_submit_position);
1301        self
1302    }
1303
1304    pub fn with_submit_delay_step_override_millis(
1305        mut self,
1306        submit_delay_step_override_millis: u64,
1307    ) -> Self {
1308        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
1309        self
1310    }
1311
1312    pub async fn build(mut self) -> TestCluster {
1313        // We can add a faucet account to the `GenesisConfig` if no
1314        // `NetworkConfig` was provided. Only one of `GenesisConfig` or
1315        // `NetworkConfig` can be used to configure and build the cluster.
1316        let faucet = self.network_config.is_none().then(|| {
1317            let (faucet_address, faucet_keypair): (IotaAddress, AccountKeyPair) = get_key_pair();
1318            let accounts = &mut self.get_or_init_genesis_config().accounts;
1319            accounts.push(AccountConfig {
1320                address: Some(faucet_address),
1321                gas_amounts: vec![DEFAULT_GAS_AMOUNT],
1322            });
1323            Faucet {
1324                address: faucet_address,
1325                keypair: Arc::new(tokio::sync::Mutex::new(IotaKeyPair::Ed25519(
1326                    faucet_keypair,
1327                ))),
1328            }
1329        });
1330
1331        // All test clusters receive a continuous stream of random JWKs.
1332        // If we later use zklogin-authenticated transactions in tests, we will need
1333        // to supply valid JWKs as well.
1334        #[cfg(msim)]
1335        if !self.default_jwks {
1336            iota_node::set_jwk_injector(Arc::new(|_authority, provider| {
1337                use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
1338                use rand::Rng;
1339
1340                // generate random (and possibly conflicting) id/key pairings.
1341                let id_num = rand::thread_rng().gen_range(1..=4);
1342                let key_num = rand::thread_rng().gen_range(1..=4);
1343
1344                let id = JwkId {
1345                    iss: provider.get_config().iss,
1346                    kid: format!("kid{}", id_num),
1347                };
1348
1349                let jwk = JWK {
1350                    kty: "kty".to_string(),
1351                    e: "e".to_string(),
1352                    n: format!("n{}", key_num),
1353                    alg: "alg".to_string(),
1354                };
1355
1356                Ok(vec![(id, jwk)])
1357            }));
1358        }
1359
1360        let swarm = self.start_swarm().await.unwrap();
1361        let working_dir = swarm.dir();
1362
1363        let mut wallet_conf: IotaClientConfig =
1364            PersistedConfig::read(&working_dir.join(IOTA_CLIENT_CONFIG)).unwrap();
1365
1366        let fullnode = swarm.fullnodes().next().unwrap();
1367        let json_rpc_address = fullnode.config().json_rpc_address;
1368        let fullnode_handle =
1369            FullNodeHandle::new(fullnode.get_node_handle().unwrap(), json_rpc_address).await;
1370
1371        wallet_conf.add_env(IotaEnv::new("localnet", fullnode_handle.rpc_url.clone()));
1372        wallet_conf.set_active_env(Some("localnet".to_string()));
1373
1374        wallet_conf
1375            .persisted(&working_dir.join(IOTA_CLIENT_CONFIG))
1376            .save()
1377            .unwrap();
1378
1379        let wallet_conf = swarm.dir().join(IOTA_CLIENT_CONFIG);
1380        let wallet = WalletContext::new(&wallet_conf, None, None).unwrap();
1381
1382        TestCluster {
1383            swarm,
1384            wallet,
1385            fullnode_handle,
1386            bridge_authority_keys: None,
1387            bridge_server_ports: None,
1388            faucet,
1389        }
1390    }
1391
1392    pub async fn build_with_bridge(
1393        self,
1394        bridge_authority_keys: Vec<BridgeAuthorityKeyPair>,
1395        deploy_tokens: bool,
1396    ) -> TestCluster {
1397        let timer = Instant::now();
1398        let gas_objects_for_authority_keys = bridge_authority_keys
1399            .iter()
1400            .map(|k| {
1401                let address = IotaAddress::from(k.public());
1402                Object::with_id_owner_for_testing(ObjectID::random(), address)
1403            })
1404            .collect::<Vec<_>>();
1405        let mut test_cluster = self
1406            .with_objects(gas_objects_for_authority_keys)
1407            .build()
1408            .await;
1409        info!(
1410            "TestCluster build took {:?} secs",
1411            timer.elapsed().as_secs()
1412        );
1413        let ref_gas_price = test_cluster.get_reference_gas_price().await;
1414        let bridge_arg = test_cluster.get_mut_bridge_arg().await.unwrap();
1415        assert_eq!(
1416            bridge_authority_keys.len(),
1417            test_cluster.swarm.active_validators().count()
1418        );
1419
1420        // Committee members register themselves
1421        let mut server_ports = vec![];
1422        let mut tasks = vec![];
1423        let quorum_driver_api = test_cluster.quorum_driver_api().clone();
1424        for (node, kp) in test_cluster
1425            .swarm
1426            .active_validators()
1427            .zip(bridge_authority_keys.iter())
1428        {
1429            let validator_address = node.config().iota_address();
1430            // create committee registration tx
1431            let gas = test_cluster
1432                .wallet
1433                .get_one_gas_object_owned_by_address(validator_address)
1434                .await
1435                .unwrap()
1436                .unwrap();
1437
1438            let server_port = get_available_port("127.0.0.1");
1439            let server_url = format!("http://127.0.0.1:{}", server_port);
1440            server_ports.push(server_port);
1441            let data = build_committee_register_transaction(
1442                validator_address,
1443                &gas,
1444                bridge_arg,
1445                kp.public().as_bytes().to_vec(),
1446                &server_url,
1447                ref_gas_price,
1448                1000000000,
1449            )
1450            .unwrap();
1451
1452            let tx = Transaction::from_data_and_signer(
1453                data,
1454                vec![node.config().account_key_pair.keypair()],
1455            );
1456            let api_clone = quorum_driver_api.clone();
1457            tasks.push(async move {
1458                api_clone
1459                    .execute_transaction_block(
1460                        tx,
1461                        IotaTransactionBlockResponseOptions::new().with_effects(),
1462                        None,
1463                    )
1464                    .await
1465            });
1466        }
1467
1468        if deploy_tokens {
1469            let timer = Instant::now();
1470            let token_ids = vec![TOKEN_ID_BTC, TOKEN_ID_ETH, TOKEN_ID_USDC, TOKEN_ID_USDT];
1471            let token_prices = vec![500_000_000u64, 30_000_000u64, 1_000u64, 1_000u64];
1472            let action = publish_and_register_coins_return_add_coins_on_iota_action(
1473                test_cluster.wallet(),
1474                bridge_arg,
1475                vec![
1476                    Path::new("../../bridge/move/tokens/btc").into(),
1477                    Path::new("../../bridge/move/tokens/eth").into(),
1478                    Path::new("../../bridge/move/tokens/usdc").into(),
1479                    Path::new("../../bridge/move/tokens/usdt").into(),
1480                ],
1481                token_ids,
1482                token_prices,
1483                0,
1484            );
1485            let action = action.await;
1486            info!("Publish and register tokens took {:?} secs", timer.elapsed().as_secs());
1487            let sig_map = bridge_authority_keys
1488                .iter()
1489                .map(|key| {
1490                    (
1491                        key.public().into(),
1492                        BridgeAuthoritySignInfo::new(&action, key).signature,
1493                    )
1494                })
1495                .collect::<BTreeMap<_, _>>();
1496            let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1497                action,
1498                BridgeCommitteeValiditySignInfo {
1499                    signatures: sig_map.clone(),
1500                },
1501            );
1502            let verified_action_cert =
1503                VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1504            let sender_address = test_cluster.get_address_0();
1505
1506            await_committee_register_tasks(&test_cluster, tasks).await;
1507
1508            // Wait until committee is set up
1509            test_cluster
1510                .trigger_reconfiguration_if_not_yet_and_assert_bridge_committee_initialized()
1511                .await;
1512
1513            let tx = build_add_tokens_on_iota_transaction(
1514                sender_address,
1515                &test_cluster
1516                    .wallet
1517                    .get_one_gas_object_owned_by_address(sender_address)
1518                    .await
1519                    .unwrap()
1520                    .unwrap(),
1521                verified_action_cert,
1522                bridge_arg,
1523                ref_gas_price,
1524            )
1525            .unwrap();
1526
1527            let response = test_cluster.sign_and_execute_transaction(&tx).await;
1528            assert_eq!(
1529                response.effects.unwrap().status(),
1530                &IotaExecutionStatus::Success
1531            );
1532            info!("Deploy tokens took {:?} secs", timer.elapsed().as_secs());
1533        } else {
1534            await_committee_register_tasks(&test_cluster, tasks).await;
1535        }
1536
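        // Function-local helper: items declared inside a block are visible
        // throughout that block, so both branches above can call this even
        // though it is defined here.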
1537        async fn await_committee_register_tasks(
1538            test_cluster: &TestCluster,
1539            tasks: Vec<
1540                impl Future<Output = Result<IotaTransactionBlockResponse, iota_sdk::error::Error>>,
1541            >,
1542        ) {
1543            // The tx may fail if a member tries to register when the committee is already
1544            // finalized. In that case, we only need to check that the committee
1545            // member list is not empty, since once the committee is finalized it
1546            // should not be empty.
1547            let responses = join_all(tasks).await;
1548            let mut has_failure = false;
1549            for response in responses {
1550                if response.unwrap().effects.unwrap().status() != &IotaExecutionStatus::Success {
1551                    has_failure = true;
1552                }
1553            }
1554            if has_failure {
1555                let bridge_summary = test_cluster.get_bridge_summary().await.unwrap();
1556                assert_ne!(bridge_summary.committee.members.len(), 0);
1557            }
1558        }
1559
1560        info!(
1561            "TestCluster build_with_bridge took {:?} secs",
1562            timer.elapsed().as_secs()
1563        );
1564        test_cluster.bridge_authority_keys = Some(bridge_authority_keys);
1565        test_cluster.bridge_server_ports = Some(server_ports);
1566        test_cluster
1567    }
1568
1569    /// Start a Swarm and persist its network config, keystore, and wallet config to the swarm directory.
1570    async fn start_swarm(&mut self) -> Result<Swarm, anyhow::Error> {
1571        let mut builder: SwarmBuilder = Swarm::builder()
1572            .committee_size(
1573                NonZeroUsize::new(self.num_validators.unwrap_or(NUM_VALIDATOR)).unwrap(),
1574            )
1575            .with_objects(self.additional_objects.clone())
1576            .with_db_checkpoint_config(self.db_checkpoint_config_validators.clone())
1577            .with_supported_protocol_versions_config(
1578                self.validator_supported_protocol_versions_config.clone(),
1579            )
1580            .with_state_accumulator_config(self.validator_state_accumulator_config.clone())
1581            .with_fullnode_count(1)
1582            .with_fullnode_supported_protocol_versions_config(
1583                self.fullnode_supported_protocol_versions_config
1584                    .clone()
1585                    .unwrap_or(self.validator_supported_protocol_versions_config.clone()),
1586            )
1587            .with_db_checkpoint_config(self.db_checkpoint_config_fullnodes.clone())
1588            .with_fullnode_run_with_range(self.fullnode_run_with_range)
1589            .with_fullnode_policy_config(self.fullnode_policy_config.clone())
1590            .with_fullnode_fw_config(self.fullnode_fw_config.clone());
1591
1592        if let Some(genesis_config) = self.genesis_config.take() {
1593            builder = builder.with_genesis_config(genesis_config);
1594        }
1595
1596        if let Some(network_config) = self.network_config.take() {
1597            builder = builder.with_network_config(network_config);
1598        }
1599
1600        if let Some(authority_overload_config) = self.authority_overload_config.take() {
1601            builder = builder.with_authority_overload_config(authority_overload_config);
1602        }
1603
1604        if let Some(fullnode_rpc_addr) = self.fullnode_rpc_addr {
1605            builder = builder.with_fullnode_rpc_addr(fullnode_rpc_addr);
1606        } else if let Some(fullnode_rpc_port) = self.fullnode_rpc_port {
1607            builder = builder.with_fullnode_rpc_port(fullnode_rpc_port);
1608        }
1609
1610        if let Some(num_unpruned_validators) = self.num_unpruned_validators {
1611            builder = builder.with_num_unpruned_validators(num_unpruned_validators);
1612        }
1613
1614        if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
1615            builder = builder.with_jwk_fetch_interval(jwk_fetch_interval);
1616        }
1617
1618        if let Some(config_dir) = self.config_dir.take() {
1619            builder = builder.dir(config_dir);
1620        }
1621
1622        if let Some(data_ingestion_dir) = self.data_ingestion_dir.take() {
1623            builder = builder.with_data_ingestion_dir(data_ingestion_dir);
1624        }
1625
1626        if let Some(max_submit_position) = self.max_submit_position {
1627            builder = builder.with_max_submit_position(max_submit_position);
1628        }
1629
1630        if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis {
1631            builder =
1632                builder.with_submit_delay_step_override_millis(submit_delay_step_override_millis);
1633        }
1634
1635        let mut swarm = builder.build();
1636        swarm.launch().await?;
1637
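        // Persist the network config, keystore, and wallet config into the
        // swarm directory so callers (for example, `WalletContext::new` on the
        // client config written here) can load them later.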
1638        let dir = swarm.dir();
1639
1640        let network_path = dir.join(IOTA_NETWORK_CONFIG);
1641        let wallet_path = dir.join(IOTA_CLIENT_CONFIG);
1642        let keystore_path = dir.join(IOTA_KEYSTORE_FILENAME);
1643
1644        let network_config = swarm.config();
1645        // Create the light network config (validator configs, account keys, genesis) to save
1646        let account_keys = network_config
1647            .account_keys
1648            .iter()
1649            .map(|kp| kp.copy())
1650            .collect();
1651        let network_config_light = NetworkConfigLight::new(
1652            network_config.validator_configs.clone(),
1653            account_keys,
1654            &network_config.genesis,
1655        );
1656        network_config_light.save(network_path)?;
1657
1658        let mut keystore = Keystore::from(FileBasedKeystore::new(&keystore_path)?);
1659        for key in &swarm.config().account_keys {
1660            keystore.add_key(None, IotaKeyPair::Ed25519(key.copy()))?;
1661        }
1662
1663        let active_address = keystore.addresses().first().cloned();
1664
1665        // Create and save the wallet config with the keystore and active address
1666        IotaClientConfig::new(FileBasedKeystore::new(&keystore_path)?)
1667            .with_active_address(active_address)
1668            .save(wallet_path)?;
1669
1670        // Return the swarm handle
1671        Ok(swarm)
1672    }
1673
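    /// Return the genesis config, lazily initializing it with
    /// `GenesisConfig::for_local_testing()` on first access.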
1674    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
1675        if self.genesis_config.is_none() {
1676            self.genesis_config = Some(GenesisConfig::for_local_testing());
1677        }
1678        self.genesis_config.as_mut().unwrap()
1679    }
1680}
1681
1682impl Default for TestClusterBuilder {
1683    fn default() -> Self {
1684        Self::new()
1685    }
1686}