iota-aws-orchestrator/
main.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{str::FromStr, time::Duration};

use benchmark::{BenchmarkParametersGenerator, LoadType};
use clap::{Parser, ValueEnum};
use client::{ServerProviderClient, aws::AwsClient};
use eyre::{Context, Result};
use faults::FaultsType;
use measurement::MeasurementsCollection;
use orchestrator::Orchestrator;
use protocol::iota::{IotaBenchmarkType, IotaProtocol};
use serde::{Deserialize, Serialize};
use settings::{CloudProvider, Settings};
use ssh::SshConnectionManager;
use testbed::Testbed;

use crate::net_latency::TopologyLayout;

pub mod benchmark;
pub mod build_cache;
pub mod client;
pub mod display;
pub mod error;
pub mod faults;
pub mod logger;
pub mod logs;
pub mod measurement;
mod monitor;
pub mod net_latency;
pub mod orchestrator;
pub mod protocol;
pub mod settings;
pub mod ssh;
pub mod testbed;

type Protocol = IotaProtocol;
type BenchmarkType = IotaBenchmarkType;

#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    /// The path to the settings file. This file contains basic information
    /// needed to deploy testbeds and run benchmarks, such as the URL of the
    /// git repo, the commit to deploy, etc.
    #[arg(
        long,
        value_name = "FILE",
        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    /// The type of operation to run.
    #[command(subcommand)]
    operation: Operation,
}

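// Example invocations (illustrative; assumes the compiled binary is named
// `iota-aws-orchestrator` and is run from the repository root):
//
//   iota-aws-orchestrator testbed status
//   iota-aws-orchestrator testbed deploy --instances 10 --id my-test-run
//   iota-aws-orchestrator benchmark --committee 4 fixed-load --loads 200,400
//   iota-aws-orchestrator benchmark --committee 4 search --starting-load 250
//   iota-aws-orchestrator summarize --path <measurements-file>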
#[derive(Parser)]
pub enum Operation {
    /// Get or modify the status of the testbed.
    Testbed {
        #[command(subcommand)]
        action: TestbedAction,
    },

    /// Run a benchmark on the specified testbed.
    Benchmark {
        /// Percentage of shared vs owned objects; 0 means only owned objects
        /// and 100 means only shared objects.
        #[arg(long, default_value = "0", global = true)]
        benchmark_type: String,

        /// The committee size to deploy.
        #[arg(long, value_name = "INT")]
        committee: usize,

        /// Number of faulty nodes.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        faults: usize,

        /// Whether the faulty nodes recover.
        #[arg(long, action, default_value = "false", global = true)]
        crash_recovery: bool,

        /// The interval at which to crash nodes, in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
        crash_interval: Duration,

        /// The minimum duration of the benchmark in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
        duration: Duration,

        /// The interval between measurement collections, in seconds.
        #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
        scrape_interval: Duration,

        /// Whether to skip testbed updates before running benchmarks.
        #[arg(long, action, default_value = "false", global = true)]
        skip_testbed_update: bool,

        /// Whether to skip testbed configuration before running benchmarks.
        #[arg(long, action, default_value = "false", global = true)]
        skip_testbed_configuration: bool,

        /// Whether to download and analyze the client and node log files.
        #[arg(long, action, default_value = "false", global = true)]
        log_processing: bool,

        /// The number of instances running exclusively load generators. If set
        /// to zero, the orchestrator collocates one load generator with each
        /// node.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        /// Whether to forgo the Grafana and Prometheus instance and leave the
        /// testbed unmonitored.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The timeout duration for SSH commands, in seconds.
        #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
        timeout: Duration,

        /// The number of times the orchestrator should retry an SSH command.
        #[arg(long, value_name = "INT", default_value = "5", global = true)]
        retries: usize,

        /// The load to submit to the system.
        #[command(subcommand)]
        load_type: Load,

        /// Whether nodes advertise their internal or public IP address for
        /// inter-node communication. When the simulation runs across multiple
        /// regions, nodes must use their public IPs to communicate correctly;
        /// when it runs in a single VPC, they should use their internal IPs
        /// to avoid paying for inter-node traffic.
        #[arg(long, action, default_value_t = false, global = true)]
        use_internal_ip_addresses: bool,

        /// Optional latency topology. If omitted, latency matrix generation
        /// is skipped.
        #[arg(long, global = true)]
        latency_topology: Option<LatencyTopology>,

        /// Optional perturbation spec. If omitted, no perturbation is applied.
        #[arg(long = "latency-perturbation-spec", global = true)]
        latency_perturbation_spec: Option<PerturbationSpec>,

        /// How many clusters to use in the latency topology.
        #[arg(long, value_name = "INT", default_value = "10", global = true)]
        number_of_clusters: usize,

        /// Number of triangles used by the broken-triangle perturbation.
        #[arg(long, value_name = "INT", default_value = "5", global = true)]
        number_of_triangles: u16,

        /// Extra artificial latency added when perturbing the topology.
        #[arg(long, value_name = "INT", default_value = "20", global = true)]
        added_latency: u16,

        /// Maximum latency between two nodes/clusters in a private network.
        #[arg(long, value_name = "INT", default_value = "400", global = true)]
        maximum_latency: u16,

        /// The consensus protocol to use: `swap-each-epoch` alternates
        /// between starfish and mysticeti every epoch. Defaults to starfish.
        #[arg(long, default_value = "starfish", global = true)]
        consensus_protocol: ConsensusProtocol,

        /// Optional epoch duration in milliseconds; defaults to one hour.
        #[arg(long, value_name = "INT", global = true)]
        epoch_duration_ms: Option<u64>,

        /// Maximum pipeline delay.
        #[arg(long, value_name = "INT", default_value = "400", global = true)]
        max_pipeline_delay: u32,

        /// Number of blocked connections used by the blocking perturbation
        /// spec.
        #[arg(long, value_name = "INT", default_value = "1", global = true)]
        blocking_connections: usize,

        /// Use the current system time as the genesis chain start timestamp
        /// instead of 0.
        #[arg(long, action, default_value_t = false)]
        use_current_timestamp_for_genesis: bool,

        /// Shared counter hotness factor (0-100). Higher values concentrate
        /// load on fewer counters.
        #[arg(long, value_name = "INT", global = true)]
        shared_counter_hotness_factor: Option<u8>,

        /// Number of shared counters to use in the benchmark.
        #[arg(long, value_name = "INT", global = true)]
        num_shared_counters: Option<usize>,
    },

    /// Print a summary of the specified measurements collection.
    Summarize {
        /// The path to the measurements collection file.
        #[arg(long, value_name = "FILE")]
        path: String,
    },
}

#[derive(Parser)]
pub enum TestbedAction {
    /// Display the testbed status.
    Status,

    /// Deploy the specified number of instances in all regions specified in
    /// the settings file.
    Deploy {
        /// Number of instances to deploy.
        #[arg(long)]
        instances: usize,

        /// Skips deployment of the metrics instance.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The number of instances running exclusively load generators.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        /// Attempts to prioritise cheaper spot instances.
        /// Note: stop and start commands are not available for spot instances.
        #[arg(long, action, default_value = "false", global = true)]
        use_spot_instances: bool,

        /// ID tag added to each deployment, used for cost tracking; it should
        /// be unique for each test run deployment.
        #[arg(long)]
        id: String,
    },

    /// Start at most the specified number of instances per region on an
    /// existing testbed.
    Start {
        /// Number of instances to start.
        #[arg(long, default_value = "200")]
        instances: usize,

        /// Skips deployment of the metrics instance.
        #[arg(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The number of instances running exclusively load generators.
        #[arg(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,
    },

    /// Stop an existing testbed (without destroying the instances).
    Stop {
        /// Keeps the monitoring instance running.
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,
    },

    /// Destroy the testbed and terminate all instances.
    Destroy {
        /// Keeps the monitoring instance running.
        #[arg(long, action, default_value = "false", global = true)]
        keep_monitoring: bool,

        /// Force destroy without a confirmation prompt.
        #[arg(short = 'f', long, action, default_value = "false", global = true)]
        force: bool,
    },
}

#[derive(Parser)]
pub enum Load {
    /// The fixed loads (in tx/s) to submit to the nodes.
    FixedLoad {
        /// A list of fixed loads (tx/s).
        #[arg(long, value_name = "INT", num_args(1..), value_delimiter = ',')]
        loads: Vec<usize>,
    },

    /// Search for the maximum load that the system can sustainably handle.
    Search {
        /// The initial load (in tx/s) to test and use as a baseline.
        #[arg(long, value_name = "INT", default_value = "250")]
        starting_load: usize,

        /// The maximum number of iterations before converging on a breaking
        /// point.
        #[arg(long, value_name = "INT", default_value = "5")]
        max_iterations: usize,
    },
}

#[derive(ValueEnum, Clone, Debug)]
pub enum PerturbationSpec {
    BrokenTriangle,
    Blocking,
    // Potentially other options later.
}

#[derive(ValueEnum, Clone, Debug)]
pub enum LatencyTopology {
    /// Generates a latency matrix for each node, randomly positioned on a
    /// cylinder.
    RandomGeographical,
    /// Generates a latency matrix by randomly grouping nodes into clusters
    /// and randomly positioning the clusters on a cylinder.
    RandomClustered,
    /// Uses a hardcoded 10x10 matrix with 10 equal-sized regions.
    HardCodedClustered,
    /// Uses mainnet validator region distribution for latencies.
    Mainnet,
}

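// Note: `ValueEnum` derives kebab-case CLI values by default, so
// `--consensus-protocol` below accepts `starfish`, `mysticeti`, and
// `swap-each-epoch` (and likewise for the enums above); this assumes no
// custom value renaming is applied.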
#[derive(ValueEnum, Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusProtocol {
    Starfish,
    Mysticeti,
    SwapEachEpoch,
}

fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    let seconds = arg.parse()?;
    Ok(Duration::from_secs(seconds))
}

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    let opts: Opts = Opts::parse();

    // Load the settings file.
    let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;

    match &settings.cloud_provider {
        CloudProvider::Aws => {
            // Create the client for the cloud provider.
            let client = AwsClient::new(settings.clone()).await;

            // Execute the command.
            run(settings, client, opts).await
        }
    }
}

async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
    // Create a new testbed.
    let mut testbed = Testbed::new(settings.clone(), client)
        .await
        .wrap_err("Failed to create testbed")?;

    match opts.operation {
        Operation::Testbed { action } => match action {
            // Display the current status of the testbed.
            TestbedAction::Status => testbed.status(),

            // Deploy the specified number of instances on the testbed.
            TestbedAction::Deploy {
                instances,
                dedicated_clients,
                skip_monitoring,
                use_spot_instances,
                id,
            } => testbed
                .deploy(
                    instances,
                    skip_monitoring,
                    dedicated_clients,
                    use_spot_instances,
                    id,
                )
                .await
                .wrap_err("Failed to deploy testbed")?,

            // Start the specified number of instances on an existing testbed.
            TestbedAction::Start {
                instances,
                skip_monitoring,
                dedicated_clients,
            } => testbed
                .start(instances, dedicated_clients, skip_monitoring)
                .await
                .wrap_err("Failed to start testbed")?,

            // Stop an existing testbed.
            TestbedAction::Stop { keep_monitoring } => testbed
                .stop(keep_monitoring)
                .await
                .wrap_err("Failed to stop testbed")?,

            // Destroy the testbed and terminate all instances.
            TestbedAction::Destroy {
                keep_monitoring,
                force,
            } => testbed
                .destroy(keep_monitoring, force)
                .await
                .wrap_err("Failed to destroy testbed")?,
        },

        // Run benchmarks.
        Operation::Benchmark {
            benchmark_type,
            committee,
            faults,
            crash_recovery,
            crash_interval,
            duration,
            scrape_interval,
            skip_testbed_update,
            skip_testbed_configuration,
            log_processing,
            dedicated_clients,
            skip_monitoring,
            timeout,
            retries,
            load_type,
            use_internal_ip_addresses,
            latency_perturbation_spec,
            latency_topology,
            added_latency,
            number_of_triangles,
            number_of_clusters,
            consensus_protocol,
            maximum_latency,
            epoch_duration_ms,
            blocking_connections,
            use_current_timestamp_for_genesis,
            max_pipeline_delay,
            shared_counter_hotness_factor,
            num_shared_counters,
        } => {
            // Create a new orchestrator to instruct the testbed.
            let username = testbed.username();
            let private_key_file = settings.ssh_private_key_file.clone();
            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
                .with_timeout(timeout)
                .with_retries(retries);

            let node_instances = testbed.node_instances();
            let client_instances = testbed.client_instances();
            let metrics_instance = testbed.metrics_instance();

            let setup_commands = testbed
                .setup_commands()
                .await
                .wrap_err("Failed to load testbed setup commands")?;

            let protocol_commands = Protocol::new(&settings);
            let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;

            let load = match load_type {
                Load::FixedLoad { loads } => {
                    let loads = if loads.is_empty() { vec![200] } else { loads };
                    LoadType::Fixed(loads)
                }
                Load::Search {
                    starting_load,
                    max_iterations,
                } => LoadType::Search {
                    starting_load,
                    max_iterations,
                },
            };

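            // With crash-recovery disabled (or zero faults), faulty nodes stay
            // down for the whole run; otherwise they crash and recover every
            // `crash_interval`.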
            let fault_type = if !crash_recovery || faults == 0 {
                FaultsType::Permanent { faults }
            } else {
                FaultsType::CrashRecovery {
                    max_faults: faults,
                    interval: crash_interval,
                }
            };

            let perturbation_spec = match latency_perturbation_spec {
                Some(PerturbationSpec::BrokenTriangle) => {
                    net_latency::PerturbationSpec::BrokenTriangle {
                        added_latency,
                        number_of_triangles,
                    }
                }
                Some(PerturbationSpec::Blocking) => net_latency::PerturbationSpec::Blocking {
                    number_of_blocked_connections: blocking_connections,
                },
                None => net_latency::PerturbationSpec::None,
            };

            let latency_topology = match latency_topology {
                Some(LatencyTopology::RandomGeographical) => {
                    Some(TopologyLayout::RandomGeographical)
                }
                Some(LatencyTopology::RandomClustered) => {
                    Some(TopologyLayout::RandomClustered { number_of_clusters })
                }
                Some(LatencyTopology::HardCodedClustered) => {
                    Some(TopologyLayout::HardCodedClustered)
                }
                Some(LatencyTopology::Mainnet) => Some(TopologyLayout::Mainnet),
                None => None,
            };

            let mut generator = BenchmarkParametersGenerator::new(
                committee,
                dedicated_clients,
                load,
                use_internal_ip_addresses,
            )
            .with_benchmark_type(benchmark_type)
            .with_custom_duration(duration)
            .with_perturbation_spec(perturbation_spec)
            .with_latency_topology(latency_topology)
            .with_consensus_protocol(consensus_protocol)
            .with_max_latency(maximum_latency)
            .with_epoch_duration(epoch_duration_ms)
            .with_max_pipeline_delay(max_pipeline_delay)
            .with_current_timestamp_for_genesis(use_current_timestamp_for_genesis)
            .with_faults(fault_type);

            if let Some(factor) = shared_counter_hotness_factor {
                generator = generator.with_shared_counter_hotness_factor(factor);
            }
            if let Some(counters) = num_shared_counters {
                generator = generator.with_num_shared_counters(counters);
            }

            Orchestrator::new(
                settings,
                node_instances,
                client_instances,
                metrics_instance,
                setup_commands,
                protocol_commands,
                ssh_manager,
            )
            .with_scrape_interval(scrape_interval)
            .with_crash_interval(crash_interval)
            .skip_testbed_updates(skip_testbed_update)
            .skip_testbed_configuration(skip_testbed_configuration)
            .with_log_processing(log_processing)
            .with_dedicated_clients(dedicated_clients)
            .skip_monitoring(skip_monitoring)
            .run_benchmarks(generator)
            .await
            .wrap_err("Failed to run benchmarks")?;
        }

        // Print a summary of the specified measurements collection.
        Operation::Summarize { path } => {
            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
        }
    }
    Ok(())
}
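
// Minimal sanity checks (illustrative additions, not part of the original
// file): they verify `parse_duration` and that the clap wiring parses a
// simple `testbed status` invocation.
#[cfg(test)]
mod cli_tests {
    use super::*;

    #[test]
    fn parse_duration_accepts_whole_seconds() {
        assert_eq!(parse_duration("60").unwrap(), Duration::from_secs(60));
        assert!(parse_duration("not-a-number").is_err());
    }

    #[test]
    fn parses_testbed_status() {
        // `try_parse_from` takes an iterator whose first item is the binary
        // name.
        let opts = Opts::try_parse_from(["orchestrator", "testbed", "status"])
            .expect("CLI should parse");
        assert!(matches!(
            opts.operation,
            Operation::Testbed {
                action: TestbedAction::Status
            }
        ));
    }
}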