iota_aws_orchestrator/
main.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, time::Duration};
6
7use benchmark::{BenchmarkParametersGenerator, LoadType};
8use clap::Parser;
9use client::{ServerProviderClient, aws::AwsClient};
10use eyre::{Context, Result};
11use faults::FaultsType;
12use measurement::MeasurementsCollection;
13use orchestrator::Orchestrator;
14use protocol::iota::{IotaBenchmarkType, IotaProtocol};
15use settings::{CloudProvider, Settings};
16use ssh::SshConnectionManager;
17use testbed::Testbed;
18
19pub mod benchmark;
20pub mod client;
21pub mod display;
22pub mod error;
23pub mod faults;
24pub mod logs;
25pub mod measurement;
26mod monitor;
27pub mod orchestrator;
28pub mod protocol;
29pub mod settings;
30pub mod ssh;
31pub mod testbed;
32
// The protocol implementation whose commands are handed to the orchestrator
// (built via `Protocol::new(&settings)` in `run`).
type Protocol = IotaProtocol;
// The benchmark type parsed from the `benchmark_type` argument and used to
// type the measurements collection loaded by the `Summarize` operation.
type BenchmarkType = IotaBenchmarkType;
35
36#[derive(Parser)]
37#[command(author, version, about = "Testbed orchestrator", long_about = None)]
38pub struct Opts {
39    /// The path to the settings file. This file contains basic information to
40    /// deploy testbeds and run benchmarks such as the url of the git repo,
41    /// the commit to deploy, etc.
42    #[arg(
43        long,
44        value_name = "FILE",
45        default_value = "crates/iota-aws-orchestrator/assets/settings.json",
46        global = true
47    )]
48    settings_path: String,
49
50    /// The type of operation to run.
51    #[command(subcommand)]
52    operation: Operation,
53}
54
55#[derive(Parser)]
56pub enum Operation {
57    /// Get or modify the status of the testbed.
58    Testbed {
59        #[command(subcommand)]
60        action: TestbedAction,
61    },
62
63    /// Run a benchmark on the specified testbed.
64    Benchmark {
65        /// Percentage of shared vs owned objects; 0 means only owned objects
66        /// and 100 means only shared objects.
67        #[arg(long, default_value = "0", global = true)]
68        benchmark_type: String,
69
70        /// The committee size to deploy.
71        #[arg(long, value_name = "INT")]
72        committee: usize,
73
74        /// Number of faulty nodes.
75        #[arg(long, value_name = "INT", default_value = "0", global = true)]
76        faults: usize,
77
78        /// Whether the faulty nodes recover.
79        #[arg(long, action, default_value = "false", global = true)]
80        crash_recovery: bool,
81
82        /// The interval to crash nodes in seconds.
83        #[arg(long, value_parser = parse_duration, default_value = "60", global = true)]
84        crash_interval: Duration,
85
86        /// The minimum duration of the benchmark in seconds.
87        #[arg(long, value_parser = parse_duration, default_value = "600", global = true)]
88        duration: Duration,
89
90        /// The interval between measurements collection in seconds.
91        #[arg(long, value_parser = parse_duration, default_value = "15", global = true)]
92        scrape_interval: Duration,
93
94        /// Whether to skip testbed updates before running benchmarks.
95        #[arg(long, action, default_value = "false", global = true)]
96        skip_testbed_update: bool,
97
98        /// Whether to skip testbed configuration before running benchmarks.
99        #[arg(long, action, default_value = "false", global = true)]
100        skip_testbed_configuration: bool,
101
102        /// Whether to download and analyze the client and node log files.
103        #[arg(long, action, default_value = "false", global = true)]
104        log_processing: bool,
105
106        /// The number of instances running exclusively load generators. If set
107        /// to zero the orchestrator collocates one load generator with
108        /// each node.
109        #[arg(long, value_name = "INT", default_value = "0", global = true)]
110        dedicated_clients: usize,
111
112        /// Whether to forgo a grafana and prometheus instance and leave the
113        /// testbed unmonitored.
114        #[arg(long, action, default_value = "false", global = true)]
115        skip_monitoring: bool,
116
117        /// The timeout duration for ssh commands (in seconds).
118        #[arg(long, action, value_parser = parse_duration, default_value = "30", global = true)]
119        timeout: Duration,
120
121        /// The number of times the orchestrator should retry an ssh command.
122        #[arg(long, value_name = "INT", default_value = "5", global = true)]
123        retries: usize,
124
125        /// The load to submit to the system.
126        #[command(subcommand)]
127        load_type: Load,
128    },
129
130    /// Print a summary of the specified measurements collection.
131    Summarize {
132        /// The path to the settings file.
133        #[arg(long, value_name = "FILE")]
134        path: String,
135    },
136}
137
138#[derive(Parser)]
139pub enum TestbedAction {
140    /// Display the testbed status.
141    Status,
142
143    /// Deploy the specified number of instances in all regions specified by in
144    /// the setting file.
145    Deploy {
146        /// Number of instances to deploy.
147        #[arg(long)]
148        instances: usize,
149
150        /// The region where to deploy the instances. If this parameter is not
151        /// specified, the command deploys the specified number of
152        /// instances in all regions listed in the setting file.
153        #[arg(long)]
154        region: Option<String>,
155    },
156
157    /// Start at most the specified number of instances per region on an
158    /// existing testbed.
159    Start {
160        /// Number of instances to deploy.
161        #[arg(long, default_value = "200")]
162        instances: usize,
163    },
164
165    /// Stop an existing testbed (without destroying the instances).
166    Stop,
167
168    /// Destroy the testbed and terminate all instances.
169    Destroy,
170}
171
172#[derive(Parser)]
173pub enum Load {
174    /// The fixed loads (in tx/s) to submit to the nodes.
175    FixedLoad {
176        /// A list of fixed load (tx/s).
177        #[arg(
178            long,
179            value_name = "INT",
180            num_args(1..),
181            value_delimiter = ','
182        )]
183        loads: Vec<usize>,
184    },
185
186    /// Search for the maximum load that the system can sustainably handle.
187    Search {
188        /// The initial load (in tx/s) to test and use a baseline.
189        #[arg(long, value_name = "INT", default_value = "250")]
190        starting_load: usize,
191        /// The maximum number of iterations before converging on a breaking
192        /// point.
193        #[arg(long, value_name = "INT", default_value = "5")]
194        max_iterations: usize,
195    },
196}
197
/// Converts a command-line argument holding a whole number of seconds into a
/// [`Duration`].
///
/// # Errors
///
/// Returns the [`std::num::ParseIntError`] produced by the integer parser when
/// `arg` is not a valid unsigned integer.
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
202
203#[tokio::main]
204async fn main() -> Result<()> {
205    color_eyre::install()?;
206    let opts: Opts = Opts::parse();
207
208    // Load the settings files.
209    let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
210
211    match &settings.cloud_provider {
212        CloudProvider::Aws => {
213            // Create the client for the cloud provider.
214            let client = AwsClient::new(settings.clone()).await;
215
216            // Execute the command.
217            run(settings, client, opts).await
218        }
219    }
220}
221
222async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
223    // Create a new testbed.
224    let mut testbed = Testbed::new(settings.clone(), client)
225        .await
226        .wrap_err("Failed to create testbed")?;
227
228    match opts.operation {
229        Operation::Testbed { action } => match action {
230            // Display the current status of the testbed.
231            TestbedAction::Status => testbed.status(),
232
233            // Deploy the specified number of instances on the testbed.
234            TestbedAction::Deploy { instances, region } => testbed
235                .deploy(instances, region)
236                .await
237                .wrap_err("Failed to deploy testbed")?,
238
239            // Start the specified number of instances on an existing testbed.
240            TestbedAction::Start { instances } => testbed
241                .start(instances)
242                .await
243                .wrap_err("Failed to start testbed")?,
244
245            // Stop an existing testbed.
246            TestbedAction::Stop => testbed.stop().await.wrap_err("Failed to stop testbed")?,
247
248            // Destroy the testbed and terminal all instances.
249            TestbedAction::Destroy => testbed
250                .destroy()
251                .await
252                .wrap_err("Failed to destroy testbed")?,
253        },
254
255        // Run benchmarks.
256        Operation::Benchmark {
257            benchmark_type,
258            committee,
259            faults,
260            crash_recovery,
261            crash_interval,
262            duration,
263            scrape_interval,
264            skip_testbed_update,
265            skip_testbed_configuration,
266            log_processing,
267            dedicated_clients,
268            skip_monitoring,
269            timeout,
270            retries,
271            load_type,
272        } => {
273            // Create a new orchestrator to instruct the testbed.
274            let username = testbed.username();
275            let private_key_file = settings.ssh_private_key_file.clone();
276            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
277                .with_timeout(timeout)
278                .with_retries(retries);
279
280            let instances = testbed.instances();
281
282            let setup_commands = testbed
283                .setup_commands()
284                .await
285                .wrap_err("Failed to load testbed setup commands")?;
286
287            let protocol_commands = Protocol::new(&settings);
288            let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;
289
290            let load = match load_type {
291                Load::FixedLoad { loads } => {
292                    let loads = if loads.is_empty() { vec![200] } else { loads };
293                    LoadType::Fixed(loads)
294                }
295                Load::Search {
296                    starting_load,
297                    max_iterations,
298                } => LoadType::Search {
299                    starting_load,
300                    max_iterations,
301                },
302            };
303
304            let fault_type = if !crash_recovery || faults == 0 {
305                FaultsType::Permanent { faults }
306            } else {
307                FaultsType::CrashRecovery {
308                    max_faults: faults,
309                    interval: crash_interval,
310                }
311            };
312
313            let generator = BenchmarkParametersGenerator::new(committee, load)
314                .with_benchmark_type(benchmark_type)
315                .with_custom_duration(duration)
316                .with_faults(fault_type);
317
318            Orchestrator::new(
319                settings,
320                instances,
321                setup_commands,
322                protocol_commands,
323                ssh_manager,
324            )
325            .with_scrape_interval(scrape_interval)
326            .with_crash_interval(crash_interval)
327            .skip_testbed_updates(skip_testbed_update)
328            .skip_testbed_configuration(skip_testbed_configuration)
329            .with_log_processing(log_processing)
330            .with_dedicated_clients(dedicated_clients)
331            .skip_monitoring(skip_monitoring)
332            .run_benchmarks(generator)
333            .await
334            .wrap_err("Failed to run benchmarks")?;
335        }
336
337        // Print a summary of the specified measurements collection.
338        Operation::Summarize { path } => {
339            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
340        }
341    }
342    Ok(())
343}