iota_aws_orchestrator/
main.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{fs, str::FromStr, time::Duration};
6
7use benchmark::{BenchmarkParametersGenerator, LoadType};
8use clap::{Parser, ValueEnum};
9use client::{ServerProviderClient, aws::AwsClient};
10use eyre::{Context, Result};
11use faults::FaultsType;
12use iota_benchmark::workloads::abstract_account::{AuthenticatorKind, TxPayloadObjType};
13use measurement::MeasurementsCollection;
14use orchestrator::Orchestrator;
15use protocol::iota::{IotaBenchmarkType, IotaProtocol};
16use serde::{Deserialize, Serialize};
17use settings::{CloudProvider, Settings};
18use ssh::SshConnectionManager;
19use testbed::Testbed;
20
21use crate::{benchmark::RunInterval, net_latency::TopologyLayout};
22
23pub mod benchmark;
24pub mod build_cache;
25pub mod client;
26pub mod display;
27pub mod error;
28pub mod faults;
29pub mod logger;
30pub mod logs;
31pub mod measurement;
32mod monitor;
33pub mod net_latency;
34pub mod orchestrator;
35pub mod protocol;
36pub mod settings;
37pub mod ssh;
38pub mod testbed;
39
// Concrete protocol and benchmark-type implementations this binary is built
// against; the rest of the orchestrator machinery is generic over these
// (e.g. `MeasurementsCollection::<BenchmarkType>` in `run`).
type Protocol = IotaProtocol;
type BenchmarkType = IotaBenchmarkType;
42
// Top-level command-line interface of the orchestrator binary.
// Parsed by clap; the `///` doc comments on fields double as `--help` text,
// so they must not be changed casually.
#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    /// The path to the settings file. This file contains basic information to
    /// deploy testbeds and run benchmarks such as the url of the git repo,
    /// the commit to deploy, etc.
    #[arg(
        long,
        value_name = "FILE",
        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    /// The type of operation to run.
    #[command(subcommand)]
    operation: Operation,
}
61
62fn parse_run_interval(s: &str) -> Result<RunInterval, String> {
63    s.parse()
64}
65
66#[derive(Parser)]
67#[allow(clippy::large_enum_variant)]
68pub enum Operation {
69    /// Get or modify the status of the testbed.
70    Testbed {
71        #[command(subcommand)]
72        action: TestbedAction,
73    },
74
75    /// Run a benchmark on the specified testbed.
76    Benchmark {
77        /// The type of benchmark to run.
78        #[arg(long, default_value = "0", global = true)]
79        benchmark_type: String,
80
81        /// The AA authenticator to use.
82        #[arg(long, default_value = "ed25519", global = true)]
83        aa_authenticator: AuthenticatorKind,
84
85        /// Whether the AA transactions should fail.
86        #[arg(long, default_value = "false", global = true)]
87        should_fail: bool,
88
89        /// Type of object transaction uses - owned or shared.
90        #[arg(long, default_value = "owned-object", global = true)]
91        tx_payload_obj_type: TxPayloadObjType,
92
93        /// Number of worker tasks inside `stress`.
94        /// Higher -> more concurrency; too high can reduce throughput due to
95        /// contention.
96        #[arg(long, default_value = "2", global = true)]
97        stress_num_workers: u64,
98
99        /// AA workload: split amount inside `stress` for AA (number of coins to
100        /// split before transfer).
101        #[arg(long, default_value = "1000", global = true)]
102        aa_split_amount: u64,
103
104        /// In-flight ratio inside `stress` for AA (roughly,
105        /// allowed outstanding tx per worker). Higher -> more
106        /// outstanding requests; too high can inflate latency and trigger
107        /// backpressure.
108        #[arg(long, default_value = "10", global = true)]
109        stress_in_flight_ratio: u64,
110
111        /// Stress: number of client threads (applies to AA, kept constant for
112        /// bench to preserve behavior).
113        #[arg(long, default_value = "8", global = true)]
114        stress_num_client_threads: u64,
115
116        /// Stress: number of server threads (applies to AA, kept constant for
117        /// bench to preserve behavior).
118        #[arg(long, default_value = "8", global = true)]
119        stress_num_server_threads: u64,
120
121        /// The committee size to deploy.
122        #[arg(long, value_name = "INT")]
123        committee: usize,
124
125        /// Number of faulty nodes.
126        #[arg(long, value_name = "INT", default_value = "0", global = true)]
127        faults: usize,
128
129        /// Whether the faulty nodes recover.
130        #[arg(long, action, default_value = "false", global = true)]
131        crash_recovery: bool,
132
133        /// The interval to crash nodes in seconds.
134        #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
135        crash_interval: Duration,
136
137        /// The minimum duration of the benchmark in seconds.
138        #[arg(long, value_parser = parse_run_interval)]
139        run_interval: RunInterval,
140
141        /// The interval between measurements collection in seconds.
142        #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
143        scrape_interval: Duration,
144
145        /// Whether to skip testbed updates before running benchmarks.
146        #[arg(long, action, default_value = "false", global = true)]
147        skip_testbed_update: bool,
148
149        /// Whether to skip testbed configuration before running benchmarks.
150        #[arg(long, action, default_value = "false", global = true)]
151        skip_testbed_configuration: bool,
152
153        /// Whether to download and analyze the client and node log files.
154        #[arg(long, action, default_value = "false", global = true)]
155        log_processing: bool,
156
157        /// The number of instances running exclusively load generators. If set
158        /// to zero the orchestrator collocates one load generator with
159        /// each node.
160        #[arg(long, value_name = "INT", default_value = "0", global = true)]
161        dedicated_clients: usize,
162
163        /// Whether to forgo a grafana and prometheus instance and leave the
164        /// testbed unmonitored.
165        #[arg(long, action, default_value = "false", global = true)]
166        skip_monitoring: bool,
167
168        /// The timeout duration for ssh commands (in seconds).
169        #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
170        timeout: Duration,
171
172        /// The number of times the orchestrator should retry an ssh command.
173        #[arg(long, value_name = "INT", default_value = "5", global = true)]
174        retries: usize,
175
176        /// The load to submit to the system.
177        #[command(subcommand)]
178        load_type: Load,
179
180        /// Flag indicating whether nodes should advertise their internal or
181        /// public IP address for inter-node communication. When running
182        /// the simulation in multiple regions, nodes need to use their public
183        /// IPs to correctly communicate, however when a simulation is
184        /// running in a single VPC, they should use their internal IPs to avoid
185        /// paying for data sent between the nodes.
186        #[clap(long, action, default_value_t = false, global = true)]
187        use_internal_ip_addresses: bool,
188
189        /// Optional Latency Topology. if omitted => None -> skips latency
190        /// matrix generation
191        #[arg(long, global = true)]
192        latency_topology: Option<LatencyTopology>,
193        /// Optional perturbation spec. If omitted => None
194        #[arg(long = "latency-perturbation-spec", global = true)]
195        latency_perturbation_spec: Option<PerturbationSpec>,
196
197        /// How many clusters to use in the latency topology
198        #[arg(long, value_name = "INT", default_value = "10", global = true)]
199        number_of_clusters: usize,
200
201        /// Number-of-triangles parameter for broken-topologies
202        #[arg(long, value_name = "INT", default_value = "5", global = true)]
203        number_of_triangles: u16,
204
205        /// Extra artificial latency when perturbing topo
206        #[arg(long, value_name = "INT", default_value = "20", global = true)]
207        added_latency: u16,
208
209        /// Maximum latency between two nodes/clusters in a private network
210        #[arg(long, value_name = "INT", default_value = "400", global = true)]
211        maximum_latency: u16,
212
213        /// Switch protocols between mysticeti and starfish every epoch,
214        /// default: false, aka use starfish in every epoch.
215        #[arg(long, default_value = "starfish", global = true)]
216        consensus_protocol: ConsensusProtocol,
217
218        /// Optional: Epoch duration in milliseconds, default is 1h
219        #[arg(long, value_name = "INT", global = true)]
220        epoch_duration_ms: Option<u64>,
221
222        /// Maximum pipeline delay
223        #[arg(long, value_name = "INT", default_value = "400", global = true)]
224        max_pipeline_delay: u32,
225
226        /// Number of blocking connections in the blocking
227        /// latency_perturbation_spec
228        #[arg(long, value_name = "INT", default_value = "1", global = true)]
229        blocking_connections: usize,
230
231        /// Use current system time as genesis chain start timestamp instead of
232        /// 0
233        #[arg(long, action, default_value_t = false)]
234        use_current_timestamp_for_genesis: bool,
235
236        /// Shared counter hotness factor (0-100). Higher values concentrate
237        /// load on fewer counters.
238        #[arg(long, value_name = "INT", global = true)]
239        shared_counter_hotness_factor: Option<u8>,
240
241        /// Number of shared counters to use in the benchmark
242        #[arg(long, value_name = "INT", global = true)]
243        num_shared_counters: Option<usize>,
244
245        /// Optional path for the benchmark stats metadata to be downloaded
246        /// after the run
247        #[arg(long, value_name = "/home/ubuntu/benchmark_stats.json", global = true)]
248        benchmark_stats_path: Option<String>,
249    },
250
251    /// Print a summary of the specified measurements collection.
252    Summarize {
253        /// The path to the settings file.
254        #[arg(long, value_name = "FILE")]
255        path: String,
256    },
257}
258
259#[derive(Parser)]
260pub enum TestbedAction {
261    /// Display the testbed status.
262    Status,
263
264    /// Deploy the specified number of instances in all regions specified by in
265    /// the setting file.
266    Deploy {
267        /// Number of instances to deploy.
268        #[arg(long)]
269        instances: usize,
270
271        /// Skips deployment of a Metrics instance
272        #[arg(long, action, default_value = "false", global = true)]
273        skip_monitoring: bool,
274
275        /// The number of instances running exclusively load generators.
276        #[arg(long, value_name = "INT", default_value = "0", global = true)]
277        dedicated_clients: usize,
278
279        /// Attempts to prioritise cheaper spot instances
280        /// Note: stop and start commands are not available for spot instances
281        #[arg(long, action, default_value = "false", global = true)]
282        use_spot_instances: bool,
283
284        /// Id tag added to each deployment, used for cost tracking, should be
285        /// unique for each test run deployment.
286        #[arg(long)]
287        id: String,
288    },
289
290    /// Start at most the specified number of instances per region on an
291    /// existing testbed.
292    Start {
293        /// Number of instances to deploy.
294        #[arg(long, default_value = "200")]
295        instances: usize,
296
297        // Skips deployment of a Metrics instance
298        #[arg(long, action, default_value = "false", global = true)]
299        skip_monitoring: bool,
300
301        /// The number of instances running exclusively load generators.
302        #[arg(long, value_name = "INT", default_value = "0", global = true)]
303        dedicated_clients: usize,
304    },
305
306    /// Stop an existing testbed (without destroying the instances).
307    Stop {
308        /// Keeps the monitoring instance running
309        #[arg(long, action, default_value = "false", global = true)]
310        keep_monitoring: bool,
311    },
312
313    /// Destroy the testbed and terminate all instances.
314    Destroy {
315        /// Keeps the monitoring instance running
316        #[arg(long, action, default_value = "false", global = true)]
317        keep_monitoring: bool,
318
319        /// Force destroy without confirmation prompt
320        #[arg(short = 'f', long, action, default_value = "false", global = true)]
321        force: bool,
322    },
323}
324
// The load profile to submit during a benchmark run; translated into
// `benchmark::LoadType` in `run`.
#[derive(Parser)]
pub enum Load {
    /// The fixed loads (in tx/s) to submit to the nodes.
    FixedLoad {
        /// A list of fixed load (tx/s).
        // An empty list falls back to a single 200 tx/s run (see `run`).
        #[arg(
            long,
            value_name = "INT",
            num_args(1..),
            value_delimiter = ','
        )]
        loads: Vec<usize>,
    },

    /// Search for the maximum load that the system can sustainably handle.
    Search {
        /// The initial load (in tx/s) to test and use a baseline.
        #[arg(long, value_name = "INT", default_value = "250")]
        starting_load: usize,
        /// The maximum number of iterations before converging on a breaking
        /// point.
        #[arg(long, value_name = "INT", default_value = "5")]
        max_iterations: usize,
    },
}
// Kind of artificial network perturbation applied on top of the latency
// topology. The perturbation parameters (--added-latency,
// --number-of-triangles, --blocking-connections) come from separate CLI
// flags; the combined `net_latency::PerturbationSpec` is assembled in `run`.
#[derive(ValueEnum, Clone, Debug)]
pub enum PerturbationSpec {
    // Adds latency to a number of node triangles.
    BrokenTriangle,
    // Blocks a number of connections entirely.
    Blocking,
    // potentially other options later
}
356
// Latency-matrix layout selectable via `--latency-topology`; mapped onto
// `net_latency::TopologyLayout` in `run` (RandomClustered additionally
// picks up `--number-of-clusters`).
#[derive(ValueEnum, Clone, Debug)]
pub enum LatencyTopology {
    /// Generates a latency matrix for each node, randomly positioned on a
    /// cylinder.
    RandomGeographical,
    /// Generates a latency matrix by randomly clustering nodes into clusters
    /// and randomly positioning clusters on a cylinder.
    RandomClustered,
    /// Uses a hardcoded 10x10 matrix with 10 equal-sized regions.
    HardCodedClustered,
    /// Uses mainnet validator region distribution for latencies.
    Mainnet,
}
370
// Consensus protocol selection, parsed from `--consensus-protocol`
// (default "starfish") and forwarded to the parameters generator.
// Serialize/Deserialize let the choice be persisted alongside results.
#[derive(ValueEnum, Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusProtocol {
    // Run starfish in every epoch (the CLI default).
    Starfish,
    // Run mysticeti in every epoch.
    Mysticeti,
    // Alternate between the two protocols each epoch.
    SwapEachEpoch,
}
377
/// Clap value parser: interpret the argument as a whole number of seconds.
///
/// Propagates the integer parse error unchanged when the argument is not a
/// valid unsigned integer (e.g. negative or non-numeric input).
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
382
// Entry point: install error reporting, parse the CLI, load the settings
// file, and dispatch to the provider-specific `run` routine.
#[tokio::main]
async fn main() -> Result<()> {
    // Colorized, backtrace-aware reports for any eyre error bubbling up.
    color_eyre::install()?;
    let opts: Opts = Opts::parse();

    // Load the settings files.
    let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;

    // AWS is currently the only supported provider; the exhaustive match
    // forces this dispatch to be revisited if more variants are added.
    match &settings.cloud_provider {
        CloudProvider::Aws => {
            // Create the client for the cloud provider.
            let client = AwsClient::new(settings.clone()).await;

            // Execute the command.
            run(settings, client, opts).await
        }
    }
}
401
402// Create benchmark_dir and initialize logger
403// Then benchmark_dir would be like results/<commit>/<timestamp>_<operation>
404fn init_benchmark_logger(
405    settings: &Settings,
406    operation: &str,
407) -> std::io::Result<(std::path::PathBuf, crate::logger::SwappableWriter)> {
408    let commit = settings.repository.commit.replace("/", "_");
409
410    let mut timestamp = chrono::Local::now().format("%y%m%d_%H%M%S").to_string();
411    timestamp.push('_');
412    timestamp.push_str(operation);
413
414    let benchmark_dir = settings.results_dir.join(&commit).join(&timestamp);
415    fs::create_dir_all(&benchmark_dir)?;
416
417    let swappable_writer = crate::logger::init_logger(&benchmark_dir)?;
418
419    Ok((benchmark_dir, swappable_writer))
420}
421
/// Execute the requested operation (testbed management, benchmark run, or
/// measurements summary) against the testbed backed by the given
/// cloud-provider client.
async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
    // Create a new testbed.
    let mut testbed = Testbed::new(settings.clone(), client)
        .await
        .wrap_err("Failed to create testbed")?;

    match opts.operation {
        Operation::Testbed { action } => match action {
            // Display the current status of the testbed.
            TestbedAction::Status => testbed.status(),

            // Deploy the specified number of instances on the testbed.
            TestbedAction::Deploy {
                instances,
                dedicated_clients,
                skip_monitoring,
                use_spot_instances,
                id,
            } => {
                // Route log output to results/<commit>/<timestamp>_deploy;
                // the writer must stay alive for the whole deployment.
                let (_benchmark_dir, _writer) = init_benchmark_logger(&settings, "deploy")?;

                testbed
                    .deploy(
                        instances,
                        skip_monitoring,
                        dedicated_clients,
                        use_spot_instances,
                        id,
                    )
                    .await
                    .wrap_err("Failed to deploy testbed")?
            }

            // Start the specified number of instances on an existing testbed.
            TestbedAction::Start {
                instances,
                skip_monitoring,
                dedicated_clients,
            } => testbed
                .start(instances, dedicated_clients, skip_monitoring)
                .await
                .wrap_err("Failed to start testbed")?,

            // Stop an existing testbed.
            TestbedAction::Stop { keep_monitoring } => testbed
                .stop(keep_monitoring)
                .await
                .wrap_err("Failed to stop testbed")?,

            // Destroy the testbed and terminate all instances.
            TestbedAction::Destroy {
                keep_monitoring,
                force,
            } => testbed
                .destroy(keep_monitoring, force)
                .await
                .wrap_err("Failed to destroy testbed")?,
        },

        // Run benchmarks.
        Operation::Benchmark {
            benchmark_type,
            committee,
            faults,
            crash_recovery,
            crash_interval,
            run_interval,
            scrape_interval,
            skip_testbed_update,
            skip_testbed_configuration,
            log_processing,
            dedicated_clients,
            skip_monitoring,
            timeout,
            retries,
            load_type,
            use_internal_ip_addresses,
            latency_perturbation_spec,
            latency_topology,
            added_latency,
            number_of_triangles,
            number_of_clusters,
            consensus_protocol,
            maximum_latency,
            epoch_duration_ms,
            blocking_connections,
            use_current_timestamp_for_genesis,
            max_pipeline_delay,
            aa_authenticator,
            should_fail,
            tx_payload_obj_type,
            stress_num_workers,
            aa_split_amount,
            stress_in_flight_ratio,
            stress_num_client_threads,
            stress_num_server_threads,
            shared_counter_hotness_factor,
            num_shared_counters,
            benchmark_stats_path,
        } => {
            // Create a new orchestrator to instruct the testbed.
            let username = testbed.username();
            let private_key_file = settings.ssh_private_key_file.clone();
            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
                .with_timeout(timeout)
                .with_retries(retries);

            let node_instances = testbed.node_instances();
            let client_instances = testbed.client_instances();
            let metrics_instance = testbed.metrics_instance();

            let setup_commands = testbed
                .setup_commands()
                .await
                .wrap_err("Failed to load testbed setup commands")?;

            let protocol_commands = Protocol::new(&settings);
            // `benchmark_type` arrives as a raw CLI string; parse it into the
            // concrete type (FromStr errors are stringly-typed, hence eyre!).
            let benchmark_type =
                BenchmarkType::from_str(&benchmark_type).map_err(|e| eyre::eyre!(e))?;

            // Translate the CLI load subcommand into the generator's LoadType.
            let load = match load_type {
                Load::FixedLoad { loads } => {
                    // An empty --loads list falls back to a single 200 tx/s run.
                    let loads = if loads.is_empty() { vec![200] } else { loads };
                    LoadType::Fixed(loads)
                }
                Load::Search {
                    starting_load,
                    max_iterations,
                } => LoadType::Search {
                    starting_load,
                    max_iterations,
                },
            };

            // Crash-recovery only applies when there is at least one fault;
            // otherwise the (possibly zero) faults are permanent.
            let fault_type = if !crash_recovery || faults == 0 {
                FaultsType::Permanent { faults }
            } else {
                FaultsType::CrashRecovery {
                    max_faults: faults,
                    interval: crash_interval,
                }
            };

            // Combine the CLI perturbation choice with its parameter flags.
            let perturbation_spec = match latency_perturbation_spec {
                Some(PerturbationSpec::BrokenTriangle) => {
                    net_latency::PerturbationSpec::BrokenTriangle {
                        added_latency,
                        number_of_triangles,
                    }
                }
                Some(PerturbationSpec::Blocking) => net_latency::PerturbationSpec::Blocking {
                    number_of_blocked_connections: blocking_connections,
                },
                None => net_latency::PerturbationSpec::None,
            };

            // Map the CLI topology choice onto the net_latency layout;
            // None skips latency-matrix generation entirely.
            let latency_topology = match latency_topology {
                Some(LatencyTopology::RandomGeographical) => {
                    Some(TopologyLayout::RandomGeographical)
                }
                Some(LatencyTopology::RandomClustered) => {
                    Some(TopologyLayout::RandomClustered { number_of_clusters })
                }
                Some(LatencyTopology::HardCodedClustered) => {
                    Some(TopologyLayout::HardCodedClustered)
                }
                Some(LatencyTopology::Mainnet) => Some(TopologyLayout::Mainnet),
                None => None,
            };

            let (benchmark_dir, writer) = init_benchmark_logger(&settings, "benchmark_run")?;

            // Assemble the benchmark parameter generator from all CLI knobs.
            let mut generator = BenchmarkParametersGenerator::new(
                committee,
                dedicated_clients,
                load,
                use_internal_ip_addresses,
            )
            .with_benchmark_type(benchmark_type)
            .with_custom_run_interval(run_interval)
            .with_perturbation_spec(perturbation_spec)
            .with_latency_topology(latency_topology)
            .with_consensus_protocol(consensus_protocol)
            .with_max_latency(maximum_latency)
            .with_epoch_duration(epoch_duration_ms)
            .with_max_pipeline_delay(max_pipeline_delay)
            .with_current_timestamp_for_genesis(use_current_timestamp_for_genesis)
            .with_faults(fault_type)
            .with_aa_authenticator(aa_authenticator)
            .with_should_fail(should_fail)
            .with_tx_payload_obj_type(tx_payload_obj_type)
            .with_stress_num_workers(stress_num_workers)
            .with_aa_split_amount(aa_split_amount)
            .with_stress_in_flight_ratio(stress_in_flight_ratio)
            .with_stress_client_threads(stress_num_client_threads)
            .with_stress_server_threads(stress_num_server_threads)
            // NOTE(review): benchmark_stats_path is not used again below, so
            // this .clone() looks redundant — confirm before removing.
            .with_benchmark_stats_path(benchmark_stats_path.clone());

            // Shared-counter knobs are only applied when explicitly set.
            if let Some(factor) = shared_counter_hotness_factor {
                generator = generator.with_shared_counter_hotness_factor(factor);
            }
            if let Some(counters) = num_shared_counters {
                generator = generator.with_num_shared_counters(counters);
            }

            // Drive the full benchmark campaign over ssh.
            Orchestrator::new(
                settings,
                node_instances,
                client_instances,
                metrics_instance,
                setup_commands,
                protocol_commands,
                ssh_manager,
            )
            .with_scrape_interval(scrape_interval)
            .with_crash_interval(crash_interval)
            .skip_testbed_updates(skip_testbed_update)
            .skip_testbed_configuration(skip_testbed_configuration)
            .with_log_processing(log_processing)
            .with_dedicated_clients(dedicated_clients)
            .skip_monitoring(skip_monitoring)
            .with_benchmark_dir_and_writer(benchmark_dir, writer)
            .run_benchmarks(generator)
            .await
            .wrap_err("Failed to run benchmarks")?;
        }

        // Print a summary of the specified measurements collection.
        Operation::Summarize { path } => {
            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
        }
    }
    Ok(())
}