Skip to main content

iota_cluster_test/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::Path};
6
7use async_trait::async_trait;
8use iota_config::{
9    Config, IOTA_GENESIS_FILENAME, IOTA_KEYSTORE_FILENAME, IOTA_NETWORK_CONFIG, PersistedConfig,
10    genesis::Genesis,
11};
12use iota_genesis_builder::SnapshotSource;
13use iota_graphql_rpc::{
14    config::ConnectionConfig, test_infra::cluster::start_graphql_server_with_fn_rpc,
15};
16use iota_indexer::test_utils::{IndexerTypeConfig, start_test_indexer};
17use iota_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
18use iota_sdk::{
19    iota_client_config::{IotaClientConfig, IotaEnv},
20    wallet_context::WalletContext,
21};
22use iota_swarm::memory::Swarm;
23use iota_swarm_config::{
24    genesis_config::GenesisConfig,
25    network_config::{NetworkConfig, NetworkConfigLight},
26};
27use iota_types::{
28    base_types::address_from_iota_pub_key,
29    crypto::{AccountKeyPair, IotaKeyPair, KeypairTraits, get_key_pair},
30};
31use tempfile::tempdir;
32use test_cluster::{TestCluster, TestClusterBuilder};
33use tracing::info;
34
35use super::config::{ClusterTestOpt, Env};
36
37const DEVNET_FAUCET_ADDR: &str = "https://faucet.devnet.iota.cafe:443";
38const TESTNET_FAUCET_ADDR: &str = "https://faucet.testnet.iota.cafe:443";
39const DEVNET_FULLNODE_ADDR: &str = "https://api.devnet.iota.cafe:443";
40const TESTNET_FULLNODE_ADDR: &str = "https://api.testnet.iota.cafe:443";
41
42pub struct ClusterFactory;
43
44impl ClusterFactory {
45    pub async fn start(
46        options: &ClusterTestOpt,
47    ) -> Result<Box<dyn Cluster + Sync + Send>, anyhow::Error> {
48        Ok(match &options.env {
49            Env::NewLocal => Box::new(LocalNewCluster::start(options).await?) as Box<_>,
50            _ => Box::new(RemoteRunningCluster::start(options).await?) as Box<_>,
51        })
52    }
53}
54
55/// Cluster Abstraction
56#[async_trait]
57pub trait Cluster {
58    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error>
59    where
60        Self: Sized;
61
62    fn fullnode_url(&self) -> &str;
63
64    /// gRPC endpoint of the fullnode, when one is available (local clusters).
65    fn grpc_url(&self) -> Option<&str>;
66
67    fn user_key(&self) -> AccountKeyPair;
68    fn indexer_url(&self) -> &Option<String>;
69
70    /// Returns faucet url in a remote cluster.
71    fn remote_faucet_url(&self) -> Option<&str>;
72
73    /// Returns faucet key in a local cluster.
74    fn local_faucet_key(&self) -> Option<&AccountKeyPair>;
75
76    /// Place to put config for the wallet, and any locally running services.
77    fn config_directory(&self) -> &Path;
78}
79
80/// Represents an up and running cluster deployed remotely.
81pub struct RemoteRunningCluster {
82    fullnode_url: String,
83    faucet_url: String,
84    config_directory: tempfile::TempDir,
85}
86
87#[async_trait]
88impl Cluster for RemoteRunningCluster {
89    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
90        let (fullnode_url, faucet_url) = match options.env {
91            Env::Devnet => (
92                String::from(DEVNET_FULLNODE_ADDR),
93                String::from(DEVNET_FAUCET_ADDR),
94            ),
95            Env::Testnet => (
96                String::from(TESTNET_FULLNODE_ADDR),
97                String::from(TESTNET_FAUCET_ADDR),
98            ),
99            Env::CustomRemote => (
100                options
101                    .fullnode_address
102                    .clone()
103                    .expect("expect 'fullnode_address' for Env::Custom"),
104                options
105                    .faucet_address
106                    .clone()
107                    .expect("expect 'faucet_address' for Env::Custom"),
108            ),
109            Env::NewLocal => {
110                unreachable!("the NewLocal variant shouldn't use RemoteRunningCluster")
111            }
112        };
113
114        // TODO: test connectivity before proceeding?
115
116        Ok(Self {
117            fullnode_url,
118            faucet_url,
119            config_directory: tempfile::tempdir()?,
120        })
121    }
122
123    fn fullnode_url(&self) -> &str {
124        &self.fullnode_url
125    }
126
127    fn grpc_url(&self) -> Option<&str> {
128        None
129    }
130
131    fn indexer_url(&self) -> &Option<String> {
132        &None
133    }
134
135    fn user_key(&self) -> AccountKeyPair {
136        get_key_pair().1
137    }
138
139    fn remote_faucet_url(&self) -> Option<&str> {
140        Some(&self.faucet_url)
141    }
142
143    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
144        None
145    }
146
147    fn config_directory(&self) -> &Path {
148        self.config_directory.path()
149    }
150}
151
152/// Represents a local Cluster which starts per cluster test run.
153pub struct LocalNewCluster {
154    test_cluster: TestCluster,
155    fullnode_url: String,
156    grpc_url: String,
157    indexer_url: Option<String>,
158    faucet_key: AccountKeyPair,
159    config_directory: tempfile::TempDir,
160}
161
162impl LocalNewCluster {
163    #[allow(unused)]
164    pub fn swarm(&self) -> &Swarm {
165        &self.test_cluster.swarm
166    }
167}
168
169#[async_trait]
170impl Cluster for LocalNewCluster {
171    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
172        let data_ingestion_path = tempdir()?.keep();
173        // TODO: options should contain port instead of address
174        let fullnode_rpc_addr = options.fullnode_address.as_ref().map(|addr| {
175            addr.parse::<SocketAddr>()
176                .expect("unable to parse fullnode address")
177        });
178
179        let indexer_address = options.indexer_address.as_ref().map(|addr| {
180            addr.parse::<SocketAddr>()
181                .expect("unable to parse indexer address")
182        });
183
184        let mut cluster_builder = TestClusterBuilder::new()
185            .enable_fullnode_events()
186            .with_data_ingestion_dir(data_ingestion_path.clone())
187            .with_fullnode_enable_grpc_api(true);
188
189        // Check if we already have a config directory that is passed
190        if let Some(config_dir) = options.config_dir.clone() {
191            assert!(options.epoch_duration_ms.is_none());
192            // Load the config of the IOTA authority.
193            let network_config_path = config_dir.join(IOTA_NETWORK_CONFIG);
194            let NetworkConfigLight {
195                validator_configs,
196                account_keys,
197                committee_with_network: _,
198            } = PersistedConfig::read(&network_config_path).map_err(|err| {
199                err.context(format!(
200                    "cannot open IOTA network config file at {network_config_path:?}"
201                ))
202            })?;
203
204            // Add genesis objects
205            let genesis_path = config_dir.join(IOTA_GENESIS_FILENAME);
206            let genesis = Genesis::load(genesis_path)?;
207            let network_config = NetworkConfig {
208                validator_configs,
209                account_keys,
210                genesis,
211            };
212            cluster_builder = cluster_builder.set_network_config(network_config);
213
214            cluster_builder = cluster_builder.with_config_dir(config_dir);
215        } else {
216            // Let the faucet account hold 1000 gas objects on genesis
217            let mut genesis_config = GenesisConfig::custom_genesis(1, 100);
218            // Add any migration sources
219            let local_snapshots = options
220                .local_migration_snapshots
221                .iter()
222                .cloned()
223                .map(SnapshotSource::Local);
224            let remote_snapshots = options
225                .remote_migration_snapshots
226                .iter()
227                .cloned()
228                .map(SnapshotSource::S3);
229            genesis_config.migration_sources = local_snapshots.chain(remote_snapshots).collect();
230            // Custom genesis should be build here where we add the extra accounts
231            cluster_builder = cluster_builder.set_genesis_config(genesis_config);
232
233            if let Some(epoch_duration_ms) = options.epoch_duration_ms {
234                cluster_builder = cluster_builder.with_epoch_duration_ms(epoch_duration_ms);
235            }
236        }
237
238        if let Some(fullnode_rpc_addr) = fullnode_rpc_addr {
239            cluster_builder = cluster_builder.with_fullnode_rpc_addr(fullnode_rpc_addr);
240        }
241
242        let mut test_cluster = cluster_builder.build().await;
243
244        // Use the wealthy account for faucet
245        let faucet_key = test_cluster.swarm.config_mut().account_keys.swap_remove(0);
246        let faucet_address = address_from_iota_pub_key(faucet_key.public());
247        info!(?faucet_address, "faucet_address");
248
249        // This cluster has fullnode handle, safe to unwrap
250        let fullnode_url = test_cluster.fullnode_handle.rpc_url.clone();
251        let grpc_url = test_cluster.grpc_url();
252
253        if let (Some(pg_address), Some(indexer_address)) =
254            (options.pg_address.clone(), indexer_address)
255        {
256            // Start in writer mode
257            start_test_indexer(
258                pg_address.clone(),
259                // reset the existing db
260                true,
261                None,
262                test_cluster.grpc_url(),
263                IndexerTypeConfig::writer_mode(None),
264                Some(data_ingestion_path.clone()),
265            )
266            .await;
267
268            // Start in reader mode
269            start_test_indexer(
270                pg_address,
271                false,
272                None,
273                test_cluster.grpc_url(),
274                IndexerTypeConfig::reader_mode(indexer_address.to_string()),
275                Some(data_ingestion_path),
276            )
277            .await;
278        }
279
280        if let Some(graphql_address) = &options.graphql_address {
281            let graphql_address = graphql_address.parse::<SocketAddr>()?;
282            let graphql_connection_config = ConnectionConfig::new(
283                Some(graphql_address.port()),
284                Some(graphql_address.ip().to_string()),
285                options.pg_address.clone(),
286                None,
287                None,
288                None,
289                None,
290                None,
291            );
292
293            start_graphql_server_with_fn_rpc(
294                graphql_connection_config.clone(),
295                Some(test_cluster.grpc_url()),
296                // resolves to default cancellation_token
297                None,
298                // resolves to default service config
299                None,
300            )
301            .await;
302        }
303
304        // Let nodes connect to one another
305        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
306
307        // TODO: test connectivity before proceeding?
308        Ok(Self {
309            test_cluster,
310            fullnode_url,
311            grpc_url,
312            faucet_key,
313            config_directory: tempfile::tempdir()?,
314            indexer_url: options.indexer_address.clone(),
315        })
316    }
317
318    fn fullnode_url(&self) -> &str {
319        &self.fullnode_url
320    }
321
322    fn grpc_url(&self) -> Option<&str> {
323        Some(&self.grpc_url)
324    }
325
326    fn indexer_url(&self) -> &Option<String> {
327        &self.indexer_url
328    }
329
330    fn user_key(&self) -> AccountKeyPair {
331        get_key_pair().1
332    }
333
334    fn remote_faucet_url(&self) -> Option<&str> {
335        None
336    }
337
338    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
339        Some(&self.faucet_key)
340    }
341
342    fn config_directory(&self) -> &Path {
343        self.config_directory.path()
344    }
345}
346
347// Make linter happy
348#[async_trait]
349impl Cluster for Box<dyn Cluster + Send + Sync> {
350    async fn start(_options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
351        unreachable!(
352            "if we already have a boxed Cluster trait object we wouldn't have to call this function"
353        );
354    }
355    fn fullnode_url(&self) -> &str {
356        (**self).fullnode_url()
357    }
358    fn grpc_url(&self) -> Option<&str> {
359        (**self).grpc_url()
360    }
361    fn indexer_url(&self) -> &Option<String> {
362        (**self).indexer_url()
363    }
364
365    fn user_key(&self) -> AccountKeyPair {
366        (**self).user_key()
367    }
368
369    fn remote_faucet_url(&self) -> Option<&str> {
370        (**self).remote_faucet_url()
371    }
372
373    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
374        (**self).local_faucet_key()
375    }
376
377    fn config_directory(&self) -> &Path {
378        (**self).config_directory()
379    }
380}
381
382pub fn new_wallet_context_from_cluster(
383    cluster: &(dyn Cluster + Sync + Send),
384    key_pair: AccountKeyPair,
385) -> WalletContext {
386    let config_dir = cluster.config_directory();
387    let wallet_config_path = config_dir.join("client.yaml");
388    let fullnode_url = cluster.fullnode_url();
389    info!("Use RPC: {fullnode_url}");
390    let keystore_path = config_dir.join(IOTA_KEYSTORE_FILENAME);
391    let mut keystore = Keystore::from(FileBasedKeystore::new(&keystore_path).unwrap());
392    let address = address_from_iota_pub_key(key_pair.public());
393    keystore
394        .add_key(None, IotaKeyPair::Ed25519(key_pair))
395        .unwrap();
396    IotaClientConfig::new(keystore)
397        .with_envs([IotaEnv::new("localnet", fullnode_url)])
398        .with_active_address(address)
399        .with_active_env("localnet".to_string())
400        .persisted(&wallet_config_path)
401        .save()
402        .unwrap();
403
404    info!(
405        "Initialize wallet from config path: {:?}",
406        wallet_config_path
407    );
408
409    WalletContext::new(&wallet_config_path).unwrap_or_else(|e| {
410        panic!("failed to init wallet context from path {wallet_config_path:?}, error: {e}")
411    })
412}