1use std::{
6 collections::{BTreeMap, HashMap},
7 net::SocketAddr,
8 num::NonZeroUsize,
9 path::{Path, PathBuf},
10 sync::{Arc, Mutex},
11 time::Duration,
12};
13
14use futures::{Future, StreamExt, future::join_all};
15use iota_bridge::{
16 crypto::{BridgeAuthorityKeyPair, BridgeAuthoritySignInfo},
17 iota_transaction_builder::{
18 build_add_tokens_on_iota_transaction, build_committee_register_transaction,
19 },
20 types::{
21 BridgeCommitteeValiditySignInfo, CertifiedBridgeAction, VerifiedCertifiedBridgeAction,
22 },
23 utils::{publish_and_register_coins_return_add_coins_on_iota_action, wait_for_server_to_be_up},
24};
25use iota_config::{
26 Config, IOTA_CLIENT_CONFIG, IOTA_KEYSTORE_FILENAME, IOTA_NETWORK_CONFIG, NodeConfig,
27 PersistedConfig,
28 genesis::Genesis,
29 local_ip_utils::get_available_port,
30 node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
31};
32use iota_core::{
33 authority_aggregator::AuthorityAggregator, authority_client::NetworkAuthorityClient,
34};
35use iota_genesis_builder::SnapshotSource;
36use iota_json_rpc_api::{
37 BridgeReadApiClient, IndexerApiClient, TransactionBuilderClient, WriteApiClient,
38 error_object_from_rpc,
39};
40use iota_json_rpc_types::{
41 IotaExecutionStatus, IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
42 IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
43 IotaTransactionBlockResponseOptions, TransactionFilter,
44};
45use iota_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
46use iota_node::IotaNodeHandle;
47use iota_protocol_config::ProtocolVersion;
48use iota_sdk::{
49 IotaClient, IotaClientBuilder,
50 apis::QuorumDriverApi,
51 iota_client_config::{IotaClientConfig, IotaEnv},
52 wallet_context::WalletContext,
53};
54use iota_swarm::memory::{Swarm, SwarmBuilder};
55use iota_swarm_config::{
56 genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT, GenesisConfig, ValidatorGenesisConfig},
57 network_config::{NetworkConfig, NetworkConfigLight},
58 network_config_builder::{
59 ProtocolVersionsConfig, StateAccumulatorEnabledCallback, StateAccumulatorV1EnabledConfig,
60 SupportedProtocolVersionsCallback,
61 },
62 node_config_builder::{FullnodeConfigBuilder, ValidatorConfigBuilder},
63};
64use iota_test_transaction_builder::TestTransactionBuilder;
65use iota_types::{
66 IOTA_BRIDGE_OBJECT_ID,
67 base_types::{AuthorityName, ConciseableName, IotaAddress, ObjectID, ObjectRef},
68 bridge::{
69 BridgeSummary, BridgeTrait, TOKEN_ID_BTC, TOKEN_ID_ETH, TOKEN_ID_USDC, TOKEN_ID_USDT,
70 get_bridge, get_bridge_obj_initial_shared_version,
71 },
72 committee::{Committee, CommitteeTrait, EpochId},
73 crypto::{AccountKeyPair, IotaKeyPair, KeypairTraits, ToFromBytes, get_key_pair},
74 effects::{TransactionEffects, TransactionEvents},
75 error::IotaResult,
76 governance::MIN_VALIDATOR_JOINING_STAKE_NANOS,
77 iota_system_state::{
78 IotaSystemState, IotaSystemStateTrait,
79 epoch_start_iota_system_state::EpochStartSystemStateTrait,
80 },
81 message_envelope::Message,
82 messages_grpc::HandleCertificateRequestV1,
83 object::Object,
84 quorum_driver_types::ExecuteTransactionRequestType,
85 supported_protocol_versions::SupportedProtocolVersions,
86 traffic_control::{PolicyConfig, RemoteFirewallConfig},
87 transaction::{
88 CertifiedTransaction, ObjectArg, Transaction, TransactionData, TransactionDataAPI,
89 TransactionKind,
90 },
91 utils::to_sender_signed_transaction,
92};
93use jsonrpsee::{
94 core::RpcResult,
95 http_client::{HttpClient, HttpClientBuilder},
96};
97use rand::{distributions::*, rngs::OsRng, seq::SliceRandom};
98use tokio::{
99 task::JoinHandle,
100 time::{Instant, sleep, timeout},
101};
102use tracing::{error, info};
103
/// Validator count constant.
///
/// NOTE(review): not referenced in the visible part of this file — presumably the
/// default number of validators used when the builder is given none; confirm.
const NUM_VALIDATOR: usize = 4;
105
106pub struct FullNodeHandle {
107 pub iota_node: IotaNodeHandle,
108 pub iota_client: IotaClient,
109 pub rpc_client: HttpClient,
110 pub rpc_url: String,
111}
112
113impl FullNodeHandle {
114 pub async fn new(iota_node: IotaNodeHandle, json_rpc_address: SocketAddr) -> Self {
115 let rpc_url = format!("http://{}", json_rpc_address);
116 let rpc_client = HttpClientBuilder::default().build(&rpc_url).unwrap();
117
118 let iota_client = IotaClientBuilder::default().build(&rpc_url).await.unwrap();
119
120 Self {
121 iota_node,
122 iota_client,
123 rpc_client,
124 rpc_url,
125 }
126 }
127}
128
/// Funding account used by `TestCluster::fund_address_and_return_gas`.
struct Faucet {
    /// Address whose gas objects pay for the funding transfers.
    address: IotaAddress,
    /// Key pair used to sign the funding transactions; held behind an async
    /// mutex and locked for the duration of each funding call.
    keypair: Arc<tokio::sync::Mutex<IotaKeyPair>>,
}
133
/// A running test network: a swarm of nodes, a wallet, and a fullnode with its clients.
pub struct TestCluster {
    /// The in-memory swarm that owns the nodes.
    pub swarm: Swarm,
    /// Wallet used to look up addresses/gas and to sign and execute transactions.
    pub wallet: WalletContext,
    /// The fullnode whose state and RPC clients most helpers operate on.
    pub fullnode_handle: FullNodeHandle,
    /// Bridge authority key pairs, if any were configured.
    pub bridge_authority_keys: Option<Vec<BridgeAuthorityKeyPair>>,
    /// Local ports of the bridge servers; required by
    /// `wait_for_bridge_cluster_to_be_up`, which probes `http://127.0.0.1:<port>`.
    pub bridge_server_ports: Option<Vec<u16>>,
    /// Funding account; `fund_address_and_return_gas` panics when this is `None`.
    faucet: Option<Faucet>,
}
142
143impl TestCluster {
144 pub fn rpc_client(&self) -> &HttpClient {
145 &self.fullnode_handle.rpc_client
146 }
147
148 pub fn iota_client(&self) -> &IotaClient {
149 &self.fullnode_handle.iota_client
150 }
151
152 pub fn quorum_driver_api(&self) -> &QuorumDriverApi {
153 self.iota_client().quorum_driver_api()
154 }
155
156 pub fn rpc_url(&self) -> &str {
157 &self.fullnode_handle.rpc_url
158 }
159
160 pub fn wallet(&mut self) -> &WalletContext {
161 &self.wallet
162 }
163
164 pub fn wallet_mut(&mut self) -> &mut WalletContext {
165 &mut self.wallet
166 }
167
168 pub fn get_addresses(&self) -> Vec<IotaAddress> {
169 self.wallet.get_addresses()
170 }
171
172 pub fn get_address_0(&self) -> IotaAddress {
174 self.get_addresses()[0]
175 }
176
177 pub fn get_address_1(&self) -> IotaAddress {
179 self.get_addresses()[1]
180 }
181
182 pub fn get_address_2(&self) -> IotaAddress {
184 self.get_addresses()[2]
185 }
186
187 pub fn fullnode_config_builder(&self) -> FullnodeConfigBuilder {
188 self.swarm.get_fullnode_config_builder()
189 }
190
191 pub fn committee(&self) -> Arc<Committee> {
192 self.fullnode_handle
193 .iota_node
194 .with(|node| node.state().epoch_store_for_testing().committee().clone())
195 }
196
197 pub async fn spawn_new_fullnode(&mut self) -> FullNodeHandle {
199 self.start_fullnode_from_config(
200 self.fullnode_config_builder()
201 .build(&mut OsRng, self.swarm.config()),
202 )
203 .await
204 }
205
206 pub async fn start_fullnode_from_config(&mut self, config: NodeConfig) -> FullNodeHandle {
207 let json_rpc_address = config.json_rpc_address;
208 let node = self.swarm.spawn_new_node(config).await;
209 FullNodeHandle::new(node, json_rpc_address).await
210 }
211
212 pub fn all_node_handles(&self) -> Vec<IotaNodeHandle> {
213 self.swarm
214 .all_nodes()
215 .flat_map(|n| n.get_node_handle())
216 .collect()
217 }
218
219 pub fn all_validator_handles(&self) -> Vec<IotaNodeHandle> {
220 self.swarm
221 .validator_nodes()
222 .map(|n| n.get_node_handle().unwrap())
223 .collect()
224 }
225
226 pub fn get_validator_pubkeys(&self) -> Vec<AuthorityName> {
227 self.swarm.active_validators().map(|v| v.name()).collect()
228 }
229
230 pub fn get_genesis(&self) -> Genesis {
231 self.swarm.config().genesis.clone()
232 }
233
234 pub fn stop_node(&self, name: &AuthorityName) {
235 self.swarm.node(name).unwrap().stop();
236 }
237
238 pub async fn stop_all_validators(&self) {
239 info!("Stopping all validators in the cluster");
240 self.swarm.active_validators().for_each(|v| v.stop());
241 tokio::time::sleep(Duration::from_secs(3)).await;
242 }
243
244 pub async fn start_all_validators(&self) {
245 info!("Starting all validators in the cluster");
246 for v in self.swarm.validator_nodes() {
247 if v.is_running() {
248 continue;
249 }
250 v.start().await.unwrap();
251 }
252 tokio::time::sleep(Duration::from_secs(3)).await;
253 }
254
255 pub async fn start_node(&self, name: &AuthorityName) {
256 let node = self.swarm.node(name).unwrap();
257 if node.is_running() {
258 return;
259 }
260 node.start().await.unwrap();
261 }
262
263 pub async fn spawn_new_validator(
264 &mut self,
265 genesis_config: ValidatorGenesisConfig,
266 ) -> IotaNodeHandle {
267 let node_config = ValidatorConfigBuilder::new()
268 .build(genesis_config, self.swarm.config().genesis.clone());
269 self.swarm.spawn_new_node(node_config).await
270 }
271
272 pub fn random_node_restarter(self: &Arc<Self>) -> RandomNodeRestarter {
273 RandomNodeRestarter::new(self.clone())
274 }
275
276 pub async fn get_reference_gas_price(&self) -> u64 {
277 self.iota_client()
278 .governance_api()
279 .get_reference_gas_price()
280 .await
281 .expect("failed to get reference gas price")
282 }
283
284 pub async fn get_object_from_fullnode_store(&self, object_id: &ObjectID) -> Option<Object> {
285 self.fullnode_handle
286 .iota_node
287 .with_async(|node| async { node.state().get_object(object_id).await.unwrap() })
288 .await
289 }
290
291 pub async fn get_latest_object_ref(&self, object_id: &ObjectID) -> ObjectRef {
292 self.get_object_from_fullnode_store(object_id)
293 .await
294 .unwrap()
295 .compute_object_reference()
296 }
297
298 pub async fn get_bridge_summary(&self) -> RpcResult<BridgeSummary> {
299 self.iota_client()
300 .http()
301 .get_latest_bridge()
302 .await
303 .map_err(error_object_from_rpc)
304 }
305
306 pub async fn get_object_or_tombstone_from_fullnode_store(
307 &self,
308 object_id: ObjectID,
309 ) -> ObjectRef {
310 self.fullnode_handle
311 .iota_node
312 .state()
313 .get_object_cache_reader()
314 .get_latest_object_ref_or_tombstone(object_id)
315 .unwrap()
316 .unwrap()
317 }
318
319 pub async fn wait_for_run_with_range_shutdown_signal(&self) -> Option<RunWithRange> {
320 self.wait_for_run_with_range_shutdown_signal_with_timeout(Duration::from_secs(60))
321 .await
322 }
323
324 pub async fn wait_for_run_with_range_shutdown_signal_with_timeout(
325 &self,
326 timeout_dur: Duration,
327 ) -> Option<RunWithRange> {
328 let mut shutdown_channel_rx = self
329 .fullnode_handle
330 .iota_node
331 .with(|node| node.subscribe_to_shutdown_channel());
332
333 timeout(timeout_dur, async move {
334 tokio::select! {
335 msg = shutdown_channel_rx.recv() =>
336 {
337 match msg {
338 Ok(Some(run_with_range)) => Some(run_with_range),
339 Ok(None) => None,
340 Err(e) => {
341 error!("failed recv from iota-node shutdown channel: {}", e);
342 None
343 },
344 }
345 },
346 }
347 })
348 .await
349 .expect("Timed out waiting for cluster to hit target epoch and recv shutdown signal from iota-node")
350 }
351
352 pub async fn wait_for_protocol_version(
353 &self,
354 target_protocol_version: ProtocolVersion,
355 ) -> IotaSystemState {
356 self.wait_for_protocol_version_with_timeout(
357 target_protocol_version,
358 Duration::from_secs(60),
359 )
360 .await
361 }
362
363 pub async fn wait_for_protocol_version_with_timeout(
364 &self,
365 target_protocol_version: ProtocolVersion,
366 timeout_dur: Duration,
367 ) -> IotaSystemState {
368 timeout(timeout_dur, async move {
369 loop {
370 let system_state = self.wait_for_epoch(None).await;
371 if system_state.protocol_version() >= target_protocol_version.as_u64() {
372 return system_state;
373 }
374 }
375 })
376 .await
377 .expect("Timed out waiting for cluster to target protocol version")
378 }
379
380 pub async fn force_new_epoch(&self) {
384 info!("Starting reconfiguration");
385 let start = Instant::now();
386
387 let cur_committee = self
389 .fullnode_handle
390 .iota_node
391 .with(|node| node.state().clone_committee_for_testing());
392 let mut cur_stake = 0;
393 for node in self.swarm.active_validators() {
394 node.get_node_handle()
395 .unwrap()
396 .with_async(|node| async {
397 node.close_epoch_for_testing().await.unwrap();
398 cur_stake += cur_committee.weight(&node.state().name);
399 })
400 .await;
401 if cur_stake >= cur_committee.quorum_threshold() {
402 break;
403 }
404 }
405 info!("close_epoch complete after {:?}", start.elapsed());
406
407 self.wait_for_epoch(Some(cur_committee.epoch + 1)).await;
408 self.wait_for_epoch_all_nodes(cur_committee.epoch + 1).await;
409
410 info!("reconfiguration complete after {:?}", start.elapsed());
411 }
412
413 pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> IotaSystemState {
421 self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
422 .await
423 }
424
425 pub async fn wait_for_epoch_on_node(
426 &self,
427 handle: &IotaNodeHandle,
428 target_epoch: Option<EpochId>,
429 timeout_dur: Duration,
430 ) -> IotaSystemState {
431 let mut epoch_rx = handle.with(|node| node.subscribe_to_epoch_change());
432
433 let mut state = None;
434 timeout(timeout_dur, async {
435 let epoch = handle.with(|node| node.state().epoch_store_for_testing().epoch());
436 if Some(epoch) == target_epoch {
437 return handle.with(|node| node.state().get_iota_system_state_object_for_testing().unwrap());
438 }
439 while let Ok(system_state) = epoch_rx.recv().await {
440 info!("received epoch {}", system_state.epoch());
441 state = Some(system_state.clone());
442 match target_epoch {
443 Some(target_epoch) if system_state.epoch() >= target_epoch => {
444 return system_state;
445 }
446 None => {
447 return system_state;
448 }
449 _ => (),
450 }
451 }
452 unreachable!("Broken reconfig channel");
453 })
454 .await
455 .unwrap_or_else(|_| {
456 error!("Timed out waiting for cluster to reach epoch {target_epoch:?}");
457 if let Some(state) = state {
458 panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
459 }
460 panic!("Timed out waiting for cluster to target epoch {target_epoch:?}")
461 })
462 }
463
464 pub async fn wait_for_epoch_with_timeout(
465 &self,
466 target_epoch: Option<EpochId>,
467 timeout_dur: Duration,
468 ) -> IotaSystemState {
469 self.wait_for_epoch_on_node(&self.fullnode_handle.iota_node, target_epoch, timeout_dur)
470 .await
471 }
472
473 pub async fn wait_for_epoch_all_nodes(&self, target_epoch: EpochId) {
474 let handles: Vec<_> = self
475 .swarm
476 .all_nodes()
477 .map(|node| node.get_node_handle().unwrap())
478 .collect();
479 let tasks: Vec<_> = handles
480 .iter()
481 .map(|handle| {
482 handle.with_async(|node| async {
483 let mut retries = 0;
484 loop {
485 let epoch = node.state().epoch_store_for_testing().epoch();
486 if epoch == target_epoch {
487 if let Some(agg) = node.clone_authority_aggregator() {
488 if agg.committee.epoch() == target_epoch {
490 break;
491 }
492 } else {
493 break;
495 }
496 }
497 tokio::time::sleep(Duration::from_secs(1)).await;
498 retries += 1;
499 if retries % 5 == 0 {
500 tracing::warn!(validator=?node.state().name.concise(), "Waiting for {:?} seconds to reach epoch {:?}. Currently at epoch {:?}", retries, target_epoch, epoch);
501 }
502 }
503 })
504 })
505 .collect();
506
507 timeout(Duration::from_secs(40), join_all(tasks))
508 .await
509 .expect("timed out waiting for reconfiguration to complete");
510 }
511
512 pub async fn update_validator_supported_versions(
517 &self,
518 new_supported_versions: SupportedProtocolVersions,
519 ) {
520 for authority in self.get_validator_pubkeys() {
521 self.stop_node(&authority);
522 tokio::time::sleep(Duration::from_millis(1000)).await;
523 self.swarm
524 .node(&authority)
525 .unwrap()
526 .config()
527 .supported_protocol_versions = Some(new_supported_versions);
528 self.start_node(&authority).await;
529 info!("Restarted validator {}", authority);
530 }
531 }
532
533 pub async fn wait_for_all_nodes_upgrade_to(&self, protocol_version: u64) {
535 for h in self.all_node_handles() {
536 h.with_async(|node| async {
537 while node
538 .state()
539 .epoch_store_for_testing()
540 .epoch_start_state()
541 .protocol_version()
542 .as_u64()
543 != protocol_version
544 {
545 tokio::time::sleep(Duration::from_secs(1)).await;
546 }
547 })
548 .await;
549 }
550 }
551
552 pub async fn trigger_reconfiguration_if_not_yet_and_assert_bridge_committee_initialized(&self) {
553 let mut bridge =
554 get_bridge(self.fullnode_handle.iota_node.state().get_object_store()).unwrap();
555 if !bridge.committee().members.contents.is_empty() {
556 assert_eq!(
557 self.swarm.active_validators().count(),
558 bridge.committee().members.contents.len()
559 );
560 return;
561 }
562 self.force_new_epoch().await;
564 bridge = get_bridge(self.fullnode_handle.iota_node.state().get_object_store()).unwrap();
565 assert!(bridge.committee().member_registrations.contents.is_empty());
567 assert_eq!(
568 self.swarm.active_validators().count(),
569 bridge.committee().members.contents.len()
570 );
571 }
572
573 pub async fn wait_for_bridge_cluster_to_be_up(&self, timeout_sec: u64) {
575 let bridge_ports = self.bridge_server_ports.as_ref().unwrap();
576 let mut tasks = vec![];
577 for port in bridge_ports.iter() {
578 let server_url = format!("http://127.0.0.1:{}", port);
579 tasks.push(wait_for_server_to_be_up(server_url, timeout_sec));
580 }
581 join_all(tasks)
582 .await
583 .into_iter()
584 .collect::<anyhow::Result<Vec<_>>>()
585 .unwrap();
586 }
587
588 pub async fn get_mut_bridge_arg(&self) -> Option<ObjectArg> {
589 get_bridge_obj_initial_shared_version(
590 self.fullnode_handle.iota_node.state().get_object_store(),
591 )
592 .unwrap()
593 .map(|seq| ObjectArg::SharedObject {
594 id: IOTA_BRIDGE_OBJECT_ID,
595 initial_shared_version: seq,
596 mutable: true,
597 })
598 }
599
600 pub async fn wait_for_authenticator_state_update(&self) {
601 timeout(
602 Duration::from_secs(60),
603 self.fullnode_handle
604 .iota_node
605 .with_async(|node| async move {
606 let mut txns = node.state().subscription_handler.subscribe_transactions(
607 TransactionFilter::ChangedObject(
608 ObjectID::from_hex_literal("0x7").unwrap(),
609 ),
610 );
611 let state = node.state();
612
613 while let Some(tx) = txns.next().await {
614 let digest = *tx.transaction_digest();
615 let tx = state
616 .get_transaction_cache_reader()
617 .get_transaction_block(&digest)
618 .unwrap()
619 .unwrap();
620 match &tx.data().intent_message().value.kind() {
621 TransactionKind::EndOfEpochTransaction(_) => (),
622 TransactionKind::AuthenticatorStateUpdateV1(_) => break,
623 _ => panic!("{:?}", tx),
624 }
625 }
626 }),
627 )
628 .await
629 .expect("Timed out waiting for authenticator state update");
630 }
631
632 pub fn highest_protocol_version(&self) -> ProtocolVersion {
634 self.all_node_handles()
635 .into_iter()
636 .map(|h| {
637 h.with(|node| {
638 node.state()
639 .epoch_store_for_testing()
640 .epoch_start_state()
641 .protocol_version()
642 })
643 })
644 .max()
645 .expect("at least one node must be up to get highest protocol version")
646 }
647
648 pub async fn test_transaction_builder(&self) -> TestTransactionBuilder {
649 let (sender, gas) = self.wallet.get_one_gas_object().await.unwrap().unwrap();
650 self.test_transaction_builder_with_gas_object(sender, gas)
651 .await
652 }
653
654 pub async fn test_transaction_builder_with_sender(
655 &self,
656 sender: IotaAddress,
657 ) -> TestTransactionBuilder {
658 let gas = self
659 .wallet
660 .get_one_gas_object_owned_by_address(sender)
661 .await
662 .unwrap()
663 .unwrap();
664 self.test_transaction_builder_with_gas_object(sender, gas)
665 .await
666 }
667
668 pub async fn test_transaction_builder_with_gas_object(
669 &self,
670 sender: IotaAddress,
671 gas: ObjectRef,
672 ) -> TestTransactionBuilder {
673 let rgp = self.get_reference_gas_price().await;
674 TestTransactionBuilder::new(sender, gas, rgp)
675 }
676
677 pub fn sign_transaction(&self, tx_data: &TransactionData) -> Transaction {
678 self.wallet.sign_transaction(tx_data)
679 }
680
681 pub async fn sign_and_execute_transaction(
682 &self,
683 tx_data: &TransactionData,
684 ) -> IotaTransactionBlockResponse {
685 let tx = self.wallet.sign_transaction(tx_data);
686 self.execute_transaction(tx).await
687 }
688
689 pub async fn execute_transaction(&self, tx: Transaction) -> IotaTransactionBlockResponse {
694 self.wallet.execute_transaction_must_succeed(tx).await
695 }
696
697 pub async fn execute_transaction_return_raw_effects(
708 &self,
709 tx: Transaction,
710 ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
711 let results = self
712 .submit_transaction_to_validators(tx.clone(), &self.get_validator_pubkeys())
713 .await?;
714 self.wallet.execute_transaction_may_fail(tx).await.unwrap();
715 Ok(results)
716 }
717
718 pub fn authority_aggregator(&self) -> Arc<AuthorityAggregator<NetworkAuthorityClient>> {
719 self.fullnode_handle
720 .iota_node
721 .with(|node| node.clone_authority_aggregator().unwrap())
722 }
723
724 pub async fn create_certificate(
725 &self,
726 tx: Transaction,
727 client_addr: Option<SocketAddr>,
728 ) -> anyhow::Result<CertifiedTransaction> {
729 let agg = self.authority_aggregator();
730 Ok(agg
731 .process_transaction(tx, client_addr)
732 .await?
733 .into_cert_for_testing())
734 }
735
736 pub async fn submit_transaction_to_validators(
743 &self,
744 tx: Transaction,
745 pubkeys: &[AuthorityName],
746 ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
747 let agg = self.authority_aggregator();
748 let certificate = agg
749 .process_transaction(tx, None)
750 .await?
751 .into_cert_for_testing();
752 let replies = loop {
753 let futures: Vec<_> = agg
754 .authority_clients
755 .iter()
756 .filter_map(|(name, client)| {
757 if pubkeys.contains(name) {
758 Some(client)
759 } else {
760 None
761 }
762 })
763 .map(|client| {
764 let cert = certificate.clone();
765 async move {
766 client
767 .handle_certificate_v1(
768 HandleCertificateRequestV1::new(cert).with_events(),
769 None,
770 )
771 .await
772 }
773 })
774 .collect();
775
776 let replies: Vec<_> = futures::future::join_all(futures)
777 .await
778 .into_iter()
779 .filter(|result| match result {
780 Err(e) => !e.to_string().contains("deadline has elapsed"),
781 _ => true,
782 })
783 .collect();
784
785 if !replies.is_empty() {
786 break replies;
787 }
788 };
789 let replies: IotaResult<Vec<_>> = replies.into_iter().collect();
790 let replies = replies?;
791 let mut all_effects = HashMap::new();
792 let mut all_events = HashMap::new();
793 for reply in replies {
794 let effects = reply.signed_effects.into_data();
795 let events = reply.events.unwrap_or_default();
796 all_effects.insert(effects.digest(), effects);
797 all_events.insert(events.digest(), events);
798 }
799 assert_eq!(all_effects.len(), 1);
800 assert_eq!(all_events.len(), 1);
801 Ok((
802 all_effects.into_values().next().unwrap(),
803 all_events.into_values().next().unwrap(),
804 ))
805 }
806
807 pub async fn fund_address_and_return_gas(
811 &self,
812 rgp: u64,
813 amount: Option<u64>,
814 funding_address: IotaAddress,
815 ) -> ObjectRef {
816 let Faucet { address, keypair } = &self
817 .faucet
818 .as_ref()
819 .expect("Faucet not initialized: incompatible with `NetworkConfig`.");
820
821 let keypair = &*keypair.lock().await;
822
823 let gas_ref = *self
824 .wallet
825 .get_gas_objects_owned_by_address(*address, None)
826 .await
827 .unwrap()
828 .first()
829 .unwrap();
830
831 let tx_data = TestTransactionBuilder::new(*address, gas_ref, rgp)
832 .transfer_iota(amount, funding_address)
833 .build();
834
835 let signed_transaction = to_sender_signed_transaction(tx_data, keypair);
836
837 let response = self
838 .iota_client()
839 .quorum_driver_api()
840 .execute_transaction_block(
841 signed_transaction,
842 IotaTransactionBlockResponseOptions::new().with_effects(),
843 Some(ExecuteTransactionRequestType::WaitForLocalExecution),
844 )
845 .await
846 .unwrap();
847
848 response
849 .effects
850 .unwrap()
851 .created()
852 .first()
853 .unwrap()
854 .reference
855 .to_object_ref()
856 }
857
858 pub async fn transfer_iota_must_exceed(
859 &self,
860 sender: IotaAddress,
861 receiver: IotaAddress,
862 amount: u64,
863 ) -> ObjectID {
864 let tx = self
865 .test_transaction_builder_with_sender(sender)
866 .await
867 .transfer_iota(Some(amount), receiver)
868 .build();
869 let effects = self
870 .sign_and_execute_transaction(&tx)
871 .await
872 .effects
873 .unwrap();
874 assert_eq!(&IotaExecutionStatus::Success, effects.status());
875 effects.created().first().unwrap().object_id()
876 }
877
878 pub async fn wait_for_checkpoint(
881 &self,
882 checkpoint_sequence_number: u64,
883 timeout: Option<Duration>,
884 ) {
885 let timeout = timeout.unwrap_or(Duration::from_secs(60));
886 tokio::time::timeout(timeout, async {
887 loop {
888 let fullnode_checkpoint = self
889 .fullnode_handle
890 .iota_node
891 .with(|node| {
892 node.state()
893 .get_checkpoint_store()
894 .get_highest_executed_checkpoint_seq_number()
895 })
896 .unwrap();
897
898 match fullnode_checkpoint {
899 Some(c) if c >= checkpoint_sequence_number => break,
900 _ => tokio::time::sleep(Duration::from_millis(100)).await,
901 }
902 }
903 })
904 .await
905 .expect("Timeout waiting for indexer to catchup to checkpoint");
906 }
907
908 pub async fn get_owned_objects(
910 &self,
911 address: IotaAddress,
912 options: Option<IotaObjectDataOptions>,
913 ) -> anyhow::Result<Vec<IotaObjectResponse>> {
914 let page = self
915 .rpc_client()
916 .get_owned_objects(
917 address,
918 options.map(IotaObjectResponseQuery::new_with_options),
919 None,
920 None,
921 )
922 .await?;
923
924 Ok(page.data)
925 }
926
927 pub async fn transfer_objects(
930 &self,
931 sender: IotaAddress,
932 receiver: IotaAddress,
933 object_ids: Vec<ObjectID>,
934 gas: ObjectID,
935 options: Option<IotaTransactionBlockResponseOptions>,
936 ) -> anyhow::Result<Vec<IotaTransactionBlockResponse>> {
937 let mut transaction_block_resp: Vec<IotaTransactionBlockResponse> = Vec::new();
938
939 for id in object_ids {
940 let response = self
941 .transfer_object(sender, receiver, id, gas, options.clone())
942 .await?;
943
944 transaction_block_resp.push(response);
945 }
946
947 Ok(transaction_block_resp)
948 }
949
950 pub async fn transfer_object(
953 &self,
954 sender: IotaAddress,
955 receiver: IotaAddress,
956 object_id: ObjectID,
957 gas: ObjectID,
958 options: Option<IotaTransactionBlockResponseOptions>,
959 ) -> anyhow::Result<IotaTransactionBlockResponse> {
960 let http_client = self.rpc_client();
961 let transaction_bytes = http_client
962 .transfer_object(sender, object_id, Some(gas), 10_000_000.into(), receiver)
963 .await?;
964
965 let tx = self
966 .wallet
967 .sign_transaction(&transaction_bytes.to_data().unwrap());
968
969 let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
970
971 let response = http_client
972 .execute_transaction_block(
973 tx_bytes,
974 signatures,
975 options,
976 Some(ExecuteTransactionRequestType::WaitForLocalExecution),
977 )
978 .await?;
979
980 Ok(response)
981 }
982
983 #[cfg(msim)]
984 pub fn set_safe_mode_expected(&self, value: bool) {
985 for n in self.all_node_handles() {
986 n.with(|node| node.set_safe_mode_expected(value));
987 }
988 }
989}
990
991pub struct RandomNodeRestarter {
992 test_cluster: Arc<TestCluster>,
993
994 kill_interval: Uniform<Duration>,
996 restart_delay: Uniform<Duration>,
998
999 task_handle: Mutex<Option<JoinHandle<()>>>,
1000}
1001
1002impl RandomNodeRestarter {
1003 fn new(test_cluster: Arc<TestCluster>) -> Self {
1004 Self {
1005 test_cluster,
1006 kill_interval: Uniform::new(Duration::from_secs(10), Duration::from_secs(11)),
1007 restart_delay: Uniform::new(Duration::from_secs(1), Duration::from_secs(2)),
1008 task_handle: Default::default(),
1009 }
1010 }
1011
1012 pub fn with_kill_interval_secs(mut self, a: u64, b: u64) -> Self {
1013 self.kill_interval = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1014 self
1015 }
1016
1017 pub fn with_restart_delay_secs(mut self, a: u64, b: u64) -> Self {
1018 self.restart_delay = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1019 self
1020 }
1021
1022 pub fn run(&self) {
1023 let test_cluster = self.test_cluster.clone();
1024 let kill_interval = self.kill_interval;
1025 let restart_delay = self.restart_delay;
1026 let validators = self.test_cluster.get_validator_pubkeys();
1027 let mut task_handle = self.task_handle.lock().unwrap();
1028 assert!(task_handle.is_none());
1029 task_handle.replace(tokio::task::spawn(async move {
1030 loop {
1031 let delay = kill_interval.sample(&mut OsRng);
1032 info!("Sleeping {delay:?} before killing a validator");
1033 sleep(delay).await;
1034
1035 let validator = validators.choose(&mut OsRng).unwrap();
1036 info!("Killing validator {:?}", validator.concise());
1037 test_cluster.stop_node(validator);
1038
1039 let delay = restart_delay.sample(&mut OsRng);
1040 info!("Sleeping {delay:?} before restarting");
1041 sleep(delay).await;
1042 info!("Starting validator {:?}", validator.concise());
1043 test_cluster.start_node(validator).await;
1044 }
1045 }));
1046 }
1047}
1048
1049impl Drop for RandomNodeRestarter {
1050 fn drop(&mut self) {
1051 if let Some(handle) = self.task_handle.lock().unwrap().take() {
1052 handle.abort();
1053 }
1054 }
1055}
1056
/// Collects the settings used to build a test cluster.
///
/// Fields are filled in by the builder methods (`with_*`, `set_*`, `enable_*`).
/// NOTE(review): the code that consumes these fields is not visible in this part
/// of the file, so the field notes below only describe how each value is set.
pub struct TestClusterBuilder {
    // Set by `set_genesis_config`, which asserts that neither this nor
    // `network_config` is already set.
    genesis_config: Option<GenesisConfig>,
    // Set by `set_network_config`, under the same assertion.
    network_config: Option<NetworkConfig>,
    // Extended by `with_objects`.
    additional_objects: Vec<Object>,
    // Set by `with_num_validators`.
    num_validators: Option<usize>,
    // Set by `with_fullnode_rpc_port`.
    fullnode_rpc_port: Option<u16>,
    // Set by `with_fullnode_rpc_addr`.
    fullnode_rpc_addr: Option<SocketAddr>,
    // Switched on by `enable_fullnode_events`; `false` by default.
    enable_fullnode_events: bool,
    // `ProtocolVersionsConfig::Default` unless `with_supported_protocol_versions`
    // replaces it with a global setting.
    validator_supported_protocol_versions_config: ProtocolVersionsConfig,
    // Set by `with_fullnode_supported_protocol_versions_config`.
    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
    // Default config unless `with_enable_db_checkpoints_validators` is called.
    db_checkpoint_config_validators: DBCheckpointConfig,
    // Default config unless `with_enable_db_checkpoints_fullnodes` is called.
    db_checkpoint_config_fullnodes: DBCheckpointConfig,
    num_unpruned_validators: Option<usize>,
    // Set by `with_jwk_fetch_interval`.
    jwk_fetch_interval: Option<Duration>,
    config_dir: Option<PathBuf>,
    default_jwks: bool,
    authority_overload_config: Option<AuthorityOverloadConfig>,
    data_ingestion_dir: Option<PathBuf>,
    // Set by `with_fullnode_run_with_range` when it is given `Some`.
    fullnode_run_with_range: Option<RunWithRange>,
    // Set by `with_fullnode_policy_config`.
    fullnode_policy_config: Option<PolicyConfig>,
    // Set by `with_fullnode_fw_config`.
    fullnode_fw_config: Option<RemoteFirewallConfig>,

    max_submit_position: Option<usize>,
    submit_delay_step_override_millis: Option<u64>,
    // Initialised by `new` to `StateAccumulatorV1EnabledConfig::Global(true)`.
    validator_state_accumulator_config: StateAccumulatorV1EnabledConfig,
}
1084
1085impl TestClusterBuilder {
1086 pub fn new() -> Self {
1087 TestClusterBuilder {
1088 genesis_config: None,
1089 network_config: None,
1090 additional_objects: vec![],
1091 fullnode_rpc_port: None,
1092 fullnode_rpc_addr: None,
1093 num_validators: None,
1094 enable_fullnode_events: false,
1095 validator_supported_protocol_versions_config: ProtocolVersionsConfig::Default,
1096 fullnode_supported_protocol_versions_config: None,
1097 db_checkpoint_config_validators: DBCheckpointConfig::default(),
1098 db_checkpoint_config_fullnodes: DBCheckpointConfig::default(),
1099 num_unpruned_validators: None,
1100 jwk_fetch_interval: None,
1101 config_dir: None,
1102 default_jwks: false,
1103 authority_overload_config: None,
1104 data_ingestion_dir: None,
1105 fullnode_run_with_range: None,
1106 fullnode_policy_config: None,
1107 fullnode_fw_config: None,
1108 max_submit_position: None,
1109 submit_delay_step_override_millis: None,
1110 validator_state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
1111 }
1112 }
1113
1114 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
1115 if let Some(run_with_range) = run_with_range {
1116 self.fullnode_run_with_range = Some(run_with_range);
1117 }
1118 self
1119 }
1120
1121 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
1122 self.fullnode_policy_config = config;
1123 self
1124 }
1125
1126 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
1127 self.fullnode_fw_config = config;
1128 self
1129 }
1130
1131 pub fn with_fullnode_rpc_port(mut self, rpc_port: u16) -> Self {
1132 self.fullnode_rpc_port = Some(rpc_port);
1133 self
1134 }
1135
1136 pub fn with_fullnode_rpc_addr(mut self, addr: SocketAddr) -> Self {
1137 self.fullnode_rpc_addr = Some(addr);
1138 self
1139 }
1140
1141 pub fn set_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
1142 assert!(self.genesis_config.is_none() && self.network_config.is_none());
1143 self.genesis_config = Some(genesis_config);
1144 self
1145 }
1146
1147 pub fn set_network_config(mut self, network_config: NetworkConfig) -> Self {
1148 assert!(self.genesis_config.is_none() && self.network_config.is_none());
1149 self.network_config = Some(network_config);
1150 self
1151 }
1152
1153 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
1154 self.additional_objects.extend(objects);
1155 self
1156 }
1157
1158 pub fn with_num_validators(mut self, num: usize) -> Self {
1159 self.num_validators = Some(num);
1160 self
1161 }
1162
1163 pub fn enable_fullnode_events(mut self) -> Self {
1164 self.enable_fullnode_events = true;
1165 self
1166 }
1167
1168 pub fn with_enable_db_checkpoints_validators(mut self) -> Self {
1169 self.db_checkpoint_config_validators = DBCheckpointConfig {
1170 perform_db_checkpoints_at_epoch_end: true,
1171 checkpoint_path: None,
1172 object_store_config: None,
1173 perform_index_db_checkpoints_at_epoch_end: None,
1174 prune_and_compact_before_upload: None,
1175 };
1176 self
1177 }
1178
1179 pub fn with_enable_db_checkpoints_fullnodes(mut self) -> Self {
1180 self.db_checkpoint_config_fullnodes = DBCheckpointConfig {
1181 perform_db_checkpoints_at_epoch_end: true,
1182 checkpoint_path: None,
1183 object_store_config: None,
1184 perform_index_db_checkpoints_at_epoch_end: None,
1185 prune_and_compact_before_upload: Some(true),
1186 };
1187 self
1188 }
1189
1190 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
1191 self.get_or_init_genesis_config()
1192 .parameters
1193 .epoch_duration_ms = epoch_duration_ms;
1194 self
1195 }
1196
1197 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
1198 self.validator_supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
1199 self
1200 }
1201
1202 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
1203 self.jwk_fetch_interval = Some(i);
1204 self
1205 }
1206
1207 pub fn with_fullnode_supported_protocol_versions_config(
1208 mut self,
1209 c: SupportedProtocolVersions,
1210 ) -> Self {
1211 self.fullnode_supported_protocol_versions_config = Some(ProtocolVersionsConfig::Global(c));
1212 self
1213 }
1214
1215 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
1216 self.get_or_init_genesis_config()
1217 .parameters
1218 .protocol_version = v;
1219 self
1220 }
1221
1222 pub fn with_supported_protocol_version_callback(
1223 mut self,
1224 func: SupportedProtocolVersionsCallback,
1225 ) -> Self {
1226 self.validator_supported_protocol_versions_config =
1227 ProtocolVersionsConfig::PerValidator(func);
1228 self
1229 }
1230
1231 pub fn with_state_accumulator_callback(
1232 mut self,
1233 func: StateAccumulatorEnabledCallback,
1234 ) -> Self {
1235 self.validator_state_accumulator_config =
1236 StateAccumulatorV1EnabledConfig::PerValidator(func);
1237 self
1238 }
1239
1240 pub fn with_validator_candidates(
1241 mut self,
1242 addresses: impl IntoIterator<Item = IotaAddress>,
1243 ) -> Self {
1244 self.get_or_init_genesis_config()
1245 .accounts
1246 .extend(addresses.into_iter().map(|address| AccountConfig {
1247 address: Some(address),
1248 gas_amounts: vec![DEFAULT_GAS_AMOUNT, MIN_VALIDATOR_JOINING_STAKE_NANOS],
1249 }));
1250 self
1251 }
1252
1253 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
1254 self.num_unpruned_validators = Some(n);
1255 self
1256 }
1257
1258 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1259 self.get_or_init_genesis_config().accounts = accounts;
1260 self
1261 }
1262
1263 pub fn with_migration_data(mut self, migration_sources: Vec<SnapshotSource>) -> Self {
1264 self.get_or_init_genesis_config().migration_sources = migration_sources;
1265 self
1266 }
1267
1268 pub fn with_additional_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1269 self.get_or_init_genesis_config().accounts.extend(accounts);
1270 self
1271 }
1272
1273 pub fn with_delegator(mut self, delegator: IotaAddress) -> Self {
1274 self.get_or_init_genesis_config().delegator = Some(delegator);
1275 self
1276 }
1277
1278 pub fn with_config_dir(mut self, config_dir: PathBuf) -> Self {
1279 self.config_dir = Some(config_dir);
1280 self
1281 }
1282
1283 pub fn with_default_jwks(mut self) -> Self {
1284 self.default_jwks = true;
1285 self
1286 }
1287
1288 pub fn with_authority_overload_config(mut self, config: AuthorityOverloadConfig) -> Self {
1289 assert!(self.network_config.is_none());
1290 self.authority_overload_config = Some(config);
1291 self
1292 }
1293
1294 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
1295 self.data_ingestion_dir = Some(path);
1296 self
1297 }
1298
1299 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
1300 self.max_submit_position = Some(max_submit_position);
1301 self
1302 }
1303
1304 pub fn with_submit_delay_step_override_millis(
1305 mut self,
1306 submit_delay_step_override_millis: u64,
1307 ) -> Self {
1308 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
1309 self
1310 }
1311
1312 pub async fn build(mut self) -> TestCluster {
1313 let faucet = self.network_config.is_none().then(|| {
1317 let (faucet_address, faucet_keypair): (IotaAddress, AccountKeyPair) = get_key_pair();
1318 let accounts = &mut self.get_or_init_genesis_config().accounts;
1319 accounts.push(AccountConfig {
1320 address: Some(faucet_address),
1321 gas_amounts: vec![DEFAULT_GAS_AMOUNT],
1322 });
1323 Faucet {
1324 address: faucet_address,
1325 keypair: Arc::new(tokio::sync::Mutex::new(IotaKeyPair::Ed25519(
1326 faucet_keypair,
1327 ))),
1328 }
1329 });
1330
1331 #[cfg(msim)]
1335 if !self.default_jwks {
1336 iota_node::set_jwk_injector(Arc::new(|_authority, provider| {
1337 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
1338 use rand::Rng;
1339
1340 let id_num = rand::thread_rng().gen_range(1..=4);
1342 let key_num = rand::thread_rng().gen_range(1..=4);
1343
1344 let id = JwkId {
1345 iss: provider.get_config().iss,
1346 kid: format!("kid{}", id_num),
1347 };
1348
1349 let jwk = JWK {
1350 kty: "kty".to_string(),
1351 e: "e".to_string(),
1352 n: format!("n{}", key_num),
1353 alg: "alg".to_string(),
1354 };
1355
1356 Ok(vec![(id, jwk)])
1357 }));
1358 }
1359
1360 let swarm = self.start_swarm().await.unwrap();
1361 let working_dir = swarm.dir();
1362
1363 let mut wallet_conf: IotaClientConfig =
1364 PersistedConfig::read(&working_dir.join(IOTA_CLIENT_CONFIG)).unwrap();
1365
1366 let fullnode = swarm.fullnodes().next().unwrap();
1367 let json_rpc_address = fullnode.config().json_rpc_address;
1368 let fullnode_handle =
1369 FullNodeHandle::new(fullnode.get_node_handle().unwrap(), json_rpc_address).await;
1370
1371 wallet_conf.add_env(IotaEnv::new("localnet", fullnode_handle.rpc_url.clone()));
1372 wallet_conf.set_active_env(Some("localnet".to_string()));
1373
1374 wallet_conf
1375 .persisted(&working_dir.join(IOTA_CLIENT_CONFIG))
1376 .save()
1377 .unwrap();
1378
1379 let wallet_conf = swarm.dir().join(IOTA_CLIENT_CONFIG);
1380 let wallet = WalletContext::new(&wallet_conf, None, None).unwrap();
1381
1382 TestCluster {
1383 swarm,
1384 wallet,
1385 fullnode_handle,
1386 bridge_authority_keys: None,
1387 bridge_server_ports: None,
1388 faucet,
1389 }
1390 }
1391
1392 pub async fn build_with_bridge(
1393 self,
1394 bridge_authority_keys: Vec<BridgeAuthorityKeyPair>,
1395 deploy_tokens: bool,
1396 ) -> TestCluster {
1397 let timer = Instant::now();
1398 let gas_objects_for_authority_keys = bridge_authority_keys
1399 .iter()
1400 .map(|k| {
1401 let address = IotaAddress::from(k.public());
1402 Object::with_id_owner_for_testing(ObjectID::random(), address)
1403 })
1404 .collect::<Vec<_>>();
1405 let mut test_cluster = self
1406 .with_objects(gas_objects_for_authority_keys)
1407 .build()
1408 .await;
1409 info!(
1410 "TestCluster build took {:?} secs",
1411 timer.elapsed().as_secs()
1412 );
1413 let ref_gas_price = test_cluster.get_reference_gas_price().await;
1414 let bridge_arg = test_cluster.get_mut_bridge_arg().await.unwrap();
1415 assert_eq!(
1416 bridge_authority_keys.len(),
1417 test_cluster.swarm.active_validators().count()
1418 );
1419
1420 let mut server_ports = vec![];
1422 let mut tasks = vec![];
1423 let quorum_driver_api = test_cluster.quorum_driver_api().clone();
1424 for (node, kp) in test_cluster
1425 .swarm
1426 .active_validators()
1427 .zip(bridge_authority_keys.iter())
1428 {
1429 let validator_address = node.config().iota_address();
1430 let gas = test_cluster
1432 .wallet
1433 .get_one_gas_object_owned_by_address(validator_address)
1434 .await
1435 .unwrap()
1436 .unwrap();
1437
1438 let server_port = get_available_port("127.0.0.1");
1439 let server_url = format!("http://127.0.0.1:{}", server_port);
1440 server_ports.push(server_port);
1441 let data = build_committee_register_transaction(
1442 validator_address,
1443 &gas,
1444 bridge_arg,
1445 kp.public().as_bytes().to_vec(),
1446 &server_url,
1447 ref_gas_price,
1448 1000000000,
1449 )
1450 .unwrap();
1451
1452 let tx = Transaction::from_data_and_signer(
1453 data,
1454 vec![node.config().account_key_pair.keypair()],
1455 );
1456 let api_clone = quorum_driver_api.clone();
1457 tasks.push(async move {
1458 api_clone
1459 .execute_transaction_block(
1460 tx,
1461 IotaTransactionBlockResponseOptions::new().with_effects(),
1462 None,
1463 )
1464 .await
1465 });
1466 }
1467
1468 if deploy_tokens {
1469 let timer = Instant::now();
1470 let token_ids = vec![TOKEN_ID_BTC, TOKEN_ID_ETH, TOKEN_ID_USDC, TOKEN_ID_USDT];
1471 let token_prices = vec![500_000_000u64, 30_000_000u64, 1_000u64, 1_000u64];
1472 let action = publish_and_register_coins_return_add_coins_on_iota_action(
1473 test_cluster.wallet(),
1474 bridge_arg,
1475 vec![
1476 Path::new("../../bridge/move/tokens/btc").into(),
1477 Path::new("../../bridge/move/tokens/eth").into(),
1478 Path::new("../../bridge/move/tokens/usdc").into(),
1479 Path::new("../../bridge/move/tokens/usdt").into(),
1480 ],
1481 token_ids,
1482 token_prices,
1483 0,
1484 );
1485 let action = action.await;
1486 info!("register tokens took {:?} secs", timer.elapsed().as_secs());
1487 let sig_map = bridge_authority_keys
1488 .iter()
1489 .map(|key| {
1490 (
1491 key.public().into(),
1492 BridgeAuthoritySignInfo::new(&action, key).signature,
1493 )
1494 })
1495 .collect::<BTreeMap<_, _>>();
1496 let certified_action = CertifiedBridgeAction::new_from_data_and_sig(
1497 action,
1498 BridgeCommitteeValiditySignInfo {
1499 signatures: sig_map.clone(),
1500 },
1501 );
1502 let verifired_action_cert =
1503 VerifiedCertifiedBridgeAction::new_from_verified(certified_action);
1504 let sender_address = test_cluster.get_address_0();
1505
1506 await_committee_register_tasks(&test_cluster, tasks).await;
1507
1508 test_cluster
1510 .trigger_reconfiguration_if_not_yet_and_assert_bridge_committee_initialized()
1511 .await;
1512
1513 let tx = build_add_tokens_on_iota_transaction(
1514 sender_address,
1515 &test_cluster
1516 .wallet
1517 .get_one_gas_object_owned_by_address(sender_address)
1518 .await
1519 .unwrap()
1520 .unwrap(),
1521 verifired_action_cert,
1522 bridge_arg,
1523 ref_gas_price,
1524 )
1525 .unwrap();
1526
1527 let response = test_cluster.sign_and_execute_transaction(&tx).await;
1528 assert_eq!(
1529 response.effects.unwrap().status(),
1530 &IotaExecutionStatus::Success
1531 );
1532 info!("Deploy tokens took {:?} secs", timer.elapsed().as_secs());
1533 } else {
1534 await_committee_register_tasks(&test_cluster, tasks).await;
1535 }
1536
1537 async fn await_committee_register_tasks(
1538 test_cluster: &TestCluster,
1539 tasks: Vec<
1540 impl Future<Output = Result<IotaTransactionBlockResponse, iota_sdk::error::Error>>,
1541 >,
1542 ) {
1543 let responses = join_all(tasks).await;
1548 let mut has_failure = false;
1549 for response in responses {
1550 if response.unwrap().effects.unwrap().status() != &IotaExecutionStatus::Success {
1551 has_failure = true;
1552 }
1553 }
1554 if has_failure {
1555 let bridge_summary = test_cluster.get_bridge_summary().await.unwrap();
1556 assert_ne!(bridge_summary.committee.members.len(), 0);
1557 }
1558 }
1559
1560 info!(
1561 "TestCluster build_with_bridge took {:?} secs",
1562 timer.elapsed().as_secs()
1563 );
1564 test_cluster.bridge_authority_keys = Some(bridge_authority_keys);
1565 test_cluster.bridge_server_ports = Some(server_ports);
1566 test_cluster
1567 }
1568
1569 async fn start_swarm(&mut self) -> Result<Swarm, anyhow::Error> {
1571 let mut builder: SwarmBuilder = Swarm::builder()
1572 .committee_size(
1573 NonZeroUsize::new(self.num_validators.unwrap_or(NUM_VALIDATOR)).unwrap(),
1574 )
1575 .with_objects(self.additional_objects.clone())
1576 .with_db_checkpoint_config(self.db_checkpoint_config_validators.clone())
1577 .with_supported_protocol_versions_config(
1578 self.validator_supported_protocol_versions_config.clone(),
1579 )
1580 .with_state_accumulator_config(self.validator_state_accumulator_config.clone())
1581 .with_fullnode_count(1)
1582 .with_fullnode_supported_protocol_versions_config(
1583 self.fullnode_supported_protocol_versions_config
1584 .clone()
1585 .unwrap_or(self.validator_supported_protocol_versions_config.clone()),
1586 )
1587 .with_db_checkpoint_config(self.db_checkpoint_config_fullnodes.clone())
1588 .with_fullnode_run_with_range(self.fullnode_run_with_range)
1589 .with_fullnode_policy_config(self.fullnode_policy_config.clone())
1590 .with_fullnode_fw_config(self.fullnode_fw_config.clone());
1591
1592 if let Some(genesis_config) = self.genesis_config.take() {
1593 builder = builder.with_genesis_config(genesis_config);
1594 }
1595
1596 if let Some(network_config) = self.network_config.take() {
1597 builder = builder.with_network_config(network_config);
1598 }
1599
1600 if let Some(authority_overload_config) = self.authority_overload_config.take() {
1601 builder = builder.with_authority_overload_config(authority_overload_config);
1602 }
1603
1604 if let Some(fullnode_rpc_addr) = self.fullnode_rpc_addr {
1605 builder = builder.with_fullnode_rpc_addr(fullnode_rpc_addr);
1606 } else if let Some(fullnode_rpc_port) = self.fullnode_rpc_port {
1607 builder = builder.with_fullnode_rpc_port(fullnode_rpc_port);
1608 }
1609
1610 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
1611 builder = builder.with_num_unpruned_validators(num_unpruned_validators);
1612 }
1613
1614 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
1615 builder = builder.with_jwk_fetch_interval(jwk_fetch_interval);
1616 }
1617
1618 if let Some(config_dir) = self.config_dir.take() {
1619 builder = builder.dir(config_dir);
1620 }
1621
1622 if let Some(data_ingestion_dir) = self.data_ingestion_dir.take() {
1623 builder = builder.with_data_ingestion_dir(data_ingestion_dir);
1624 }
1625
1626 if let Some(max_submit_position) = self.max_submit_position {
1627 builder = builder.with_max_submit_position(max_submit_position);
1628 }
1629
1630 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis {
1631 builder =
1632 builder.with_submit_delay_step_override_millis(submit_delay_step_override_millis);
1633 }
1634
1635 let mut swarm = builder.build();
1636 swarm.launch().await?;
1637
1638 let dir = swarm.dir();
1639
1640 let network_path = dir.join(IOTA_NETWORK_CONFIG);
1641 let wallet_path = dir.join(IOTA_CLIENT_CONFIG);
1642 let keystore_path = dir.join(IOTA_KEYSTORE_FILENAME);
1643
1644 let network_config = swarm.config();
1645 let account_keys = network_config
1647 .account_keys
1648 .iter()
1649 .map(|kp| kp.copy())
1650 .collect();
1651 let network_config_light = NetworkConfigLight::new(
1652 network_config.validator_configs.clone(),
1653 account_keys,
1654 &network_config.genesis,
1655 );
1656 network_config_light.save(network_path)?;
1657
1658 let mut keystore = Keystore::from(FileBasedKeystore::new(&keystore_path)?);
1659 for key in &swarm.config().account_keys {
1660 keystore.add_key(None, IotaKeyPair::Ed25519(key.copy()))?;
1661 }
1662
1663 let active_address = keystore.addresses().first().cloned();
1664
1665 IotaClientConfig::new(FileBasedKeystore::new(&keystore_path)?)
1667 .with_active_address(active_address)
1668 .save(wallet_path)?;
1669
1670 Ok(swarm)
1672 }
1673
1674 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
1675 if self.genesis_config.is_none() {
1676 self.genesis_config = Some(GenesisConfig::for_local_testing());
1677 }
1678 self.genesis_config.as_mut().unwrap()
1679 }
1680}
1681
1682impl Default for TestClusterBuilder {
1683 fn default() -> Self {
1684 Self::new()
1685 }
1686}