1use std::{str::FromStr, time::Duration};
6
7use benchmark::{BenchmarkParametersGenerator, LoadType};
8use clap::{Parser, ValueEnum};
9use client::{ServerProviderClient, aws::AwsClient};
10use eyre::{Context, Result};
11use faults::FaultsType;
12use measurement::MeasurementsCollection;
13use orchestrator::Orchestrator;
14use protocol::iota::{IotaBenchmarkType, IotaProtocol};
15use serde::{Deserialize, Serialize};
16use settings::{CloudProvider, Settings};
17use ssh::SshConnectionManager;
18use testbed::Testbed;
19
20use crate::net_latency::TopologyLayout;
21
// Modules that make up this crate. All of them are exported publicly except
// `monitor`, which stays private to the crate.
pub mod benchmark;
pub mod build_cache;
pub mod client;
pub mod display;
pub mod error;
pub mod faults;
pub mod logger;
pub mod logs;
pub mod measurement;
// Private: not part of the crate's public surface.
mod monitor;
pub mod net_latency;
pub mod orchestrator;
pub mod protocol;
pub mod settings;
pub mod ssh;
pub mod testbed;
38
/// Protocol implementation used by `run` to build the protocol commands handed to the
/// orchestrator (`Protocol::new(&settings)`).
type Protocol = IotaProtocol;
/// Benchmark type parsed from the `benchmark_type` CLI value and used to type the
/// measurements collection loaded by the summarize operation.
type BenchmarkType = IotaBenchmarkType;
41
42#[derive(Parser)]
43#[command(author, version, about = "Testbed orchestrator", long_about = None)]
44pub struct Opts {
45 #[arg(
49 long,
50 value_name = "FILE",
51 default_value = "crates/iota-aws-orchestrator/assets/settings.json",
52 global = true
53 )]
54 settings_path: String,
55
56 #[command(subcommand)]
58 operation: Operation,
59}
60
61#[derive(Parser)]
62pub enum Operation {
63 Testbed {
65 #[command(subcommand)]
66 action: TestbedAction,
67 },
68
69 Benchmark {
71 #[arg(long, default_value = "0", global = true)]
74 benchmark_type: String,
75
76 #[arg(long, value_name = "INT")]
78 committee: usize,
79
80 #[arg(long, value_name = "INT", default_value = "0", global = true)]
82 faults: usize,
83
84 #[arg(long, action, default_value = "false", global = true)]
86 crash_recovery: bool,
87
88 #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
90 crash_interval: Duration,
91
92 #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
94 duration: Duration,
95
96 #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
98 scrape_interval: Duration,
99
100 #[arg(long, action, default_value = "false", global = true)]
102 skip_testbed_update: bool,
103
104 #[arg(long, action, default_value = "false", global = true)]
106 skip_testbed_configuration: bool,
107
108 #[arg(long, action, default_value = "false", global = true)]
110 log_processing: bool,
111
112 #[arg(long, value_name = "INT", default_value = "0", global = true)]
116 dedicated_clients: usize,
117
118 #[arg(long, action, default_value = "false", global = true)]
121 skip_monitoring: bool,
122
123 #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
125 timeout: Duration,
126
127 #[arg(long, value_name = "INT", default_value = "5", global = true)]
129 retries: usize,
130
131 #[command(subcommand)]
133 load_type: Load,
134
135 #[clap(long, action, default_value_t = false, global = true)]
142 use_internal_ip_addresses: bool,
143
144 #[arg(long, global = true)]
147 latency_topology: Option<LatencyTopology>,
148 #[arg(long = "latency-perturbation-spec", global = true)]
150 latency_perturbation_spec: Option<PerturbationSpec>,
151
152 #[arg(long, value_name = "INT", default_value = "10", global = true)]
154 number_of_clusters: usize,
155
156 #[arg(long, value_name = "INT", default_value = "5", global = true)]
158 number_of_triangles: u16,
159
160 #[arg(long, value_name = "INT", default_value = "20", global = true)]
162 added_latency: u16,
163
164 #[arg(long, value_name = "INT", default_value = "400", global = true)]
166 maximum_latency: u16,
167
168 #[arg(long, default_value = "starfish", global = true)]
171 consensus_protocol: ConsensusProtocol,
172
173 #[arg(long, value_name = "INT", global = true)]
175 epoch_duration_ms: Option<u64>,
176
177 #[arg(long, value_name = "INT", default_value = "400", global = true)]
179 max_pipeline_delay: u32,
180
181 #[arg(long, value_name = "INT", default_value = "1", global = true)]
184 blocking_connections: usize,
185
186 #[arg(long, action, default_value_t = false)]
189 use_current_timestamp_for_genesis: bool,
190
191 #[arg(long, value_name = "INT", global = true)]
194 shared_counter_hotness_factor: Option<u8>,
195
196 #[arg(long, value_name = "INT", global = true)]
198 num_shared_counters: Option<usize>,
199 },
200
201 Summarize {
203 #[arg(long, value_name = "FILE")]
205 path: String,
206 },
207}
208
209#[derive(Parser)]
210pub enum TestbedAction {
211 Status,
213
214 Deploy {
217 #[arg(long)]
219 instances: usize,
220
221 #[arg(long, action, default_value = "false", global = true)]
223 skip_monitoring: bool,
224
225 #[arg(long, value_name = "INT", default_value = "0", global = true)]
227 dedicated_clients: usize,
228
229 #[arg(long, action, default_value = "false", global = true)]
232 use_spot_instances: bool,
233
234 #[arg(long)]
237 id: String,
238 },
239
240 Start {
243 #[arg(long, default_value = "200")]
245 instances: usize,
246
247 #[arg(long, action, default_value = "false", global = true)]
249 skip_monitoring: bool,
250
251 #[arg(long, value_name = "INT", default_value = "0", global = true)]
253 dedicated_clients: usize,
254 },
255
256 Stop {
258 #[arg(long, action, default_value = "false", global = true)]
260 keep_monitoring: bool,
261 },
262
263 Destroy {
265 #[arg(long, action, default_value = "false", global = true)]
267 keep_monitoring: bool,
268
269 #[arg(short = 'f', long, action, default_value = "false", global = true)]
271 force: bool,
272 },
273}
274
275#[derive(Parser)]
276pub enum Load {
277 FixedLoad {
279 #[arg(
281 long,
282 value_name = "INT",
283 num_args(1..),
284 value_delimiter = ','
285 )]
286 loads: Vec<usize>,
287 },
288
289 Search {
291 #[arg(long, value_name = "INT", default_value = "250")]
293 starting_load: usize,
294 #[arg(long, value_name = "INT", default_value = "5")]
297 max_iterations: usize,
298 },
299}
300#[derive(ValueEnum, Clone, Debug)]
301pub enum PerturbationSpec {
302 BrokenTriangle,
303 Blocking,
304 }
306
307#[derive(ValueEnum, Clone, Debug)]
308pub enum LatencyTopology {
309 RandomGeographical,
312 RandomClustered,
315 HardCodedClustered,
317 Mainnet,
319}
320
321#[derive(ValueEnum, Clone, Debug, Deserialize, Serialize)]
322pub enum ConsensusProtocol {
323 Starfish,
324 Mysticeti,
325 SwapEachEpoch,
326}
327
/// Parses a command-line argument holding a whole number of seconds into a [`Duration`].
///
/// # Errors
///
/// Returns the [`std::num::ParseIntError`] produced by `u64` parsing when `arg` is not a
/// valid unsigned integer (empty, negative, fractional, padded or out of range).
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
332
333#[tokio::main]
334async fn main() -> Result<()> {
335 color_eyre::install()?;
336 let opts: Opts = Opts::parse();
337
338 let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
340
341 match &settings.cloud_provider {
342 CloudProvider::Aws => {
343 let client = AwsClient::new(settings.clone()).await;
345
346 run(settings, client, opts).await
348 }
349 }
350}
351
352async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
353 let mut testbed = Testbed::new(settings.clone(), client)
355 .await
356 .wrap_err("Failed to create testbed")?;
357
358 match opts.operation {
359 Operation::Testbed { action } => match action {
360 TestbedAction::Status => testbed.status(),
362
363 TestbedAction::Deploy {
365 instances,
366 dedicated_clients,
367 skip_monitoring,
368 use_spot_instances,
369 id,
370 } => testbed
371 .deploy(
372 instances,
373 skip_monitoring,
374 dedicated_clients,
375 use_spot_instances,
376 id,
377 )
378 .await
379 .wrap_err("Failed to deploy testbed")?,
380
381 TestbedAction::Start {
383 instances,
384 skip_monitoring,
385 dedicated_clients,
386 } => testbed
387 .start(instances, dedicated_clients, skip_monitoring)
388 .await
389 .wrap_err("Failed to start testbed")?,
390
391 TestbedAction::Stop { keep_monitoring } => testbed
393 .stop(keep_monitoring)
394 .await
395 .wrap_err("Failed to stop testbed")?,
396
397 TestbedAction::Destroy {
399 keep_monitoring,
400 force,
401 } => testbed
402 .destroy(keep_monitoring, force)
403 .await
404 .wrap_err("Failed to destroy testbed")?,
405 },
406
407 Operation::Benchmark {
409 benchmark_type,
410 committee,
411 faults,
412 crash_recovery,
413 crash_interval,
414 duration,
415 scrape_interval,
416 skip_testbed_update,
417 skip_testbed_configuration,
418 log_processing,
419 dedicated_clients,
420 skip_monitoring,
421 timeout,
422 retries,
423 load_type,
424 use_internal_ip_addresses,
425 latency_perturbation_spec,
426 latency_topology,
427 added_latency,
428 number_of_triangles,
429 number_of_clusters,
430 consensus_protocol,
431 maximum_latency,
432 epoch_duration_ms,
433 blocking_connections,
434 use_current_timestamp_for_genesis,
435 max_pipeline_delay,
436 shared_counter_hotness_factor,
437 num_shared_counters,
438 } => {
439 let username = testbed.username();
441 let private_key_file = settings.ssh_private_key_file.clone();
442 let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
443 .with_timeout(timeout)
444 .with_retries(retries);
445
446 let node_instances = testbed.node_instances();
447 let client_instances = testbed.client_instances();
448 let metrics_instance = testbed.metrics_instance();
449
450 let setup_commands = testbed
451 .setup_commands()
452 .await
453 .wrap_err("Failed to load testbed setup commands")?;
454
455 let protocol_commands = Protocol::new(&settings);
456 let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;
457
458 let load = match load_type {
459 Load::FixedLoad { loads } => {
460 let loads = if loads.is_empty() { vec![200] } else { loads };
461 LoadType::Fixed(loads)
462 }
463 Load::Search {
464 starting_load,
465 max_iterations,
466 } => LoadType::Search {
467 starting_load,
468 max_iterations,
469 },
470 };
471
472 let fault_type = if !crash_recovery || faults == 0 {
473 FaultsType::Permanent { faults }
474 } else {
475 FaultsType::CrashRecovery {
476 max_faults: faults,
477 interval: crash_interval,
478 }
479 };
480
481 let perturbation_spec = match latency_perturbation_spec {
482 Some(PerturbationSpec::BrokenTriangle) => {
483 net_latency::PerturbationSpec::BrokenTriangle {
484 added_latency,
485 number_of_triangles,
486 }
487 }
488 Some(PerturbationSpec::Blocking) => net_latency::PerturbationSpec::Blocking {
489 number_of_blocked_connections: blocking_connections,
490 },
491 None => net_latency::PerturbationSpec::None,
492 };
493
494 let latency_topology = match latency_topology {
495 Some(LatencyTopology::RandomGeographical) => {
496 Some(TopologyLayout::RandomGeographical)
497 }
498 Some(LatencyTopology::RandomClustered) => {
499 Some(TopologyLayout::RandomClustered { number_of_clusters })
500 }
501 Some(LatencyTopology::HardCodedClustered) => {
502 Some(TopologyLayout::HardCodedClustered)
503 }
504 Some(LatencyTopology::Mainnet) => Some(TopologyLayout::Mainnet),
505 None => None,
506 };
507
508 let mut generator = BenchmarkParametersGenerator::new(
509 committee,
510 dedicated_clients,
511 load,
512 use_internal_ip_addresses,
513 )
514 .with_benchmark_type(benchmark_type)
515 .with_custom_duration(duration)
516 .with_perturbation_spec(perturbation_spec)
517 .with_latency_topology(latency_topology)
518 .with_consensus_protocol(consensus_protocol)
519 .with_max_latency(maximum_latency)
520 .with_epoch_duration(epoch_duration_ms)
521 .with_max_pipeline_delay(max_pipeline_delay)
522 .with_current_timestamp_for_genesis(use_current_timestamp_for_genesis)
523 .with_faults(fault_type);
524
525 if let Some(factor) = shared_counter_hotness_factor {
526 generator = generator.with_shared_counter_hotness_factor(factor);
527 }
528 if let Some(counters) = num_shared_counters {
529 generator = generator.with_num_shared_counters(counters);
530 }
531
532 Orchestrator::new(
533 settings,
534 node_instances,
535 client_instances,
536 metrics_instance,
537 setup_commands,
538 protocol_commands,
539 ssh_manager,
540 )
541 .with_scrape_interval(scrape_interval)
542 .with_crash_interval(crash_interval)
543 .skip_testbed_updates(skip_testbed_update)
544 .skip_testbed_configuration(skip_testbed_configuration)
545 .with_log_processing(log_processing)
546 .with_dedicated_clients(dedicated_clients)
547 .skip_monitoring(skip_monitoring)
548 .run_benchmarks(generator)
549 .await
550 .wrap_err("Failed to run benchmarks")?;
551 }
552
553 Operation::Summarize { path } => {
555 MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
556 }
557 }
558 Ok(())
559}