iota_surfer/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::PathBuf, sync::Arc, time::Duration};
6
7use futures::future::join_all;
8use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
9use rand::{Rng, SeedableRng, rngs::StdRng, seq::SliceRandom};
10use surf_strategy::SurfStrategy;
11use test_cluster::{TestCluster, TestClusterBuilder};
12use tokio::sync::watch;
13use tracing::info;
14
15use crate::{surfer_state::SurfStatistics, surfer_task::SurferTask};
16
17pub mod surf_strategy;
18mod surfer_state;
19mod surfer_task;
20
/// Number of validators in the test cluster that [`run`] builds.
const VALIDATOR_COUNT: usize = 7;

/// Number of identical accounts configured on the test cluster that [`run`] builds.
const ACCOUNT_NUM: usize = 20;
/// Number of gas amounts (each `DEFAULT_GAS_AMOUNT`) assigned to every one of those accounts.
const GAS_OBJECT_COUNT: usize = 3;
25
26pub async fn run(
27    run_duration: Duration,
28    epoch_duration: Duration,
29    package_paths: Vec<PathBuf>,
30) -> SurfStatistics {
31    let cluster = TestClusterBuilder::new()
32        .with_num_validators(VALIDATOR_COUNT)
33        .with_epoch_duration_ms(epoch_duration.as_millis() as u64)
34        .with_accounts(vec![
35            AccountConfig {
36                address: None,
37                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
38            };
39            ACCOUNT_NUM
40        ])
41        .build()
42        .await;
43    info!(
44        "Started cluster with {} validators and epoch duration of {:?}ms",
45        VALIDATOR_COUNT,
46        epoch_duration.as_millis()
47    );
48    run_with_test_cluster(run_duration, package_paths, cluster.into(), 0).await
49}
50
51pub async fn run_with_test_cluster(
52    run_duration: Duration,
53    package_paths: Vec<PathBuf>,
54    cluster: Arc<TestCluster>,
55    // Skips the first N accounts, for use in case this is running concurrently with other
56    // processes that also need gas.
57    skip_accounts: usize,
58) -> SurfStatistics {
59    run_with_test_cluster_and_strategy(
60        SurfStrategy::default(),
61        run_duration,
62        package_paths,
63        cluster,
64        skip_accounts,
65    )
66    .await
67}
68
69pub async fn run_with_test_cluster_and_strategy(
70    surf_strategy: SurfStrategy,
71    run_duration: Duration,
72    package_paths: Vec<PathBuf>,
73    cluster: Arc<TestCluster>,
74    // Skips the first N accounts, for use in case this is running concurrently with other
75    // processes that also need gas.
76    skip_accounts: usize,
77) -> SurfStatistics {
78    let seed = rand::thread_rng().gen::<u64>();
79    info!("Initial Seed: {:?}", seed);
80    let mut rng = StdRng::seed_from_u64(seed);
81    let (exit_sender, exit_rcv) = watch::channel(());
82
83    let mut tasks = SurferTask::create_surfer_tasks(
84        cluster.clone(),
85        rng.gen::<u64>(),
86        exit_rcv,
87        skip_accounts,
88        surf_strategy,
89    )
90    .await;
91    info!("Created {} surfer tasks", tasks.len());
92
93    for path in &package_paths {
94        tasks
95            .choose_mut(&mut rng)
96            .unwrap()
97            .state
98            .publish_package(path)
99            .await;
100    }
101
102    let mut handles = vec![];
103    for task in tasks {
104        handles.push(tokio::task::spawn(task.surf()));
105    }
106    tokio::time::sleep(run_duration).await;
107    exit_sender.send(()).unwrap();
108    let all_stats: Result<Vec<_>, _> = join_all(handles).await.into_iter().collect();
109    SurfStatistics::aggregate(all_stats.unwrap())
110
111    // TODO: Right now it will panic here complaining about dropping a tokio
112    // runtime inside of another tokio runtime. Reason unclear.
113}