1use std::{fs, str::FromStr, time::Duration};
6
7use benchmark::{BenchmarkParametersGenerator, LoadType};
8use clap::{Parser, ValueEnum};
9use client::{ServerProviderClient, aws::AwsClient};
10use eyre::{Context, Result};
11use faults::FaultsType;
12use iota_benchmark::workloads::abstract_account::{AuthenticatorKind, TxPayloadObjType};
13use measurement::MeasurementsCollection;
14use orchestrator::Orchestrator;
15use protocol::iota::{IotaBenchmarkType, IotaProtocol};
16use serde::{Deserialize, Serialize};
17use settings::{CloudProvider, Settings};
18use ssh::SshConnectionManager;
19use testbed::Testbed;
20
21use crate::{benchmark::RunInterval, net_latency::TopologyLayout};
22
23pub mod benchmark;
24pub mod build_cache;
25pub mod client;
26pub mod display;
27pub mod error;
28pub mod faults;
29pub mod logger;
30pub mod logs;
31pub mod measurement;
32mod monitor;
33pub mod net_latency;
34pub mod orchestrator;
35pub mod protocol;
36pub mod settings;
37pub mod ssh;
38pub mod testbed;
39
// Concrete protocol/benchmark-type implementations this orchestrator binary
// is compiled against (IOTA); the rest of the file is generic over these.
type Protocol = IotaProtocol;
type BenchmarkType = IotaBenchmarkType;
42
// Top-level CLI options. (`//` comments are used deliberately: `///` doc
// comments would become clap help text and change the binary's --help output.)
#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    // Path to the JSON settings file describing the testbed
    // (cloud provider, repository, results directory, SSH key, ...).
    #[arg(
        long,
        value_name = "FILE",
        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    // The operation to perform (testbed management, benchmark, summarize).
    #[command(subcommand)]
    operation: Operation,
}
61
62fn parse_run_interval(s: &str) -> Result<RunInterval, String> {
63 s.parse()
64}
65
66#[derive(Parser)]
67#[allow(clippy::large_enum_variant)]
68pub enum Operation {
69 Testbed {
71 #[command(subcommand)]
72 action: TestbedAction,
73 },
74
75 Benchmark {
77 #[arg(long, default_value = "0", global = true)]
79 benchmark_type: String,
80
81 #[arg(long, default_value = "ed25519", global = true)]
83 aa_authenticator: AuthenticatorKind,
84
85 #[arg(long, default_value = "false", global = true)]
87 should_fail: bool,
88
89 #[arg(long, default_value = "owned-object", global = true)]
91 tx_payload_obj_type: TxPayloadObjType,
92
93 #[arg(long, default_value = "2", global = true)]
97 stress_num_workers: u64,
98
99 #[arg(long, default_value = "1000", global = true)]
102 aa_split_amount: u64,
103
104 #[arg(long, default_value = "10", global = true)]
109 stress_in_flight_ratio: u64,
110
111 #[arg(long, default_value = "8", global = true)]
114 stress_num_client_threads: u64,
115
116 #[arg(long, default_value = "8", global = true)]
119 stress_num_server_threads: u64,
120
121 #[arg(long, value_name = "INT")]
123 committee: usize,
124
125 #[arg(long, value_name = "INT", default_value = "0", global = true)]
127 faults: usize,
128
129 #[arg(long, action, default_value = "false", global = true)]
131 crash_recovery: bool,
132
133 #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
135 crash_interval: Duration,
136
137 #[arg(long, value_parser = parse_run_interval)]
139 run_interval: RunInterval,
140
141 #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
143 scrape_interval: Duration,
144
145 #[arg(long, action, default_value = "false", global = true)]
147 skip_testbed_update: bool,
148
149 #[arg(long, action, default_value = "false", global = true)]
151 skip_testbed_configuration: bool,
152
153 #[arg(long, action, default_value = "false", global = true)]
155 log_processing: bool,
156
157 #[arg(long, value_name = "INT", default_value = "0", global = true)]
161 dedicated_clients: usize,
162
163 #[arg(long, action, default_value = "false", global = true)]
166 skip_monitoring: bool,
167
168 #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
170 timeout: Duration,
171
172 #[arg(long, value_name = "INT", default_value = "5", global = true)]
174 retries: usize,
175
176 #[command(subcommand)]
178 load_type: Load,
179
180 #[clap(long, action, default_value_t = false, global = true)]
187 use_internal_ip_addresses: bool,
188
189 #[arg(long, global = true)]
192 latency_topology: Option<LatencyTopology>,
193 #[arg(long = "latency-perturbation-spec", global = true)]
195 latency_perturbation_spec: Option<PerturbationSpec>,
196
197 #[arg(long, value_name = "INT", default_value = "10", global = true)]
199 number_of_clusters: usize,
200
201 #[arg(long, value_name = "INT", default_value = "5", global = true)]
203 number_of_triangles: u16,
204
205 #[arg(long, value_name = "INT", default_value = "20", global = true)]
207 added_latency: u16,
208
209 #[arg(long, value_name = "INT", default_value = "400", global = true)]
211 maximum_latency: u16,
212
213 #[arg(long, default_value = "starfish", global = true)]
216 consensus_protocol: ConsensusProtocol,
217
218 #[arg(long, value_name = "INT", global = true)]
220 epoch_duration_ms: Option<u64>,
221
222 #[arg(long, value_name = "INT", default_value = "400", global = true)]
224 max_pipeline_delay: u32,
225
226 #[arg(long, value_name = "INT", default_value = "1", global = true)]
229 blocking_connections: usize,
230
231 #[arg(long, action, default_value_t = false)]
234 use_current_timestamp_for_genesis: bool,
235
236 #[arg(long, value_name = "INT", global = true)]
239 shared_counter_hotness_factor: Option<u8>,
240
241 #[arg(long, value_name = "INT", global = true)]
243 num_shared_counters: Option<usize>,
244
245 #[arg(long, value_name = "/home/ubuntu/benchmark_stats.json", global = true)]
248 benchmark_stats_path: Option<String>,
249 },
250
251 Summarize {
253 #[arg(long, value_name = "FILE")]
255 path: String,
256 },
257}
258
// Testbed lifecycle subcommands. (`//` comments keep clap help output
// unchanged.)
#[derive(Parser)]
pub enum TestbedAction {
    // Print the current status of the testbed.
    Status,

    // Provision the instances backing the testbed.
    Deploy {
        // Number of instances to deploy.
        #[arg(long)]
        instances: usize,

        // Skip deploying the monitoring stack.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        // Instances reserved exclusively for benchmark clients.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        // Request spot instances instead of on-demand ones.
        #[arg(long, action, default_value = "false", global = true)]
        use_spot_instances: bool,

        // Identifier attached to this deployment.
        #[arg(long)]
        id: String,
    },

    // Start previously deployed (stopped) instances.
    Start {
        // Number of instances to start.
        #[arg(long, default_value = "200")]
        instances: usize,

        // Skip starting the monitoring stack.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        // Instances reserved exclusively for benchmark clients.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,
    },

    // Stop the testbed instances.
    Stop {
        // Keep the monitoring stack running.
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,
    },

    // Destroy the testbed and its instances.
    Destroy {
        // Keep the monitoring stack running.
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,

        // Force destruction — presumably skips a confirmation prompt;
        // confirm against `Testbed::destroy`.
        #[arg(short = 'f', long, action, default_value = "false", global = true)]
        force: bool,
    },
}
324
// Load profile subcommands; translated into `LoadType` in `run`.
#[derive(Parser)]
pub enum Load {
    // Benchmark each load of a fixed, comma-separated list.
    FixedLoad {
        // Transaction loads to benchmark; an empty list falls back to a
        // single load of 200 downstream (see `run`).
        #[arg(
            long,
            value_name = "INT",
            num_args(1..),
            value_delimiter = ','
        )]
        loads: Vec<usize>,
    },

    // Search for a maximal sustainable load, starting from `starting_load`.
    Search {
        // Initial load of the search.
        #[arg(long, value_name = "INT", default_value = "250")]
        starting_load: usize,
        // Maximum number of search iterations.
        #[arg(long, value_name = "INT", default_value = "5")]
        max_iterations: usize,
    },
}
// CLI selector for network-latency perturbations; mapped onto
// `net_latency::PerturbationSpec` in `run`.
#[derive(ValueEnum, Clone, Debug)]
pub enum PerturbationSpec {
    // -> net_latency::PerturbationSpec::BrokenTriangle
    //    { added_latency, number_of_triangles }
    BrokenTriangle,
    // -> net_latency::PerturbationSpec::Blocking
    //    { number_of_blocked_connections }
    Blocking,
}
356
// CLI selector for artificial latency topologies; mapped one-to-one onto
// `TopologyLayout` variants in `run`.
#[derive(ValueEnum, Clone, Debug)]
pub enum LatencyTopology {
    // -> TopologyLayout::RandomGeographical
    RandomGeographical,
    // -> TopologyLayout::RandomClustered { number_of_clusters }
    RandomClustered,
    // -> TopologyLayout::HardCodedClustered
    HardCodedClustered,
    // -> TopologyLayout::Mainnet
    Mainnet,
}
370
// Consensus protocol under benchmark. Derives Serialize/Deserialize so the
// choice can be embedded in serialized benchmark configuration/results.
#[derive(ValueEnum, Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusProtocol {
    Starfish,
    Mysticeti,
    // Presumably alternates protocols every epoch — confirm semantics in the
    // protocol crate.
    SwapEachEpoch,
}
377
// clap value parser: interpret the CLI argument as a whole number of seconds.
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse().map(Duration::from_secs)
}
382
383#[tokio::main]
384async fn main() -> Result<()> {
385 color_eyre::install()?;
386 let opts: Opts = Opts::parse();
387
388 let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
390
391 match &settings.cloud_provider {
392 CloudProvider::Aws => {
393 let client = AwsClient::new(settings.clone()).await;
395
396 run(settings, client, opts).await
398 }
399 }
400}
401
402fn init_benchmark_logger(
405 settings: &Settings,
406 operation: &str,
407) -> std::io::Result<(std::path::PathBuf, crate::logger::SwappableWriter)> {
408 let commit = settings.repository.commit.replace("/", "_");
409
410 let mut timestamp = chrono::Local::now().format("%y%m%d_%H%M%S").to_string();
411 timestamp.push('_');
412 timestamp.push_str(operation);
413
414 let benchmark_dir = settings.results_dir.join(&commit).join(×tamp);
415 fs::create_dir_all(&benchmark_dir)?;
416
417 let swappable_writer = crate::logger::init_logger(&benchmark_dir)?;
418
419 Ok((benchmark_dir, swappable_writer))
420}
421
// Top-level driver: wraps the provider client in a `Testbed` and executes
// the requested operation (testbed management, a benchmark campaign, or
// summarization of saved results).
async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
    let mut testbed = Testbed::new(settings.clone(), client)
        .await
        .wrap_err("Failed to create testbed")?;

    match opts.operation {
        // --- Testbed lifecycle management ---
        Operation::Testbed { action } => match action {
            TestbedAction::Status => testbed.status(),

            TestbedAction::Deploy {
                instances,
                dedicated_clients,
                skip_monitoring,
                use_spot_instances,
                id,
            } => {
                // Route deployment logs into a fresh timestamped directory.
                let (_benchmark_dir, _writer) = init_benchmark_logger(&settings, "deploy")?;

                testbed
                    .deploy(
                        instances,
                        skip_monitoring,
                        dedicated_clients,
                        use_spot_instances,
                        id,
                    )
                    .await
                    .wrap_err("Failed to deploy testbed")?
            }

            TestbedAction::Start {
                instances,
                skip_monitoring,
                dedicated_clients,
            } => testbed
                .start(instances, dedicated_clients, skip_monitoring)
                .await
                .wrap_err("Failed to start testbed")?,

            TestbedAction::Stop { keep_monitoring } => testbed
                .stop(keep_monitoring)
                .await
                .wrap_err("Failed to stop testbed")?,

            TestbedAction::Destroy {
                keep_monitoring,
                force,
            } => testbed
                .destroy(keep_monitoring, force)
                .await
                .wrap_err("Failed to destroy testbed")?,
        },

        // --- Benchmark campaign ---
        Operation::Benchmark {
            benchmark_type,
            committee,
            faults,
            crash_recovery,
            crash_interval,
            run_interval,
            scrape_interval,
            skip_testbed_update,
            skip_testbed_configuration,
            log_processing,
            dedicated_clients,
            skip_monitoring,
            timeout,
            retries,
            load_type,
            use_internal_ip_addresses,
            latency_perturbation_spec,
            latency_topology,
            added_latency,
            number_of_triangles,
            number_of_clusters,
            consensus_protocol,
            maximum_latency,
            epoch_duration_ms,
            blocking_connections,
            use_current_timestamp_for_genesis,
            max_pipeline_delay,
            aa_authenticator,
            should_fail,
            tx_payload_obj_type,
            stress_num_workers,
            aa_split_amount,
            stress_in_flight_ratio,
            stress_num_client_threads,
            stress_num_server_threads,
            shared_counter_hotness_factor,
            num_shared_counters,
            benchmark_stats_path,
        } => {
            // SSH manager shared by every remote command issued below.
            let username = testbed.username();
            let private_key_file = settings.ssh_private_key_file.clone();
            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
                .with_timeout(timeout)
                .with_retries(retries);

            let node_instances = testbed.node_instances();
            let client_instances = testbed.client_instances();
            let metrics_instance = testbed.metrics_instance();

            let setup_commands = testbed
                .setup_commands()
                .await
                .wrap_err("Failed to load testbed setup commands")?;

            let protocol_commands = Protocol::new(&settings);
            // The CLI passes the benchmark type as a plain string; parse it
            // into the protocol-specific type here.
            let benchmark_type =
                BenchmarkType::from_str(&benchmark_type).map_err(|e| eyre::eyre!(e))?;

            // Translate the load subcommand into the generator's `LoadType`.
            let load = match load_type {
                Load::FixedLoad { loads } => {
                    // An empty list falls back to a single load of 200.
                    let loads = if loads.is_empty() { vec![200] } else { loads };
                    LoadType::Fixed(loads)
                }
                Load::Search {
                    starting_load,
                    max_iterations,
                } => LoadType::Search {
                    starting_load,
                    max_iterations,
                },
            };

            // Crash-recovery only applies when there is at least one fault;
            // otherwise faults (possibly zero) are permanent.
            let fault_type = if !crash_recovery || faults == 0 {
                FaultsType::Permanent { faults }
            } else {
                FaultsType::CrashRecovery {
                    max_faults: faults,
                    interval: crash_interval,
                }
            };

            // Map the CLI perturbation flag onto the net_latency spec.
            let perturbation_spec = match latency_perturbation_spec {
                Some(PerturbationSpec::BrokenTriangle) => {
                    net_latency::PerturbationSpec::BrokenTriangle {
                        added_latency,
                        number_of_triangles,
                    }
                }
                Some(PerturbationSpec::Blocking) => net_latency::PerturbationSpec::Blocking {
                    number_of_blocked_connections: blocking_connections,
                },
                None => net_latency::PerturbationSpec::None,
            };

            // Map the CLI topology flag onto the net_latency layout.
            let latency_topology = match latency_topology {
                Some(LatencyTopology::RandomGeographical) => {
                    Some(TopologyLayout::RandomGeographical)
                }
                Some(LatencyTopology::RandomClustered) => {
                    Some(TopologyLayout::RandomClustered { number_of_clusters })
                }
                Some(LatencyTopology::HardCodedClustered) => {
                    Some(TopologyLayout::HardCodedClustered)
                }
                Some(LatencyTopology::Mainnet) => Some(TopologyLayout::Mainnet),
                None => None,
            };

            let (benchmark_dir, writer) = init_benchmark_logger(&settings, "benchmark_run")?;

            // Assemble the full parameter matrix for the campaign.
            let mut generator = BenchmarkParametersGenerator::new(
                committee,
                dedicated_clients,
                load,
                use_internal_ip_addresses,
            )
            .with_benchmark_type(benchmark_type)
            .with_custom_run_interval(run_interval)
            .with_perturbation_spec(perturbation_spec)
            .with_latency_topology(latency_topology)
            .with_consensus_protocol(consensus_protocol)
            .with_max_latency(maximum_latency)
            .with_epoch_duration(epoch_duration_ms)
            .with_max_pipeline_delay(max_pipeline_delay)
            .with_current_timestamp_for_genesis(use_current_timestamp_for_genesis)
            .with_faults(fault_type)
            .with_aa_authenticator(aa_authenticator)
            .with_should_fail(should_fail)
            .with_tx_payload_obj_type(tx_payload_obj_type)
            .with_stress_num_workers(stress_num_workers)
            .with_aa_split_amount(aa_split_amount)
            .with_stress_in_flight_ratio(stress_in_flight_ratio)
            .with_stress_client_threads(stress_num_client_threads)
            .with_stress_server_threads(stress_num_server_threads)
            .with_benchmark_stats_path(benchmark_stats_path.clone());

            // Optional shared-counter tuning knobs.
            if let Some(factor) = shared_counter_hotness_factor {
                generator = generator.with_shared_counter_hotness_factor(factor);
            }
            if let Some(counters) = num_shared_counters {
                generator = generator.with_num_shared_counters(counters);
            }

            // Drive the whole benchmark campaign to completion.
            Orchestrator::new(
                settings,
                node_instances,
                client_instances,
                metrics_instance,
                setup_commands,
                protocol_commands,
                ssh_manager,
            )
            .with_scrape_interval(scrape_interval)
            .with_crash_interval(crash_interval)
            .skip_testbed_updates(skip_testbed_update)
            .skip_testbed_configuration(skip_testbed_configuration)
            .with_log_processing(log_processing)
            .with_dedicated_clients(dedicated_clients)
            .skip_monitoring(skip_monitoring)
            .with_benchmark_dir_and_writer(benchmark_dir, writer)
            .run_benchmarks(generator)
            .await
            .wrap_err("Failed to run benchmarks")?;
        }

        // --- Summarize previously collected measurements ---
        Operation::Summarize { path } => {
            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
        }
    }
    Ok(())
}