iota_cluster_test/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::Path};
6
7use async_trait::async_trait;
8use iota_config::{
9    Config, IOTA_GENESIS_FILENAME, IOTA_KEYSTORE_FILENAME, IOTA_NETWORK_CONFIG, PersistedConfig,
10    genesis::Genesis,
11};
12use iota_genesis_builder::SnapshotSource;
13use iota_graphql_rpc::{
14    config::ConnectionConfig, test_infra::cluster::start_graphql_server_with_fn_rpc,
15};
16use iota_indexer::test_utils::{IndexerTypeConfig, start_test_indexer};
17use iota_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
18use iota_sdk::{
19    iota_client_config::{IotaClientConfig, IotaEnv},
20    wallet_context::WalletContext,
21};
22use iota_swarm::memory::Swarm;
23use iota_swarm_config::{
24    genesis_config::GenesisConfig,
25    network_config::{NetworkConfig, NetworkConfigLight},
26};
27use iota_types::{
28    base_types::IotaAddress,
29    crypto::{AccountKeyPair, IotaKeyPair, KeypairTraits, get_key_pair},
30};
31use tempfile::tempdir;
32use test_cluster::{TestCluster, TestClusterBuilder};
33use tracing::info;
34
35use super::config::{ClusterTestOpt, Env};
36
/// Faucet endpoint used by `RemoteRunningCluster` when the environment is `Env::Devnet`.
const DEVNET_FAUCET_ADDR: &str = "https://faucet.devnet.iota.cafe:443";
/// Faucet endpoint used by `RemoteRunningCluster` when the environment is `Env::Testnet`.
const TESTNET_FAUCET_ADDR: &str = "https://faucet.testnet.iota.cafe:443";
/// Fullnode endpoint used by `RemoteRunningCluster` when the environment is `Env::Devnet`.
const DEVNET_FULLNODE_ADDR: &str = "https://api.devnet.iota.cafe:443";
/// Fullnode endpoint used by `RemoteRunningCluster` when the environment is `Env::Testnet`.
const TESTNET_FULLNODE_ADDR: &str = "https://api.testnet.iota.cafe:443";
41
42pub struct ClusterFactory;
43
44impl ClusterFactory {
45    pub async fn start(
46        options: &ClusterTestOpt,
47    ) -> Result<Box<dyn Cluster + Sync + Send>, anyhow::Error> {
48        Ok(match &options.env {
49            Env::NewLocal => Box::new(LocalNewCluster::start(options).await?),
50            _ => Box::new(RemoteRunningCluster::start(options).await?),
51        })
52    }
53}
54
55/// Cluster Abstraction
56#[async_trait]
57pub trait Cluster {
58    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error>
59    where
60        Self: Sized;
61
62    fn fullnode_url(&self) -> &str;
63    fn user_key(&self) -> AccountKeyPair;
64    fn indexer_url(&self) -> &Option<String>;
65
66    /// Returns faucet url in a remote cluster.
67    fn remote_faucet_url(&self) -> Option<&str>;
68
69    /// Returns faucet key in a local cluster.
70    fn local_faucet_key(&self) -> Option<&AccountKeyPair>;
71
72    /// Place to put config for the wallet, and any locally running services.
73    fn config_directory(&self) -> &Path;
74}
75
/// Represents an up and running cluster deployed remotely.
pub struct RemoteRunningCluster {
    /// URL of the remote fullnode, returned by `fullnode_url()`.
    fullnode_url: String,
    /// URL of the remote faucet, returned by `remote_faucet_url()`.
    faucet_url: String,
    /// Temporary directory handed out by `config_directory()`; created in `start`.
    config_directory: tempfile::TempDir,
}
82
83#[async_trait]
84impl Cluster for RemoteRunningCluster {
85    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
86        let (fullnode_url, faucet_url) = match options.env {
87            Env::Devnet => (
88                String::from(DEVNET_FULLNODE_ADDR),
89                String::from(DEVNET_FAUCET_ADDR),
90            ),
91            Env::Testnet => (
92                String::from(TESTNET_FULLNODE_ADDR),
93                String::from(TESTNET_FAUCET_ADDR),
94            ),
95            Env::CustomRemote => (
96                options
97                    .fullnode_address
98                    .clone()
99                    .expect("expect 'fullnode_address' for Env::Custom"),
100                options
101                    .faucet_address
102                    .clone()
103                    .expect("expect 'faucet_address' for Env::Custom"),
104            ),
105            Env::NewLocal => {
106                unreachable!("the NewLocal variant shouldn't use RemoteRunningCluster")
107            }
108        };
109
110        // TODO: test connectivity before proceeding?
111
112        Ok(Self {
113            fullnode_url,
114            faucet_url,
115            config_directory: tempfile::tempdir()?,
116        })
117    }
118
119    fn fullnode_url(&self) -> &str {
120        &self.fullnode_url
121    }
122
123    fn indexer_url(&self) -> &Option<String> {
124        &None
125    }
126
127    fn user_key(&self) -> AccountKeyPair {
128        get_key_pair().1
129    }
130
131    fn remote_faucet_url(&self) -> Option<&str> {
132        Some(&self.faucet_url)
133    }
134
135    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
136        None
137    }
138
139    fn config_directory(&self) -> &Path {
140        self.config_directory.path()
141    }
142}
143
/// Represents a local Cluster which starts per cluster test run.
pub struct LocalNewCluster {
    /// The in-process test cluster built in `start`.
    test_cluster: TestCluster,
    /// RPC URL copied from the test cluster's fullnode handle.
    fullnode_url: String,
    /// Indexer address copied from the options passed to `start`, if any.
    indexer_url: Option<String>,
    /// Key of the account used as the faucet; taken from the cluster's account keys.
    faucet_key: AccountKeyPair,
    /// Temporary directory handed out by `config_directory()`; created in `start`.
    config_directory: tempfile::TempDir,
}
152
impl LocalNewCluster {
    /// Returns the `Swarm` owned by the underlying test cluster.
    // NOTE(review): no caller is visible in this file; presumably the lint is
    // silenced because the accessor is only used by some builds — confirm.
    #[allow(unused)]
    pub fn swarm(&self) -> &Swarm {
        &self.test_cluster.swarm
    }
}
159
160#[async_trait]
161impl Cluster for LocalNewCluster {
162    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
163        let data_ingestion_path = tempdir()?.keep();
164        // TODO: options should contain port instead of address
165        let fullnode_rpc_addr = options.fullnode_address.as_ref().map(|addr| {
166            addr.parse::<SocketAddr>()
167                .expect("unable to parse fullnode address")
168        });
169
170        let indexer_address = options.indexer_address.as_ref().map(|addr| {
171            addr.parse::<SocketAddr>()
172                .expect("unable to parse indexer address")
173        });
174
175        let mut cluster_builder = TestClusterBuilder::new()
176            .enable_fullnode_events()
177            .with_data_ingestion_dir(data_ingestion_path.clone())
178            .with_fullnode_enable_grpc_api(true);
179
180        // Check if we already have a config directory that is passed
181        if let Some(config_dir) = options.config_dir.clone() {
182            assert!(options.epoch_duration_ms.is_none());
183            // Load the config of the IOTA authority.
184            let network_config_path = config_dir.join(IOTA_NETWORK_CONFIG);
185            let NetworkConfigLight {
186                validator_configs,
187                account_keys,
188                committee_with_network: _,
189            } = PersistedConfig::read(&network_config_path).map_err(|err| {
190                err.context(format!(
191                    "cannot open IOTA network config file at {network_config_path:?}"
192                ))
193            })?;
194
195            // Add genesis objects
196            let genesis_path = config_dir.join(IOTA_GENESIS_FILENAME);
197            let genesis = Genesis::load(genesis_path)?;
198            let network_config = NetworkConfig {
199                validator_configs,
200                account_keys,
201                genesis,
202            };
203            cluster_builder = cluster_builder.set_network_config(network_config);
204
205            cluster_builder = cluster_builder.with_config_dir(config_dir);
206        } else {
207            // Let the faucet account hold 1000 gas objects on genesis
208            let mut genesis_config = GenesisConfig::custom_genesis(1, 100);
209            // Add any migration sources
210            let local_snapshots = options
211                .local_migration_snapshots
212                .iter()
213                .cloned()
214                .map(SnapshotSource::Local);
215            let remote_snapshots = options
216                .remote_migration_snapshots
217                .iter()
218                .cloned()
219                .map(SnapshotSource::S3);
220            genesis_config.migration_sources = local_snapshots.chain(remote_snapshots).collect();
221            // Custom genesis should be build here where we add the extra accounts
222            cluster_builder = cluster_builder.set_genesis_config(genesis_config);
223
224            if let Some(epoch_duration_ms) = options.epoch_duration_ms {
225                cluster_builder = cluster_builder.with_epoch_duration_ms(epoch_duration_ms);
226            }
227        }
228
229        if let Some(fullnode_rpc_addr) = fullnode_rpc_addr {
230            cluster_builder = cluster_builder.with_fullnode_rpc_addr(fullnode_rpc_addr);
231        }
232
233        let mut test_cluster = cluster_builder.build().await;
234
235        // Use the wealthy account for faucet
236        let faucet_key = test_cluster.swarm.config_mut().account_keys.swap_remove(0);
237        let faucet_address = IotaAddress::from(faucet_key.public());
238        info!(?faucet_address, "faucet_address");
239
240        // This cluster has fullnode handle, safe to unwrap
241        let fullnode_url = test_cluster.fullnode_handle.rpc_url.clone();
242
243        if let (Some(pg_address), Some(indexer_address)) =
244            (options.pg_address.clone(), indexer_address)
245        {
246            // Start in writer mode
247            start_test_indexer(
248                pg_address.clone(),
249                // reset the existing db
250                true,
251                None,
252                test_cluster.grpc_url(),
253                IndexerTypeConfig::writer_mode(None, None),
254                Some(data_ingestion_path.clone()),
255            )
256            .await;
257
258            // Start in reader mode
259            start_test_indexer(
260                pg_address,
261                false,
262                None,
263                test_cluster.grpc_url(),
264                IndexerTypeConfig::reader_mode(indexer_address.to_string()),
265                Some(data_ingestion_path),
266            )
267            .await;
268        }
269
270        if let Some(graphql_address) = &options.graphql_address {
271            let graphql_address = graphql_address.parse::<SocketAddr>()?;
272            let graphql_connection_config = ConnectionConfig::new(
273                Some(graphql_address.port()),
274                Some(graphql_address.ip().to_string()),
275                options.pg_address.clone(),
276                None,
277                None,
278                None,
279                None,
280            );
281
282            start_graphql_server_with_fn_rpc(
283                graphql_connection_config.clone(),
284                Some(test_cluster.grpc_url()),
285                // resolves to default cancellation_token
286                None,
287                // resolves to default service config
288                None,
289            )
290            .await;
291        }
292
293        // Let nodes connect to one another
294        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
295
296        // TODO: test connectivity before proceeding?
297        Ok(Self {
298            test_cluster,
299            fullnode_url,
300            faucet_key,
301            config_directory: tempfile::tempdir()?,
302            indexer_url: options.indexer_address.clone(),
303        })
304    }
305
306    fn fullnode_url(&self) -> &str {
307        &self.fullnode_url
308    }
309
310    fn indexer_url(&self) -> &Option<String> {
311        &self.indexer_url
312    }
313
314    fn user_key(&self) -> AccountKeyPair {
315        get_key_pair().1
316    }
317
318    fn remote_faucet_url(&self) -> Option<&str> {
319        None
320    }
321
322    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
323        Some(&self.faucet_key)
324    }
325
326    fn config_directory(&self) -> &Path {
327        self.config_directory.path()
328    }
329}
330
331// Make linter happy
332#[async_trait]
333impl Cluster for Box<dyn Cluster + Send + Sync> {
334    async fn start(_options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
335        unreachable!(
336            "if we already have a boxed Cluster trait object we wouldn't have to call this function"
337        );
338    }
339    fn fullnode_url(&self) -> &str {
340        (**self).fullnode_url()
341    }
342    fn indexer_url(&self) -> &Option<String> {
343        (**self).indexer_url()
344    }
345
346    fn user_key(&self) -> AccountKeyPair {
347        (**self).user_key()
348    }
349
350    fn remote_faucet_url(&self) -> Option<&str> {
351        (**self).remote_faucet_url()
352    }
353
354    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
355        (**self).local_faucet_key()
356    }
357
358    fn config_directory(&self) -> &Path {
359        (**self).config_directory()
360    }
361}
362
363pub fn new_wallet_context_from_cluster(
364    cluster: &(dyn Cluster + Sync + Send),
365    key_pair: AccountKeyPair,
366) -> WalletContext {
367    let config_dir = cluster.config_directory();
368    let wallet_config_path = config_dir.join("client.yaml");
369    let fullnode_url = cluster.fullnode_url();
370    info!("Use RPC: {fullnode_url}");
371    let keystore_path = config_dir.join(IOTA_KEYSTORE_FILENAME);
372    let mut keystore = Keystore::from(FileBasedKeystore::new(&keystore_path).unwrap());
373    let address: IotaAddress = key_pair.public().into();
374    keystore
375        .add_key(None, IotaKeyPair::Ed25519(key_pair))
376        .unwrap();
377    IotaClientConfig::new(keystore)
378        .with_envs([IotaEnv::new("localnet", fullnode_url)])
379        .with_active_address(address)
380        .with_active_env("localnet".to_string())
381        .persisted(&wallet_config_path)
382        .save()
383        .unwrap();
384
385    info!(
386        "Initialize wallet from config path: {:?}",
387        wallet_config_path
388    );
389
390    WalletContext::new(&wallet_config_path).unwrap_or_else(|e| {
391        panic!("failed to init wallet context from path {wallet_config_path:?}, error: {e}")
392    })
393}