iota_cluster_test/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use cluster::{Cluster, ClusterFactory};
9use config::ClusterTestOpt;
10use futures::{StreamExt, stream::FuturesUnordered};
11use helper::ObjectChecker;
12use iota_faucet::CoinInfo;
13use iota_json_rpc_types::{
14    IotaExecutionStatus, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
15    IotaTransactionBlockResponseOptions, TransactionBlockBytes,
16};
17use iota_sdk::{IotaClient, wallet_context::WalletContext};
18use iota_test_transaction_builder::batch_make_transfer_transactions;
19use iota_types::{
20    base_types::{IotaAddress, TransactionDigest},
21    gas_coin::GasCoin,
22    iota_system_state::iota_system_state_summary::IotaSystemStateSummary,
23    object::Owner,
24    quorum_driver_types::ExecuteTransactionRequestType,
25    transaction::{Transaction, TransactionData},
26};
27use jsonrpsee::{
28    core::{client::ClientT, params::ArrayParams},
29    http_client::HttpClientBuilder,
30};
31use test_case::{
32    coin_index_test::CoinIndexTest, coin_merge_split_test::CoinMergeSplitTest,
33    fullnode_build_publish_transaction_test::FullNodeBuildPublishTransactionTest,
34    fullnode_execute_transaction_test::FullNodeExecuteTransactionTest,
35    native_transfer_test::NativeTransferTest, random_beacon_test::RandomBeaconTest,
36    shared_object_test::SharedCounterTest,
37};
38use tokio::time::{self, Duration};
39use tracing::{error, info};
40use wallet_client::WalletClient;
41
42use crate::faucet::{FaucetClient, FaucetClientFactory};
43
44pub mod cluster;
45pub mod config;
46pub mod faucet;
47pub mod helper;
48pub mod test_case;
49pub mod wallet_client;
50
51pub use iota_genesis_builder::SnapshotUrl as MigrationSnapshotUrl;
52
53pub struct TestContext {
54    /// Cluster handle that allows access to various components in a cluster
55    cluster: Box<dyn Cluster + Sync + Send>,
56    /// Client that provides wallet context and fullnode access
57    client: WalletClient,
58    /// Facuet client that provides faucet access to a test
59    faucet: Arc<dyn FaucetClient + Sync + Send>,
60}
61
62impl TestContext {
63    async fn get_iota_from_faucet(&self, minimum_coins: Option<usize>) -> Vec<GasCoin> {
64        let addr = self.get_wallet_address();
65        let faucet_response = self.faucet.request_iota_coins(addr).await;
66
67        let coin_info = faucet_response
68            .transferred_gas_objects
69            .iter()
70            .map(|coin_info| coin_info.transfer_tx_digest)
71            .collect::<Vec<_>>();
72        self.let_fullnode_sync(coin_info, 5).await;
73
74        let gas_coins = self
75            .check_owner_and_into_gas_coin(faucet_response.transferred_gas_objects, addr)
76            .await;
77
78        let minimum_coins = minimum_coins.unwrap_or(1);
79
80        if gas_coins.len() < minimum_coins {
81            panic!(
82                "Expect to get at least {minimum_coins} IOTA Coins for address {addr}, but only got {}",
83                gas_coins.len()
84            )
85        }
86
87        gas_coins
88    }
89
90    fn get_context(&self) -> &WalletClient {
91        &self.client
92    }
93
94    fn get_fullnode_client(&self) -> &IotaClient {
95        self.client.get_fullnode_client()
96    }
97
98    fn clone_fullnode_client(&self) -> IotaClient {
99        self.client.get_fullnode_client().clone()
100    }
101
102    fn get_fullnode_rpc_url(&self) -> &str {
103        self.cluster.fullnode_url()
104    }
105
106    fn get_wallet(&self) -> &WalletContext {
107        self.client.get_wallet()
108    }
109
110    async fn get_latest_iota_system_state(&self) -> IotaSystemStateSummary {
111        self.client
112            .get_fullnode_client()
113            .governance_api()
114            .get_latest_iota_system_state()
115            .await
116            .unwrap()
117    }
118
119    async fn get_reference_gas_price(&self) -> u64 {
120        self.client
121            .get_fullnode_client()
122            .governance_api()
123            .get_reference_gas_price()
124            .await
125            .unwrap()
126    }
127
128    fn get_wallet_address(&self) -> IotaAddress {
129        self.client.get_wallet_address()
130    }
131
132    /// See `make_transactions_with_wallet_context` for potential caveats
133    /// of this helper function.
134    pub async fn make_transactions(&self, max_txn_num: usize) -> Vec<Transaction> {
135        batch_make_transfer_transactions(self.get_wallet(), max_txn_num).await
136    }
137
138    pub async fn build_transaction_remotely(
139        &self,
140        method: &str,
141        params: ArrayParams,
142    ) -> anyhow::Result<TransactionData> {
143        let fn_rpc_url = self.get_fullnode_rpc_url();
144        // TODO cache this?
145        let rpc_client = HttpClientBuilder::default().build(fn_rpc_url)?;
146
147        TransactionBlockBytes::to_data(rpc_client.request(method, params).await?)
148    }
149
150    async fn sign_and_execute(
151        &self,
152        txn_data: TransactionData,
153        desc: &str,
154    ) -> IotaTransactionBlockResponse {
155        let signature = self.get_context().sign(&txn_data, desc);
156        let resp = self
157            .get_fullnode_client()
158            .quorum_driver_api()
159            .execute_transaction_block(
160                Transaction::from_data(txn_data, vec![signature]),
161                IotaTransactionBlockResponseOptions::new()
162                    .with_object_changes()
163                    .with_balance_changes()
164                    .with_effects()
165                    .with_events(),
166                Some(ExecuteTransactionRequestType::WaitForLocalExecution),
167            )
168            .await
169            .unwrap_or_else(|e| panic!("Failed to execute transaction for {}. {}", desc, e));
170        assert!(
171            matches!(
172                resp.effects.as_ref().unwrap().status(),
173                IotaExecutionStatus::Success
174            ),
175            "Failed to execute transaction for {desc}: {:?}",
176            resp
177        );
178        resp
179    }
180
181    pub async fn setup(options: ClusterTestOpt) -> Result<Self, anyhow::Error> {
182        let cluster = ClusterFactory::start(&options).await?;
183        let wallet_client = WalletClient::new_from_cluster(&cluster).await;
184        let faucet = FaucetClientFactory::new_from_cluster(&cluster).await;
185        Ok(Self {
186            cluster,
187            client: wallet_client,
188            faucet,
189        })
190    }
191
192    // TODO: figure out a more efficient way to test a local cluster
193    // A potential way to do this is to subscribe to txns from fullnode
194    // when the feature is ready
195    pub async fn let_fullnode_sync(&self, digests: Vec<TransactionDigest>, timeout_sec: u64) {
196        let mut futures = FuturesUnordered::new();
197        for digest in digests.clone() {
198            let task = self.get_tx_with_retry_times(digest, 1);
199            futures.push(Box::pin(task));
200        }
201        let mut sleep = Box::pin(time::sleep(Duration::from_secs(timeout_sec)));
202
203        loop {
204            tokio::select! {
205                _ = &mut sleep => {
206                    panic!("Fullnode does not know all of {:?} after {} secs.", digests, timeout_sec);
207                }
208                res = futures.next() => {
209                    match res {
210                        Some((true, _, _)) => {},
211                        Some((false, digest, retry_times)) => {
212                            let task = self.get_tx_with_retry_times(digest, retry_times);
213                            futures.push(Box::pin(task));
214                        },
215                        None => break, // all txns appear on fullnode, mission completed
216                    }
217                }
218            }
219        }
220    }
221
222    async fn get_tx_with_retry_times(
223        &self,
224        digest: TransactionDigest,
225        retry_times: u64,
226    ) -> (bool, TransactionDigest, u64) {
227        match self
228            .client
229            .get_fullnode_client()
230            .read_api()
231            .get_transaction_with_options(digest, IotaTransactionBlockResponseOptions::new())
232            .await
233        {
234            Ok(_) => (true, digest, retry_times),
235            Err(_) => {
236                time::sleep(Duration::from_millis(300 * retry_times)).await;
237                (false, digest, retry_times + 1)
238            }
239        }
240    }
241
242    async fn check_owner_and_into_gas_coin(
243        &self,
244        coin_info: Vec<CoinInfo>,
245        owner: IotaAddress,
246    ) -> Vec<GasCoin> {
247        futures::future::join_all(
248            coin_info
249                .iter()
250                .map(|coin_info| {
251                    ObjectChecker::new(coin_info.id)
252                        .owner(Owner::AddressOwner(owner))
253                        .check_into_gas_coin(self.get_fullnode_client())
254                })
255                .collect::<Vec<_>>(),
256        )
257        .await
258        .into_iter()
259        .collect::<Vec<_>>()
260    }
261}
262
263pub struct TestCase<'a> {
264    test_case: Box<dyn TestCaseImpl + 'a>,
265}
266
267impl<'a> TestCase<'a> {
268    pub fn new(test_case: impl TestCaseImpl + 'a) -> Self {
269        TestCase {
270            test_case: (Box::new(test_case)),
271        }
272    }
273
274    pub async fn run(self, ctx: &mut TestContext) -> bool {
275        let test_name = self.test_case.name();
276        info!("Running test {}.", test_name);
277
278        // TODO: unwind panic and fail gracefully?
279
280        match self.test_case.run(ctx).await {
281            Ok(()) => {
282                info!("Test {test_name} succeeded.");
283                true
284            }
285            Err(e) => {
286                error!("Test {test_name} failed with error: {e}.");
287                false
288            }
289        }
290    }
291}
292
293#[async_trait]
294pub trait TestCaseImpl {
295    fn name(&self) -> &'static str;
296    fn description(&self) -> &'static str;
297    async fn run(&self, ctx: &mut TestContext) -> Result<(), anyhow::Error>;
298}
299
300pub struct ClusterTest;
301
302impl ClusterTest {
303    pub async fn run(options: ClusterTestOpt) {
304        let mut ctx = TestContext::setup(options)
305            .await
306            .unwrap_or_else(|e| panic!("Failed to set up TestContext, e: {e}"));
307
308        // TODO: collect tests from each test_case file instead.
309        let tests = vec![
310            TestCase::new(NativeTransferTest {}),
311            TestCase::new(CoinMergeSplitTest {}),
312            TestCase::new(SharedCounterTest {}),
313            TestCase::new(FullNodeExecuteTransactionTest {}),
314            TestCase::new(FullNodeBuildPublishTransactionTest {}),
315            TestCase::new(CoinIndexTest {}),
316            TestCase::new(RandomBeaconTest {}),
317        ];
318
319        // TODO: improve the runner parallelism for efficiency
320        // For now we run tests serially
321        let mut success_cnt = 0;
322        let total_cnt = tests.len() as i32;
323        for t in tests {
324            let is_success = t.run(&mut ctx).await as i32;
325            success_cnt += is_success;
326        }
327        if success_cnt < total_cnt {
328            // If any test failed, panic to bubble up the signal
329            panic!("{success_cnt} of {total_cnt} tests passed.");
330        }
331        info!("{success_cnt} of {total_cnt} tests passed.");
332    }
333}