1use std::{path::PathBuf, sync::Arc, time::Duration};
6
7use futures::future::join_all;
8use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
9use rand::{Rng, SeedableRng, rngs::StdRng, seq::SliceRandom};
10use surf_strategy::SurfStrategy;
11use test_cluster::{TestCluster, TestClusterBuilder};
12use tokio::sync::watch;
13use tracing::info;
14
15use crate::{surfer_state::SurfStatistics, surfer_task::SurferTask};
16
17pub mod surf_strategy;
18mod surfer_state;
19mod surfer_task;
20
/// Number of validators requested from the cluster builder in `run`.
const VALIDATOR_COUNT: usize = 7;

/// Number of identical account configurations handed to the cluster builder in `run`.
const ACCOUNT_NUM: usize = 20;
/// Length of the `gas_amounts` list given to every account configured in `run`.
const GAS_OBJECT_COUNT: usize = 3;
25
26pub async fn run(
27 run_duration: Duration,
28 epoch_duration: Duration,
29 package_paths: Vec<PathBuf>,
30) -> SurfStatistics {
31 let cluster = TestClusterBuilder::new()
32 .with_num_validators(VALIDATOR_COUNT)
33 .with_epoch_duration_ms(epoch_duration.as_millis() as u64)
34 .with_accounts(vec![
35 AccountConfig {
36 address: None,
37 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
38 };
39 ACCOUNT_NUM
40 ])
41 .build()
42 .await;
43 info!(
44 "Started cluster with {} validators and epoch duration of {:?}ms",
45 VALIDATOR_COUNT,
46 epoch_duration.as_millis()
47 );
48 run_with_test_cluster(run_duration, package_paths, cluster.into(), 0).await
49}
50
51pub async fn run_with_test_cluster(
52 run_duration: Duration,
53 package_paths: Vec<PathBuf>,
54 cluster: Arc<TestCluster>,
55 skip_accounts: usize,
58) -> SurfStatistics {
59 run_with_test_cluster_and_strategy(
60 SurfStrategy::default(),
61 run_duration,
62 package_paths,
63 cluster,
64 skip_accounts,
65 )
66 .await
67}
68
69pub async fn run_with_test_cluster_and_strategy(
70 surf_strategy: SurfStrategy,
71 run_duration: Duration,
72 package_paths: Vec<PathBuf>,
73 cluster: Arc<TestCluster>,
74 skip_accounts: usize,
77) -> SurfStatistics {
78 let seed = rand::thread_rng().gen::<u64>();
79 info!("Initial Seed: {:?}", seed);
80 let mut rng = StdRng::seed_from_u64(seed);
81 let (exit_sender, exit_rcv) = watch::channel(());
82
83 let mut tasks = SurferTask::create_surfer_tasks(
84 cluster.clone(),
85 rng.gen::<u64>(),
86 exit_rcv,
87 skip_accounts,
88 surf_strategy,
89 )
90 .await;
91 info!("Created {} surfer tasks", tasks.len());
92
93 for path in &package_paths {
94 tasks
95 .choose_mut(&mut rng)
96 .unwrap()
97 .state
98 .publish_package(path)
99 .await;
100 }
101
102 let mut handles = vec![];
103 for task in tasks {
104 handles.push(tokio::task::spawn(task.surf()));
105 }
106 tokio::time::sleep(run_duration).await;
107 exit_sender.send(()).unwrap();
108 let all_stats: Result<Vec<_>, _> = join_all(handles).await.into_iter().collect();
109 SurfStatistics::aggregate(all_stats.unwrap())
110
111 }