1use std::{str::FromStr, time::Duration};
6
7use benchmark::{BenchmarkParametersGenerator, LoadType};
8use clap::Parser;
9use client::{ServerProviderClient, aws::AwsClient};
10use eyre::{Context, Result};
11use faults::FaultsType;
12use measurement::MeasurementsCollection;
13use orchestrator::Orchestrator;
14use protocol::iota::{IotaBenchmarkType, IotaProtocol};
15use settings::{CloudProvider, Settings};
16use ssh::SshConnectionManager;
17use testbed::Testbed;
18
19pub mod benchmark;
20pub mod client;
21pub mod display;
22pub mod error;
23pub mod faults;
24pub mod logs;
25pub mod measurement;
26mod monitor;
27pub mod orchestrator;
28pub mod protocol;
29pub mod settings;
30pub mod ssh;
31pub mod testbed;
32
/// Protocol implementation whose commands the orchestrator runs; it is built
/// from the loaded settings via `Protocol::new(&settings)`.
type Protocol = IotaProtocol;
/// Benchmark type parsed from `--benchmark-type` and used when loading
/// measurement collections for `summarize`.
type BenchmarkType = IotaBenchmarkType;
35
/// Top-level command-line options of the testbed orchestrator.
#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    /// Path to the settings file.
    #[arg(
        long,
        value_name = "FILE",
        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    /// The operation to run.
    #[command(subcommand)]
    operation: Operation,
}
54
55#[derive(Parser)]
56pub enum Operation {
57 Testbed {
59 #[command(subcommand)]
60 action: TestbedAction,
61 },
62
63 Benchmark {
65 #[arg(long, default_value = "0", global = true)]
68 benchmark_type: String,
69
70 #[arg(long, value_name = "INT")]
72 committee: usize,
73
74 #[arg(long, value_name = "INT", default_value = "0", global = true)]
76 faults: usize,
77
78 #[arg(long, action, default_value = "false", global = true)]
80 crash_recovery: bool,
81
82 #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
84 crash_interval: Duration,
85
86 #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
88 duration: Duration,
89
90 #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
92 scrape_interval: Duration,
93
94 #[arg(long, action, default_value = "false", global = true)]
96 skip_testbed_update: bool,
97
98 #[arg(long, action, default_value = "false", global = true)]
100 skip_testbed_configuration: bool,
101
102 #[arg(long, action, default_value = "false", global = true)]
104 log_processing: bool,
105
106 #[arg(long, value_name = "INT", default_value = "0", global = true)]
110 dedicated_clients: usize,
111
112 #[arg(long, action, default_value = "false", global = true)]
115 skip_monitoring: bool,
116
117 #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
119 timeout: Duration,
120
121 #[arg(long, value_name = "INT", default_value = "5", global = true)]
123 retries: usize,
124
125 #[command(subcommand)]
127 load_type: Load,
128
129 #[clap(long, action, default_value_t = false, global = true)]
136 use_internal_ip_addresses: bool,
137 },
138
139 Summarize {
141 #[arg(long, value_name = "FILE")]
143 path: String,
144 },
145}
146
/// Actions available under the `testbed` subcommand.
#[derive(Parser)]
pub enum TestbedAction {
    /// Display the status of the testbed.
    Status,

    /// Deploy testbed instances.
    Deploy {
        /// Number of instances to deploy.
        #[arg(long)]
        instances: usize,

        /// Optional region in which to deploy the instances.
        #[arg(long)]
        region: Option<String>,
    },

    /// Start testbed instances.
    Start {
        /// Number of instances to start.
        #[arg(long, default_value = "200")]
        instances: usize,
    },

    /// Stop the testbed.
    Stop,

    /// Destroy the testbed.
    Destroy,
}
180
/// How the load submitted during a benchmark is chosen.
#[derive(Parser)]
pub enum Load {
    /// Benchmark a fixed list of loads.
    FixedLoad {
        /// Comma-separated list of loads; a load of 200 is used if none is given.
        #[arg(
            long,
            value_name = "INT",
            num_args(1..),
            value_delimiter = ','
        )]
        loads: Vec<usize>,
    },

    /// Search over loads, starting from --starting-load.
    Search {
        /// The load the search starts from.
        #[arg(long, value_name = "INT", default_value = "250")]
        starting_load: usize,
        /// Maximum number of search iterations.
        #[arg(long, value_name = "INT", default_value = "5")]
        max_iterations: usize,
    },
}
206
/// Parses a whole, non-negative number of seconds (e.g. `"60"`) into a [`Duration`].
///
/// # Errors
///
/// Returns the underlying [`std::num::ParseIntError`] when `arg` is not a valid `u64`.
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
211
212#[tokio::main]
213async fn main() -> Result<()> {
214 color_eyre::install()?;
215 let opts: Opts = Opts::parse();
216
217 let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
219
220 match &settings.cloud_provider {
221 CloudProvider::Aws => {
222 let client = AwsClient::new(settings.clone()).await;
224
225 run(settings, client, opts).await
227 }
228 }
229}
230
231async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
232 let mut testbed = Testbed::new(settings.clone(), client)
234 .await
235 .wrap_err("Failed to create testbed")?;
236
237 match opts.operation {
238 Operation::Testbed { action } => match action {
239 TestbedAction::Status => testbed.status(),
241
242 TestbedAction::Deploy { instances, region } => testbed
244 .deploy(instances, region)
245 .await
246 .wrap_err("Failed to deploy testbed")?,
247
248 TestbedAction::Start { instances } => testbed
250 .start(instances)
251 .await
252 .wrap_err("Failed to start testbed")?,
253
254 TestbedAction::Stop => testbed.stop().await.wrap_err("Failed to stop testbed")?,
256
257 TestbedAction::Destroy => testbed
259 .destroy()
260 .await
261 .wrap_err("Failed to destroy testbed")?,
262 },
263
264 Operation::Benchmark {
266 benchmark_type,
267 committee,
268 faults,
269 crash_recovery,
270 crash_interval,
271 duration,
272 scrape_interval,
273 skip_testbed_update,
274 skip_testbed_configuration,
275 log_processing,
276 dedicated_clients,
277 skip_monitoring,
278 timeout,
279 retries,
280 load_type,
281 use_internal_ip_addresses,
282 } => {
283 let username = testbed.username();
285 let private_key_file = settings.ssh_private_key_file.clone();
286 let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
287 .with_timeout(timeout)
288 .with_retries(retries);
289
290 let instances = testbed.instances();
291
292 let setup_commands = testbed
293 .setup_commands()
294 .await
295 .wrap_err("Failed to load testbed setup commands")?;
296
297 let protocol_commands = Protocol::new(&settings);
298 let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;
299
300 let load = match load_type {
301 Load::FixedLoad { loads } => {
302 let loads = if loads.is_empty() { vec![200] } else { loads };
303 LoadType::Fixed(loads)
304 }
305 Load::Search {
306 starting_load,
307 max_iterations,
308 } => LoadType::Search {
309 starting_load,
310 max_iterations,
311 },
312 };
313
314 let fault_type = if !crash_recovery || faults == 0 {
315 FaultsType::Permanent { faults }
316 } else {
317 FaultsType::CrashRecovery {
318 max_faults: faults,
319 interval: crash_interval,
320 }
321 };
322
323 let generator =
324 BenchmarkParametersGenerator::new(committee, load, use_internal_ip_addresses)
325 .with_benchmark_type(benchmark_type)
326 .with_custom_duration(duration)
327 .with_faults(fault_type);
328
329 Orchestrator::new(
330 settings,
331 instances,
332 setup_commands,
333 protocol_commands,
334 ssh_manager,
335 )
336 .with_scrape_interval(scrape_interval)
337 .with_crash_interval(crash_interval)
338 .skip_testbed_updates(skip_testbed_update)
339 .skip_testbed_configuration(skip_testbed_configuration)
340 .with_log_processing(log_processing)
341 .with_dedicated_clients(dedicated_clients)
342 .skip_monitoring(skip_monitoring)
343 .run_benchmarks(generator)
344 .await
345 .wrap_err("Failed to run benchmarks")?;
346 }
347
348 Operation::Summarize { path } => {
350 MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
351 }
352 }
353 Ok(())
354}