iota_aws_orchestrator/
main.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, time::Duration};
6
7use benchmark::{BenchmarkParametersGenerator, LoadType};
8use clap::Parser;
9use client::{ServerProviderClient, aws::AwsClient};
10use eyre::{Context, Result};
11use faults::FaultsType;
12use measurement::MeasurementsCollection;
13use orchestrator::Orchestrator;
14use protocol::iota::{IotaBenchmarkType, IotaProtocol};
15use settings::{CloudProvider, Settings};
16use ssh::SshConnectionManager;
17use testbed::Testbed;
18
19pub mod benchmark;
20pub mod client;
21pub mod display;
22pub mod error;
23pub mod faults;
24pub mod logs;
25pub mod measurement;
26mod monitor;
27pub mod orchestrator;
28pub mod protocol;
29pub mod settings;
30pub mod ssh;
31pub mod testbed;
32
// Concrete protocol/benchmark-type instantiations used by this binary.
// Swapping these aliases retargets the whole orchestrator at a different
// protocol implementation without touching the rest of the code.
type Protocol = IotaProtocol;
type BenchmarkType = IotaBenchmarkType;
35
// Top-level command-line interface, parsed by clap at startup.
// NOTE: the `///` doc comments below are emitted verbatim as `--help` text,
// so they are user-facing strings, not mere documentation.
#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    /// The path to the settings file. This file contains basic information to
    /// deploy testbeds and run benchmarks such as the url of the git repo,
    /// the commit to deploy, etc.
    // `global = true` lets `--settings-path` be given after any subcommand.
    #[arg(
        long,
        value_name = "FILE",
        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    /// The type of operation to run.
    // Dispatched in `run` (and `main` for provider selection).
    #[command(subcommand)]
    operation: Operation,
}
54
55#[derive(Parser)]
56pub enum Operation {
57    /// Get or modify the status of the testbed.
58    Testbed {
59        #[command(subcommand)]
60        action: TestbedAction,
61    },
62
63    /// Run a benchmark on the specified testbed.
64    Benchmark {
65        /// Percentage of shared vs owned objects; 0 means only owned objects
66        /// and 100 means only shared objects.
67        #[arg(long, default_value = "0", global = true)]
68        benchmark_type: String,
69
70        /// The committee size to deploy.
71        #[arg(long, value_name = "INT")]
72        committee: usize,
73
74        /// Number of faulty nodes.
75        #[arg(long, value_name = "INT", default_value = "0", global = true)]
76        faults: usize,
77
78        /// Whether the faulty nodes recover.
79        #[arg(long, action, default_value = "false", global = true)]
80        crash_recovery: bool,
81
82        /// The interval to crash nodes in seconds.
83        #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
84        crash_interval: Duration,
85
86        /// The minimum duration of the benchmark in seconds.
87        #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
88        duration: Duration,
89
90        /// The interval between measurements collection in seconds.
91        #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
92        scrape_interval: Duration,
93
94        /// Whether to skip testbed updates before running benchmarks.
95        #[arg(long, action, default_value = "false", global = true)]
96        skip_testbed_update: bool,
97
98        /// Whether to skip testbed configuration before running benchmarks.
99        #[arg(long, action, default_value = "false", global = true)]
100        skip_testbed_configuration: bool,
101
102        /// Whether to download and analyze the client and node log files.
103        #[arg(long, action, default_value = "false", global = true)]
104        log_processing: bool,
105
106        /// The number of instances running exclusively load generators. If set
107        /// to zero the orchestrator collocates one load generator with
108        /// each node.
109        #[arg(long, value_name = "INT", default_value = "0", global = true)]
110        dedicated_clients: usize,
111
112        /// Whether to forgo a grafana and prometheus instance and leave the
113        /// testbed unmonitored.
114        #[arg(long, action, default_value = "false", global = true)]
115        skip_monitoring: bool,
116
117        /// The timeout duration for ssh commands (in seconds).
118        #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
119        timeout: Duration,
120
121        /// The number of times the orchestrator should retry an ssh command.
122        #[arg(long, value_name = "INT", default_value = "5", global = true)]
123        retries: usize,
124
125        /// The load to submit to the system.
126        #[command(subcommand)]
127        load_type: Load,
128
129        /// Flag indicating whether nodes should advertise their internal or
130        /// public IP address for inter-node communication. When running
131        /// the simulation in multiple regions, nodes need to use their public
132        /// IPs to correctly communicate, however when a simulation is
133        /// running in a single VPC, they should use their internal IPs to avoid
134        /// paying for data sent between the nodes.
135        #[clap(long, action, default_value_t = false, global = true)]
136        use_internal_ip_addresses: bool,
137    },
138
139    /// Print a summary of the specified measurements collection.
140    Summarize {
141        /// The path to the settings file.
142        #[arg(long, value_name = "FILE")]
143        path: String,
144    },
145}
146
147#[derive(Parser)]
148pub enum TestbedAction {
149    /// Display the testbed status.
150    Status,
151
152    /// Deploy the specified number of instances in all regions specified by in
153    /// the setting file.
154    Deploy {
155        /// Number of instances to deploy.
156        #[arg(long)]
157        instances: usize,
158
159        /// The region where to deploy the instances. If this parameter is not
160        /// specified, the command deploys the specified number of
161        /// instances in all regions listed in the setting file.
162        #[arg(long)]
163        region: Option<String>,
164    },
165
166    /// Start at most the specified number of instances per region on an
167    /// existing testbed.
168    Start {
169        /// Number of instances to deploy.
170        #[arg(long, default_value = "200")]
171        instances: usize,
172    },
173
174    /// Stop an existing testbed (without destroying the instances).
175    Stop,
176
177    /// Destroy the testbed and terminate all instances.
178    Destroy,
179}
180
181#[derive(Parser)]
182pub enum Load {
183    /// The fixed loads (in tx/s) to submit to the nodes.
184    FixedLoad {
185        /// A list of fixed load (tx/s).
186        #[arg(
187            long,
188            value_name = "INT",
189            num_args(1..),
190            value_delimiter = ','
191        )]
192        loads: Vec<usize>,
193    },
194
195    /// Search for the maximum load that the system can sustainably handle.
196    Search {
197        /// The initial load (in tx/s) to test and use a baseline.
198        #[arg(long, value_name = "INT", default_value = "250")]
199        starting_load: usize,
200        /// The maximum number of iterations before converging on a breaking
201        /// point.
202        #[arg(long, value_name = "INT", default_value = "5")]
203        max_iterations: usize,
204    },
205}
206
/// clap value-parser: interpret a CLI argument as a whole number of seconds
/// and convert it into a [`Duration`]. Non-numeric input yields the
/// underlying integer parse error.
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
211
212#[tokio::main]
213async fn main() -> Result<()> {
214    color_eyre::install()?;
215    let opts: Opts = Opts::parse();
216
217    // Load the settings files.
218    let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
219
220    match &settings.cloud_provider {
221        CloudProvider::Aws => {
222            // Create the client for the cloud provider.
223            let client = AwsClient::new(settings.clone()).await;
224
225            // Execute the command.
226            run(settings, client, opts).await
227        }
228    }
229}
230
/// Execute the requested operation against a freshly constructed [`Testbed`]:
/// testbed lifecycle actions, a full benchmark run, or a summary of a saved
/// measurements collection. Generic over the cloud-provider client so the
/// provider selection stays in `main`.
async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
    // Create a new testbed.
    let mut testbed = Testbed::new(settings.clone(), client)
        .await
        .wrap_err("Failed to create testbed")?;

    match opts.operation {
        Operation::Testbed { action } => match action {
            // Display the current status of the testbed.
            TestbedAction::Status => testbed.status(),

            // Deploy the specified number of instances on the testbed.
            TestbedAction::Deploy { instances, region } => testbed
                .deploy(instances, region)
                .await
                .wrap_err("Failed to deploy testbed")?,

            // Start the specified number of instances on an existing testbed.
            TestbedAction::Start { instances } => testbed
                .start(instances)
                .await
                .wrap_err("Failed to start testbed")?,

            // Stop an existing testbed.
            TestbedAction::Stop => testbed.stop().await.wrap_err("Failed to stop testbed")?,

            // Destroy the testbed and terminate all instances.
            TestbedAction::Destroy => testbed
                .destroy()
                .await
                .wrap_err("Failed to destroy testbed")?,
        },

        // Run benchmarks.
        Operation::Benchmark {
            benchmark_type,
            committee,
            faults,
            crash_recovery,
            crash_interval,
            duration,
            scrape_interval,
            skip_testbed_update,
            skip_testbed_configuration,
            log_processing,
            dedicated_clients,
            skip_monitoring,
            timeout,
            retries,
            load_type,
            use_internal_ip_addresses,
        } => {
            // Create a new orchestrator to instruct the testbed.
            let username = testbed.username();
            let private_key_file = settings.ssh_private_key_file.clone();
            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
                .with_timeout(timeout)
                .with_retries(retries);

            let instances = testbed.instances();

            let setup_commands = testbed
                .setup_commands()
                .await
                .wrap_err("Failed to load testbed setup commands")?;

            let protocol_commands = Protocol::new(&settings);
            // Re-parse the raw CLI string into the typed benchmark parameter.
            let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;

            // Translate the CLI subcommand into the generator's load model;
            // an empty `--loads` list falls back to a single 200 tx/s point.
            let load = match load_type {
                Load::FixedLoad { loads } => {
                    let loads = if loads.is_empty() { vec![200] } else { loads };
                    LoadType::Fixed(loads)
                }
                Load::Search {
                    starting_load,
                    max_iterations,
                } => LoadType::Search {
                    starting_load,
                    max_iterations,
                },
            };

            // Crash-recovery only makes sense with at least one fault;
            // otherwise treat the faults (possibly zero) as permanent.
            let fault_type = if !crash_recovery || faults == 0 {
                FaultsType::Permanent { faults }
            } else {
                FaultsType::CrashRecovery {
                    max_faults: faults,
                    interval: crash_interval,
                }
            };

            let generator =
                BenchmarkParametersGenerator::new(committee, load, use_internal_ip_addresses)
                    .with_benchmark_type(benchmark_type)
                    .with_custom_duration(duration)
                    .with_faults(fault_type);

            // Assemble the orchestrator with all run-time knobs and execute
            // the full benchmark matrix produced by the generator.
            Orchestrator::new(
                settings,
                instances,
                setup_commands,
                protocol_commands,
                ssh_manager,
            )
            .with_scrape_interval(scrape_interval)
            .with_crash_interval(crash_interval)
            .skip_testbed_updates(skip_testbed_update)
            .skip_testbed_configuration(skip_testbed_configuration)
            .with_log_processing(log_processing)
            .with_dedicated_clients(dedicated_clients)
            .skip_monitoring(skip_monitoring)
            .run_benchmarks(generator)
            .await
            .wrap_err("Failed to run benchmarks")?;
        }

        // Print a summary of the specified measurements collection.
        Operation::Summarize { path } => {
            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
        }
    }
    Ok(())
}