iota-aws-orchestrator/
main.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{str::FromStr, time::Duration};

use benchmark::{BenchmarkParametersGenerator, LoadType};
use clap::{Parser, ValueEnum};
use client::{ServerProviderClient, aws::AwsClient};
use eyre::{Context, Result};
use faults::FaultsType;
use measurement::MeasurementsCollection;
use orchestrator::Orchestrator;
use protocol::iota::{IotaBenchmarkType, IotaProtocol};
use serde::{Deserialize, Serialize};
use settings::{CloudProvider, Settings};
use ssh::SshConnectionManager;
use testbed::Testbed;

use crate::net_latency::TopologyLayout;

pub mod benchmark;
pub mod build_cache;
pub mod client;
pub mod display;
pub mod error;
pub mod faults;
pub mod logger;
pub mod logs;
pub mod measurement;
mod monitor;
pub mod net_latency;
pub mod orchestrator;
pub mod protocol;
pub mod settings;
pub mod ssh;
pub mod testbed;

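// Concrete protocol and benchmark-type implementations this orchestrator drives.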
type Protocol = IotaProtocol;
type BenchmarkType = IotaBenchmarkType;

#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    /// The path to the settings file. This file contains the basic information
    /// needed to deploy testbeds and run benchmarks, such as the URL of the
    /// git repo, the commit to deploy, etc.
    #[arg(
        long,
        value_name = "FILE",
        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    /// The type of operation to run.
    #[command(subcommand)]
    operation: Operation,
}

#[derive(Parser)]
pub enum Operation {
    /// Get or modify the status of the testbed.
    Testbed {
        #[command(subcommand)]
        action: TestbedAction,
    },

    /// Run a benchmark on the specified testbed.
    Benchmark {
        /// Percentage of shared vs owned objects; 0 means only owned objects
        /// and 100 means only shared objects.
        #[arg(long, default_value = "0", global = true)]
        benchmark_type: String,

        /// The committee size to deploy.
        #[arg(long, value_name = "INT")]
        committee: usize,

        /// Number of faulty nodes.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        faults: usize,

        /// Whether the faulty nodes recover.
        #[arg(long, action, default_value = "false", global = true)]
        crash_recovery: bool,

        /// The interval to crash nodes in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
        crash_interval: Duration,

        /// The minimum duration of the benchmark in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
        duration: Duration,

        /// The interval between measurement collections, in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
        scrape_interval: Duration,

        /// Whether to skip testbed updates before running benchmarks.
        #[arg(long, action, default_value = "false", global = true)]
        skip_testbed_update: bool,

        /// Whether to skip testbed configuration before running benchmarks.
        #[arg(long, action, default_value = "false", global = true)]
        skip_testbed_configuration: bool,

        /// Whether to download and analyze the client and node log files.
        #[arg(long, action, default_value = "false", global = true)]
        log_processing: bool,

        /// The number of instances running exclusively load generators. If set
        /// to zero, the orchestrator collocates one load generator with each
        /// node.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        /// Whether to forgo a Grafana and Prometheus instance and leave the
        /// testbed unmonitored.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The timeout duration for ssh commands (in seconds).
        #[arg(long, value_parser = parse_duration, default_value = "30", global = true)]
        timeout: Duration,

        /// The number of times the orchestrator should retry an ssh command.
        #[arg(long, value_name = "INT", default_value = "5", global = true)]
        retries: usize,

        /// The load to submit to the system.
        #[command(subcommand)]
        load_type: Load,

        /// Whether nodes should advertise their internal or public IP address
        /// for inter-node communication. When the simulation spans multiple
        /// regions, nodes need to use their public IPs to communicate
        /// correctly; when it runs within a single VPC, they should use their
        /// internal IPs to avoid paying for data transferred between nodes.
        #[arg(long, action, default_value_t = false, global = true)]
        use_internal_ip_addresses: bool,

        /// Optional latency topology. If omitted, latency matrix generation is
        /// skipped.
        #[arg(long, global = true)]
        latency_topology: Option<LatencyTopology>,

        /// Optional perturbation spec. If omitted, no perturbation is applied.
        #[arg(long = "latency-perturbation-spec", global = true)]
        latency_perturbation_spec: Option<PerturbationSpec>,

        /// How many clusters to use in the latency topology.
        #[arg(long, value_name = "INT", default_value = "10", global = true)]
        number_of_clusters: usize,

        /// Number of triangles used by the broken-triangle perturbation spec.
        #[arg(long, value_name = "INT", default_value = "5", global = true)]
        number_of_triangles: u16,

        /// Extra artificial latency to add when perturbing the topology.
        #[arg(long, value_name = "INT", default_value = "20", global = true)]
        added_latency: u16,

        /// Maximum latency between two nodes/clusters in a private network.
        #[arg(long, value_name = "INT", default_value = "400", global = true)]
        maximum_latency: u16,

        /// The consensus protocol to run: Starfish, Mysticeti, or swapping
        /// between the two every epoch. Defaults to Starfish.
        #[arg(long, default_value = "starfish", global = true)]
        consensus_protocol: ConsensusProtocol,

        /// Optional epoch duration in milliseconds; defaults to one hour.
        #[arg(long, value_name = "INT", global = true)]
        epoch_duration_ms: Option<u64>,

        /// Maximum pipeline delay.
        #[arg(long, value_name = "INT", default_value = "400", global = true)]
        max_pipeline_delay: u32,

        /// Number of blocked connections when using the blocking perturbation
        /// spec.
        #[arg(long, value_name = "INT", default_value = "1", global = true)]
        blocking_connections: usize,

        /// Use the current system time as the genesis chain start timestamp
        /// instead of 0.
        #[arg(long, action, default_value_t = false)]
        use_current_timestamp_for_genesis: bool,
    },

    /// Print a summary of the specified measurements collection.
    Summarize {
        /// The path to the measurements file.
        #[arg(long, value_name = "FILE")]
        path: String,
    },
}

#[derive(Parser)]
pub enum TestbedAction {
    /// Display the testbed status.
    Status,

    /// Deploy the specified number of instances in all regions specified in
    /// the settings file.
    Deploy {
        /// Number of instances to deploy.
        #[arg(long)]
        instances: usize,

        /// Skips deployment of a metrics instance.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The number of instances running exclusively load generators.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        /// Attempts to prioritise cheaper spot instances.
        /// Note: stop and start commands are not available for spot instances.
        #[arg(long, action, default_value = "false", global = true)]
        use_spot_instances: bool,

        /// ID tag added to each deployment, used for cost tracking; it should
        /// be unique for each test run deployment.
        #[arg(long)]
        id: String,
    },

    /// Start at most the specified number of instances per region on an
    /// existing testbed.
    Start {
        /// Maximum number of instances to start per region.
        #[arg(long, default_value = "200")]
        instances: usize,

        /// Skips deployment of a metrics instance.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The number of instances running exclusively load generators.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,
    },

    /// Stop an existing testbed (without destroying the instances).
    Stop {
        /// Keeps the monitoring instance running
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,
    },

    /// Destroy the testbed and terminate all instances.
    Destroy {
        /// Keeps the monitoring instance running
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,
    },
}

#[derive(Parser)]
pub enum Load {
    /// The fixed loads (in tx/s) to submit to the nodes.
    FixedLoad {
        /// A list of fixed loads (tx/s).
        #[arg(
            long,
            value_name = "INT",
            num_args(1..),
            value_delimiter = ','
        )]
        loads: Vec<usize>,
    },

    /// Search for the maximum load that the system can sustainably handle.
    Search {
        /// The initial load (in tx/s) to test and use as a baseline.
        #[arg(long, value_name = "INT", default_value = "250")]
        starting_load: usize,

        /// The maximum number of iterations before converging on a breaking
        /// point.
        #[arg(long, value_name = "INT", default_value = "5")]
        max_iterations: usize,
    },
}

#[derive(ValueEnum, Clone, Debug)]
pub enum PerturbationSpec {
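    /// Perturb the latency topology by breaking triangles (configured via
    /// `--added-latency` and `--number-of-triangles`).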
    BrokenTriangle,
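    /// Block a number of node-to-node connections (configured via
    /// `--blocking-connections`).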
    Blocking,
    // potentially other options later
}

#[derive(ValueEnum, Clone, Debug)]
pub enum LatencyTopology {
    /// Generates a latency matrix for each node, randomly positioned on a
    /// cylinder.
    RandomGeographical,
    /// Generates a latency matrix by randomly clustering nodes into clusters
    /// and randomly positioning clusters on a cylinder.
    RandomClustered,
    /// Uses a hardcoded 10x10 matrix with 10 equal-sized regions.
    HardCodedClustered,
    /// Uses mainnet validator region distribution for latencies.
    Mainnet,
}

#[derive(ValueEnum, Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusProtocol {
    Starfish,
    Mysticeti,
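    /// Alternate between Starfish and Mysticeti at every epoch.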
    SwapEachEpoch,
}

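/// Parses a whole number of seconds from the command line into a `Duration`.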
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    let seconds = arg.parse()?;
    Ok(Duration::from_secs(seconds))
}

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    let opts: Opts = Opts::parse();

    // Load the settings file.
    let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;

    match &settings.cloud_provider {
        CloudProvider::Aws => {
            // Create the client for the cloud provider.
            let client = AwsClient::new(settings.clone()).await;

            // Execute the command.
            run(settings, client, opts).await
        }
    }
}

async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
    // Create a new testbed.
    let mut testbed = Testbed::new(settings.clone(), client)
        .await
        .wrap_err("Failed to create testbed")?;

    match opts.operation {
        Operation::Testbed { action } => match action {
            // Display the current status of the testbed.
            TestbedAction::Status => testbed.status(),

            // Deploy the specified number of instances on the testbed.
            TestbedAction::Deploy {
                instances,
                dedicated_clients,
                skip_monitoring,
                use_spot_instances,
                id,
            } => testbed
                .deploy(
                    instances,
                    skip_monitoring,
                    dedicated_clients,
                    use_spot_instances,
                    id,
                )
                .await
                .wrap_err("Failed to deploy testbed")?,

            // Start the specified number of instances on an existing testbed.
            TestbedAction::Start {
                instances,
                skip_monitoring,
                dedicated_clients,
            } => testbed
                .start(instances, dedicated_clients, skip_monitoring)
                .await
                .wrap_err("Failed to start testbed")?,

            // Stop an existing testbed.
            TestbedAction::Stop { keep_monitoring } => testbed
                .stop(keep_monitoring)
                .await
                .wrap_err("Failed to stop testbed")?,

            // Destroy the testbed and terminate all instances.
            TestbedAction::Destroy { keep_monitoring } => testbed
                .destroy(keep_monitoring)
                .await
                .wrap_err("Failed to destroy testbed")?,
        },

        // Run benchmarks.
        Operation::Benchmark {
            benchmark_type,
            committee,
            faults,
            crash_recovery,
            crash_interval,
            duration,
            scrape_interval,
            skip_testbed_update,
            skip_testbed_configuration,
            log_processing,
            dedicated_clients,
            skip_monitoring,
            timeout,
            retries,
            load_type,
            use_internal_ip_addresses,
            latency_perturbation_spec,
            latency_topology,
            added_latency,
            number_of_triangles,
            number_of_clusters,
            consensus_protocol,
            maximum_latency,
            epoch_duration_ms,
            blocking_connections,
            use_current_timestamp_for_genesis,
            max_pipeline_delay,
        } => {
            // Set up the ssh connection manager used to instruct the testbed.
            let username = testbed.username();
            let private_key_file = settings.ssh_private_key_file.clone();
            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
                .with_timeout(timeout)
                .with_retries(retries);

            let node_instances = testbed.node_instances();
            let client_instances = testbed.client_instances();
            let metrics_instance = testbed.metrics_instance();

            let setup_commands = testbed
                .setup_commands()
                .await
                .wrap_err("Failed to load testbed setup commands")?;

            let protocol_commands = Protocol::new(&settings);
            let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;

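            // Translate the load subcommand into the generator's load type; an
            // empty fixed-load list falls back to a single 200 tx/s load.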
            let load = match load_type {
                Load::FixedLoad { loads } => {
                    let loads = if loads.is_empty() { vec![200] } else { loads };
                    LoadType::Fixed(loads)
                }
                Load::Search {
                    starting_load,
                    max_iterations,
                } => LoadType::Search {
                    starting_load,
                    max_iterations,
                },
            };

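            // Crash-recovery only applies when requested and at least one
            // fault is configured; otherwise the faults are permanent.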
            let fault_type = if !crash_recovery || faults == 0 {
                FaultsType::Permanent { faults }
            } else {
                FaultsType::CrashRecovery {
                    max_faults: faults,
                    interval: crash_interval,
                }
            };

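            // Map the CLI perturbation choice onto the net_latency
            // perturbation spec.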
            let perturbation_spec = match latency_perturbation_spec {
                Some(PerturbationSpec::BrokenTriangle) => {
                    net_latency::PerturbationSpec::BrokenTriangle {
                        added_latency,
                        number_of_triangles,
                    }
                }
                Some(PerturbationSpec::Blocking) => net_latency::PerturbationSpec::Blocking {
                    number_of_blocked_connections: blocking_connections,
                },
                None => net_latency::PerturbationSpec::None,
            };

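            // Map the CLI topology choice onto the internal topology layout;
            // only the random clustered layout uses the cluster count.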
            let latency_topology = match latency_topology {
                Some(LatencyTopology::RandomGeographical) => {
                    Some(TopologyLayout::RandomGeographical)
                }
                Some(LatencyTopology::RandomClustered) => {
                    Some(TopologyLayout::RandomClustered { number_of_clusters })
                }
                Some(LatencyTopology::HardCodedClustered) => {
                    Some(TopologyLayout::HardCodedClustered)
                }
                Some(LatencyTopology::Mainnet) => Some(TopologyLayout::Mainnet),
                None => None,
            };

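            // Assemble the benchmark parameters from the CLI options.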
            let generator = BenchmarkParametersGenerator::new(
                committee,
                dedicated_clients,
                load,
                use_internal_ip_addresses,
            )
            .with_benchmark_type(benchmark_type)
            .with_custom_duration(duration)
            .with_perturbation_spec(perturbation_spec)
            .with_latency_topology(latency_topology)
            .with_consensus_protocol(consensus_protocol)
            .with_max_latency(maximum_latency)
            .with_epoch_duration(epoch_duration_ms)
            .with_max_pipeline_delay(max_pipeline_delay)
            .with_current_timestamp_for_genesis(use_current_timestamp_for_genesis)
            .with_faults(fault_type);

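            // Build the orchestrator and run the benchmarks on the deployed
            // instances.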
            Orchestrator::new(
                settings,
                node_instances,
                client_instances,
                metrics_instance,
                setup_commands,
                protocol_commands,
                ssh_manager,
            )
            .with_scrape_interval(scrape_interval)
            .with_crash_interval(crash_interval)
            .skip_testbed_updates(skip_testbed_update)
            .skip_testbed_configuration(skip_testbed_configuration)
            .with_log_processing(log_processing)
            .with_dedicated_clients(dedicated_clients)
            .skip_monitoring(skip_monitoring)
            .run_benchmarks(generator)
            .await
            .wrap_err("Failed to run benchmarks")?;
        }

        // Print a summary of the specified measurements collection.
        Operation::Summarize { path } => {
            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
        }
    }
    Ok(())
}