//! Command-line entry point of the testbed orchestrator: it deploys and
//! manages AWS testbeds, runs benchmarks against them, and summarizes the
//! collected measurements.

use std::{str::FromStr, time::Duration};

use benchmark::{BenchmarkParametersGenerator, LoadType};
use clap::{Parser, ValueEnum};
use client::{ServerProviderClient, aws::AwsClient};
use eyre::{Context, Result};
use faults::FaultsType;
use measurement::MeasurementsCollection;
use orchestrator::Orchestrator;
use protocol::iota::{IotaBenchmarkType, IotaProtocol};
use serde::{Deserialize, Serialize};
use settings::{CloudProvider, Settings};
use ssh::SshConnectionManager;
use testbed::Testbed;

use crate::net_latency::TopologyLayout;

pub mod benchmark;
pub mod build_cache;
pub mod client;
pub mod display;
pub mod error;
pub mod faults;
pub mod logger;
pub mod logs;
pub mod measurement;
mod monitor;
pub mod net_latency;
pub mod orchestrator;
pub mod protocol;
pub mod settings;
pub mod ssh;
pub mod testbed;

/// The protocol to benchmark.
type Protocol = IotaProtocol;
/// The benchmark type associated with that protocol.
type BenchmarkType = IotaBenchmarkType;

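/// Command-line options of the orchestrator binary.
///
/// Illustrative invocation (a sketch; the exact binary name and flag values
/// depend on how the crate is built and on your testbed):
/// `iota-aws-orchestrator benchmark --committee 10 fixed-load --loads 200`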
#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    /// The path to the settings file describing the testbed.
    #[arg(
        long,
        value_name = "FILE",
        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    /// The operation to perform on the testbed.
    #[command(subcommand)]
    operation: Operation,
}

#[derive(Parser)]
pub enum Operation {
    /// Deploy, start, stop, or destroy a testbed and query its status.
    Testbed {
        #[command(subcommand)]
        action: TestbedAction,
    },

    /// Run benchmarks on an existing testbed.
    Benchmark {
        /// The benchmark type to run.
        #[arg(long, default_value = "0", global = true)]
        benchmark_type: String,

        /// The committee size to deploy.
        #[arg(long, value_name = "INT")]
        committee: usize,

        /// The number of faulty nodes.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        faults: usize,

        /// Whether the faulty nodes crash and recover rather than fail permanently.
        #[arg(long, action, default_value = "false", global = true)]
        crash_recovery: bool,

        /// The interval between crashes, in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
        crash_interval: Duration,

        /// The duration of the benchmark, in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
        duration: Duration,

        /// The interval between metrics scrapes, in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
        scrape_interval: Duration,

        /// Whether to skip updating the testbed before running benchmarks.
        #[arg(long, action, default_value = "false", global = true)]
        skip_testbed_update: bool,

        /// Whether to skip configuring the testbed before running benchmarks.
        #[arg(long, action, default_value = "false", global = true)]
        skip_testbed_configuration: bool,

        /// Whether to process the node and client logs.
        #[arg(long, action, default_value = "false", global = true)]
        log_processing: bool,

        /// The number of instances dedicated to running clients.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        /// Whether to skip deploying the monitoring stack.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The timeout for ssh commands, in seconds.
        #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
        timeout: Duration,

        /// The number of times to retry failed ssh commands.
        #[arg(long, value_name = "INT", default_value = "5", global = true)]
        retries: usize,

        /// The load to submit to the system.
        #[command(subcommand)]
        load_type: Load,

        /// Whether to use the instances' internal IP addresses.
        #[clap(long, action, default_value_t = false, global = true)]
        use_internal_ip_addresses: bool,

        /// The artificial latency topology to emulate, if any.
        #[arg(long, global = true)]
        latency_topology: Option<LatencyTopology>,
        /// The latency perturbation to apply on top of the topology, if any.
        #[arg(long = "latency-perturbation-spec", global = true)]
        latency_perturbation_spec: Option<PerturbationSpec>,

        /// The number of clusters used by the clustered latency topologies.
        #[arg(long, value_name = "INT", default_value = "10", global = true)]
        number_of_clusters: usize,

        /// The number of triangles used by the broken-triangle perturbation.
        #[arg(long, value_name = "INT", default_value = "5", global = true)]
        number_of_triangles: u16,

        /// The latency added by the broken-triangle perturbation.
        #[arg(long, value_name = "INT", default_value = "20", global = true)]
        added_latency: u16,

        #[arg(long, value_name = "INT", default_value = "400", global = true)]
        maximum_latency: u16,

        /// The consensus protocol to deploy.
        #[arg(long, default_value = "starfish", global = true)]
        consensus_protocol: ConsensusProtocol,

        /// The epoch duration, in milliseconds.
        #[arg(long, value_name = "INT", global = true)]
        epoch_duration_ms: Option<u64>,

        #[arg(long, value_name = "INT", default_value = "400", global = true)]
        max_pipeline_delay: u32,

        /// The number of connections blocked by the blocking perturbation.
        #[arg(long, value_name = "INT", default_value = "1", global = true)]
        blocking_connections: usize,

        /// Whether to use the current timestamp for the genesis configuration.
        #[arg(long, action, default_value_t = false)]
        use_current_timestamp_for_genesis: bool,
    },

    /// Summarize the results of a previous benchmark run.
    Summarize {
        /// The path to the measurements to summarize.
        #[arg(long, value_name = "FILE")]
        path: String,
    },
}

#[derive(Parser)]
pub enum TestbedAction {
    /// Display the testbed status.
    Status,

    /// Deploy new instances for the testbed.
    Deploy {
        /// The number of instances to deploy.
        #[arg(long)]
        instances: usize,

        /// Whether to skip deploying the monitoring stack.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The number of instances dedicated to running clients.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        /// Whether to use spot instances.
        #[arg(long, action, default_value = "false", global = true)]
        use_spot_instances: bool,

        /// An identifier for this deployment.
        #[arg(long)]
        id: String,
    },

    /// Start the testbed instances.
    Start {
        /// The number of instances to start.
        #[arg(long, default_value = "200")]
        instances: usize,

        /// Whether to skip deploying the monitoring stack.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The number of instances dedicated to running clients.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,
    },

    /// Stop the testbed instances.
    Stop {
        /// Whether to keep the monitoring instance running.
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,
    },

    /// Destroy the testbed and terminate its instances.
    Destroy {
        /// Whether to keep the monitoring instance running.
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,
    },
}

#[derive(Parser)]
pub enum Load {
    /// Benchmark one or more fixed loads.
    FixedLoad {
        /// The loads to benchmark, separated by commas.
        #[arg(
            long,
            value_name = "INT",
            num_args(1..),
            value_delimiter = ','
        )]
        loads: Vec<usize>,
    },

    /// Search for the maximum load the system can sustain.
    Search {
        /// The load to start the search from.
        #[arg(long, value_name = "INT", default_value = "250")]
        starting_load: usize,
        /// The maximum number of search iterations.
        #[arg(long, value_name = "INT", default_value = "5")]
        max_iterations: usize,
    },
}

/// Network perturbations that can be injected during a benchmark.
#[derive(ValueEnum, Clone, Debug)]
pub enum PerturbationSpec {
    /// Add extra latency to a number of node triangles.
    BrokenTriangle,
    /// Block a number of node-to-node connections.
    Blocking,
}

/// Artificial latency topologies that can be emulated between the nodes.
#[derive(ValueEnum, Clone, Debug)]
pub enum LatencyTopology {
    RandomGeographical,
    RandomClustered,
    HardCodedClustered,
    Mainnet,
}

/// The consensus protocols that can be benchmarked.
#[derive(ValueEnum, Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusProtocol {
    Starfish,
    Mysticeti,
    SwapEachEpoch,
}

/// Parses a whole number of seconds into a `Duration`.
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    let seconds = arg.parse()?;
    Ok(Duration::from_secs(seconds))
}

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    let opts: Opts = Opts::parse();

    let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;

    match &settings.cloud_provider {
        CloudProvider::Aws => {
            let client = AwsClient::new(settings.clone()).await;

            run(settings, client, opts).await
        }
    }
}

async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
    // Create a new testbed from the settings and the cloud provider client.
    let mut testbed = Testbed::new(settings.clone(), client)
        .await
        .wrap_err("Failed to create testbed")?;

    match opts.operation {
        // Manage the testbed.
        Operation::Testbed { action } => match action {
            TestbedAction::Status => testbed.status(),

            TestbedAction::Deploy {
                instances,
                dedicated_clients,
                skip_monitoring,
                use_spot_instances,
                id,
            } => testbed
                .deploy(
                    instances,
                    skip_monitoring,
                    dedicated_clients,
                    use_spot_instances,
                    id,
                )
                .await
                .wrap_err("Failed to deploy testbed")?,

            TestbedAction::Start {
                instances,
                skip_monitoring,
                dedicated_clients,
            } => testbed
                .start(instances, dedicated_clients, skip_monitoring)
                .await
                .wrap_err("Failed to start testbed")?,

            TestbedAction::Stop { keep_monitoring } => testbed
                .stop(keep_monitoring)
                .await
                .wrap_err("Failed to stop testbed")?,

            TestbedAction::Destroy { keep_monitoring } => testbed
                .destroy(keep_monitoring)
                .await
                .wrap_err("Failed to destroy testbed")?,
        },

        // Run benchmarks.
        Operation::Benchmark {
            benchmark_type,
            committee,
            faults,
            crash_recovery,
            crash_interval,
            duration,
            scrape_interval,
            skip_testbed_update,
            skip_testbed_configuration,
            log_processing,
            dedicated_clients,
            skip_monitoring,
            timeout,
            retries,
            load_type,
            use_internal_ip_addresses,
            latency_perturbation_spec,
            latency_topology,
            added_latency,
            number_of_triangles,
            number_of_clusters,
            consensus_protocol,
            maximum_latency,
            epoch_duration_ms,
            blocking_connections,
            use_current_timestamp_for_genesis,
            max_pipeline_delay,
        } => {
            // Create the ssh manager used to communicate with the instances.
            let username = testbed.username();
            let private_key_file = settings.ssh_private_key_file.clone();
            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
                .with_timeout(timeout)
                .with_retries(retries);

            // Select the instances running the nodes, the clients, and the metrics collector.
            let node_instances = testbed.node_instances();
            let client_instances = testbed.client_instances();
            let metrics_instance = testbed.metrics_instance();

            let setup_commands = testbed
                .setup_commands()
                .await
                .wrap_err("Failed to load testbed setup commands")?;

            let protocol_commands = Protocol::new(&settings);
            let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;

            // Determine the load to submit to the system.
            let load = match load_type {
                Load::FixedLoad { loads } => {
                    let loads = if loads.is_empty() { vec![200] } else { loads };
                    LoadType::Fixed(loads)
                }
                Load::Search {
                    starting_load,
                    max_iterations,
                } => LoadType::Search {
                    starting_load,
                    max_iterations,
                },
            };

            // Determine the fault model.
            let fault_type = if !crash_recovery || faults == 0 {
                FaultsType::Permanent { faults }
            } else {
                FaultsType::CrashRecovery {
                    max_faults: faults,
                    interval: crash_interval,
                }
            };

            // Build the network perturbation, if any.
            let perturbation_spec = match latency_perturbation_spec {
                Some(PerturbationSpec::BrokenTriangle) => {
                    net_latency::PerturbationSpec::BrokenTriangle {
                        added_latency,
                        number_of_triangles,
                    }
                }
                Some(PerturbationSpec::Blocking) => net_latency::PerturbationSpec::Blocking {
                    number_of_blocked_connections: blocking_connections,
                },
                None => net_latency::PerturbationSpec::None,
            };

            // Select the latency topology to emulate, if any.
            let latency_topology = match latency_topology {
                Some(LatencyTopology::RandomGeographical) => {
                    Some(TopologyLayout::RandomGeographical)
                }
                Some(LatencyTopology::RandomClustered) => {
                    Some(TopologyLayout::RandomClustered { number_of_clusters })
                }
                Some(LatencyTopology::HardCodedClustered) => {
                    Some(TopologyLayout::HardCodedClustered)
                }
                Some(LatencyTopology::Mainnet) => Some(TopologyLayout::Mainnet),
                None => None,
            };

            // Assemble the benchmark parameters generator.
            let generator = BenchmarkParametersGenerator::new(
                committee,
                dedicated_clients,
                load,
                use_internal_ip_addresses,
            )
            .with_benchmark_type(benchmark_type)
            .with_custom_duration(duration)
            .with_perturbation_spec(perturbation_spec)
            .with_latency_topology(latency_topology)
            .with_consensus_protocol(consensus_protocol)
            .with_max_latency(maximum_latency)
            .with_epoch_duration(epoch_duration_ms)
            .with_max_pipeline_delay(max_pipeline_delay)
            .with_current_timestamp_for_genesis(use_current_timestamp_for_genesis)
            .with_faults(fault_type);

            // Create a new orchestrator to instruct the testbed and run the benchmarks.
            Orchestrator::new(
                settings,
                node_instances,
                client_instances,
                metrics_instance,
                setup_commands,
                protocol_commands,
                ssh_manager,
            )
            .with_scrape_interval(scrape_interval)
            .with_crash_interval(crash_interval)
            .skip_testbed_updates(skip_testbed_update)
            .skip_testbed_configuration(skip_testbed_configuration)
            .with_log_processing(log_processing)
            .with_dedicated_clients(dedicated_clients)
            .skip_monitoring(skip_monitoring)
            .run_benchmarks(generator)
            .await
            .wrap_err("Failed to run benchmarks")?;
        }

        // Print a summary of the specified measurements collection.
        Operation::Summarize { path } => {
            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
        }
    }
    Ok(())
}
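
// Minimal sanity checks for `parse_duration` (a small sketch; it only covers
// the seconds-based parsing implemented above).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_duration_accepts_whole_seconds() {
        assert_eq!(parse_duration("60"), Ok(Duration::from_secs(60)));
        assert_eq!(parse_duration("0"), Ok(Duration::from_secs(0)));
    }

    #[test]
    fn parse_duration_rejects_non_numeric_input() {
        assert!(parse_duration("ten").is_err());
    }
}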