1use std::{str::FromStr, time::Duration};
6
7use benchmark::{BenchmarkParametersGenerator, LoadType};
8use clap::Parser;
9use client::{ServerProviderClient, aws::AwsClient};
10use eyre::{Context, Result};
11use faults::FaultsType;
12use measurement::MeasurementsCollection;
13use orchestrator::Orchestrator;
14use protocol::iota::{IotaBenchmarkType, IotaProtocol};
15use settings::{CloudProvider, Settings};
16use ssh::SshConnectionManager;
17use testbed::Testbed;
18
// Modules of this crate. All of them are exported publicly except `monitor`,
// which stays private to the crate.
pub mod benchmark;
pub mod client;
pub mod display;
pub mod error;
pub mod faults;
pub mod logs;
pub mod measurement;
mod monitor;
pub mod orchestrator;
pub mod protocol;
pub mod settings;
pub mod ssh;
pub mod testbed;
32
// Concrete protocol driven by this binary; `run` builds it with
// `Protocol::new(&settings)` and hands it to the orchestrator.
type Protocol = IotaProtocol;
// Benchmark type matching `Protocol`; parsed from `--benchmark-type` via `FromStr`
// and used as the type parameter of `MeasurementsCollection` when summarizing.
type BenchmarkType = IotaBenchmarkType;
35
36#[derive(Parser)]
37#[command(author, version, about = "Testbed orchestrator", long_about = None)]
38pub struct Opts {
39 #[arg(
43 long,
44 value_name = "FILE",
45 default_value = "crates/iota-aws-orchestrator/assets/settings.json",
46 global = true
47 )]
48 settings_path: String,
49
50 #[command(subcommand)]
52 operation: Operation,
53}
54
55#[derive(Parser)]
56pub enum Operation {
57 Testbed {
59 #[command(subcommand)]
60 action: TestbedAction,
61 },
62
63 Benchmark {
65 #[arg(long, default_value = "0", global = true)]
68 benchmark_type: String,
69
70 #[arg(long, value_name = "INT")]
72 committee: usize,
73
74 #[arg(long, value_name = "INT", default_value = "0", global = true)]
76 faults: usize,
77
78 #[arg(long, action, default_value = "false", global = true)]
80 crash_recovery: bool,
81
82 #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
84 crash_interval: Duration,
85
86 #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
88 duration: Duration,
89
90 #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
92 scrape_interval: Duration,
93
94 #[arg(long, action, default_value = "false", global = true)]
96 skip_testbed_update: bool,
97
98 #[arg(long, action, default_value = "false", global = true)]
100 skip_testbed_configuration: bool,
101
102 #[arg(long, action, default_value = "false", global = true)]
104 log_processing: bool,
105
106 #[arg(long, value_name = "INT", default_value = "0", global = true)]
110 dedicated_clients: usize,
111
112 #[arg(long, action, default_value = "false", global = true)]
115 skip_monitoring: bool,
116
117 #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
119 timeout: Duration,
120
121 #[arg(long, value_name = "INT", default_value = "5", global = true)]
123 retries: usize,
124
125 #[command(subcommand)]
127 load_type: Load,
128 },
129
130 Summarize {
132 #[arg(long, value_name = "FILE")]
134 path: String,
135 },
136}
137
138#[derive(Parser)]
139pub enum TestbedAction {
140 Status,
142
143 Deploy {
146 #[arg(long)]
148 instances: usize,
149
150 #[arg(long)]
154 region: Option<String>,
155 },
156
157 Start {
160 #[arg(long, default_value = "200")]
162 instances: usize,
163 },
164
165 Stop,
167
168 Destroy,
170}
171
172#[derive(Parser)]
173pub enum Load {
174 FixedLoad {
176 #[arg(
178 long,
179 value_name = "INT",
180 num_args(1..),
181 value_delimiter = ','
182 )]
183 loads: Vec<usize>,
184 },
185
186 Search {
188 #[arg(long, value_name = "INT", default_value = "250")]
190 starting_load: usize,
191 #[arg(long, value_name = "INT", default_value = "5")]
194 max_iterations: usize,
195 },
196}
197
/// Parses a command-line argument as a whole number of seconds.
///
/// # Errors
///
/// Returns the underlying [`std::num::ParseIntError`] when `arg` is not a valid
/// unsigned 64-bit integer.
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
202
203#[tokio::main]
204async fn main() -> Result<()> {
205 color_eyre::install()?;
206 let opts: Opts = Opts::parse();
207
208 let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
210
211 match &settings.cloud_provider {
212 CloudProvider::Aws => {
213 let client = AwsClient::new(settings.clone()).await;
215
216 run(settings, client, opts).await
218 }
219 }
220}
221
222async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
223 let mut testbed = Testbed::new(settings.clone(), client)
225 .await
226 .wrap_err("Failed to create testbed")?;
227
228 match opts.operation {
229 Operation::Testbed { action } => match action {
230 TestbedAction::Status => testbed.status(),
232
233 TestbedAction::Deploy { instances, region } => testbed
235 .deploy(instances, region)
236 .await
237 .wrap_err("Failed to deploy testbed")?,
238
239 TestbedAction::Start { instances } => testbed
241 .start(instances)
242 .await
243 .wrap_err("Failed to start testbed")?,
244
245 TestbedAction::Stop => testbed.stop().await.wrap_err("Failed to stop testbed")?,
247
248 TestbedAction::Destroy => testbed
250 .destroy()
251 .await
252 .wrap_err("Failed to destroy testbed")?,
253 },
254
255 Operation::Benchmark {
257 benchmark_type,
258 committee,
259 faults,
260 crash_recovery,
261 crash_interval,
262 duration,
263 scrape_interval,
264 skip_testbed_update,
265 skip_testbed_configuration,
266 log_processing,
267 dedicated_clients,
268 skip_monitoring,
269 timeout,
270 retries,
271 load_type,
272 } => {
273 let username = testbed.username();
275 let private_key_file = settings.ssh_private_key_file.clone();
276 let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
277 .with_timeout(timeout)
278 .with_retries(retries);
279
280 let instances = testbed.instances();
281
282 let setup_commands = testbed
283 .setup_commands()
284 .await
285 .wrap_err("Failed to load testbed setup commands")?;
286
287 let protocol_commands = Protocol::new(&settings);
288 let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;
289
290 let load = match load_type {
291 Load::FixedLoad { loads } => {
292 let loads = if loads.is_empty() { vec![200] } else { loads };
293 LoadType::Fixed(loads)
294 }
295 Load::Search {
296 starting_load,
297 max_iterations,
298 } => LoadType::Search {
299 starting_load,
300 max_iterations,
301 },
302 };
303
304 let fault_type = if !crash_recovery || faults == 0 {
305 FaultsType::Permanent { faults }
306 } else {
307 FaultsType::CrashRecovery {
308 max_faults: faults,
309 interval: crash_interval,
310 }
311 };
312
313 let generator = BenchmarkParametersGenerator::new(committee, load)
314 .with_benchmark_type(benchmark_type)
315 .with_custom_duration(duration)
316 .with_faults(fault_type);
317
318 Orchestrator::new(
319 settings,
320 instances,
321 setup_commands,
322 protocol_commands,
323 ssh_manager,
324 )
325 .with_scrape_interval(scrape_interval)
326 .with_crash_interval(crash_interval)
327 .skip_testbed_updates(skip_testbed_update)
328 .skip_testbed_configuration(skip_testbed_configuration)
329 .with_log_processing(log_processing)
330 .with_dedicated_clients(dedicated_clients)
331 .skip_monitoring(skip_monitoring)
332 .run_benchmarks(generator)
333 .await
334 .wrap_err("Failed to run benchmarks")?;
335 }
336
337 Operation::Summarize { path } => {
339 MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
340 }
341 }
342 Ok(())
343}