iota_aws_orchestrator/orchestrator.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{HashMap, VecDeque},
    fs::{self},
    marker::PhantomData,
    path::PathBuf,
    time::Duration,
};

use tokio::time::{self, Instant};

use crate::{
    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
    client::Instance,
    display, ensure,
    error::{TestbedError, TestbedResult},
    faults::CrashRecoverySchedule,
    logs::LogsAnalyzer,
    measurement::{Measurement, MeasurementsCollection},
    monitor::Monitor,
    protocol::{ProtocolCommands, ProtocolMetrics},
    settings::Settings,
    ssh::{CommandContext, CommandStatus, SshConnectionManager},
};
/// An orchestrator to run benchmarks on a testbed.
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// The state of the testbed (accurately reflecting the state of the
    /// machines).
    instances: Vec<Instance>,
    /// The type of the benchmark parameters.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to install on the instance.
    instance_setup_commands: Vec<String>,
    /// Protocol-specific commands generator to generate the protocol
    /// configuration files, boot clients and nodes, etc.
    protocol_commands: P,
    /// The interval between measurement collections.
    scrape_interval: Duration,
    /// The interval at which to crash nodes.
    crash_interval: Duration,
    /// Handle ssh connections to instances.
    ssh_manager: SshConnectionManager,
    /// Whether to skip testbed updates before running benchmarks.
    skip_testbed_update: bool,
    /// Whether to skip testbed configuration before running benchmarks.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the client and node log files.
    log_processing: bool,
    /// Number of instances running only load generators (not nodes). If this
    /// value is set to zero, the orchestrator runs a load generator
    /// collocated with each node.
    dedicated_clients: usize,
    /// Whether to forgo a Grafana and Prometheus instance and leave the
    /// testbed unmonitored.
    skip_monitoring: bool,
}

impl<P, T> Orchestrator<P, T> {
    /// The default interval between measurement collections.
    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
    /// The default interval at which to crash nodes.
    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);

    /// Make a new orchestrator.
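    ///
    /// A minimal construction sketch (the `settings`, `instances`,
    /// `setup_commands`, `protocol`, and `ssh_manager` values are
    /// placeholders built from your own configuration):
    /// ```ignore
    /// let orchestrator = Orchestrator::new(
    ///     settings,
    ///     instances,
    ///     setup_commands,
    ///     protocol,
    ///     ssh_manager,
    /// )
    /// .with_scrape_interval(Duration::from_secs(30))
    /// .with_dedicated_clients(2)
    /// .skip_monitoring(true);
    /// ```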
    pub fn new(
        settings: Settings,
        instances: Vec<Instance>,
        instance_setup_commands: Vec<String>,
        protocol_commands: P,
        ssh_manager: SshConnectionManager,
    ) -> Self {
        Self {
            settings,
            instances,
            benchmark_type: PhantomData,
            instance_setup_commands,
            protocol_commands,
            ssh_manager,
            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
            skip_testbed_update: false,
            skip_testbed_configuration: false,
            log_processing: false,
            dedicated_clients: 0,
            skip_monitoring: false,
        }
    }

    /// Set the interval between measurement collections.
    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
        self.scrape_interval = scrape_interval;
        self
    }

    /// Set the interval at which to crash nodes.
    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
        self.crash_interval = crash_interval;
        self
    }

    /// Set whether to skip testbed updates before running benchmarks.
    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
        self.skip_testbed_update = skip_testbed_update;
        self
    }

    /// Set whether to skip testbed configuration before running benchmarks.
    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
        self.skip_testbed_configuration = skip_testbed_configuration;
        self
    }

    /// Set whether to download and analyze the client and node log files.
    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
        self.log_processing = log_processing;
        self
    }

    /// Set the number of instances running exclusively load generators.
    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
        self.dedicated_clients = dedicated_clients;
        self
    }

    /// Set whether to skip deploying the monitoring stack (Prometheus and
    /// Grafana) and leave the testbed unmonitored.
    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
        self.skip_monitoring = skip_monitoring;
        self
    }

    /// Select on which instances of the testbed to run the benchmarks. This
    /// function returns three values: the instances on which to run the load
    /// generators, the instances on which to run the nodes, and (unless
    /// monitoring is skipped) the instance hosting the monitoring stack.
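    ///
    /// A usage sketch (`parameters` is a placeholder); unless monitoring is
    /// skipped, one extra instance is reserved for the monitoring stack:
    /// ```ignore
    /// let (clients, nodes, monitoring) = orchestrator.select_instances(&parameters)?;
    /// ```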
    pub fn select_instances(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<(Vec<Instance>, Vec<Instance>, Option<Instance>)> {
        // Ensure there are enough active instances.
        let available_instances: Vec<_> = self.instances.iter().filter(|x| x.is_active()).collect();
        let minimum_instances = if self.skip_monitoring {
            parameters.nodes + self.dedicated_clients
        } else {
            parameters.nodes + self.dedicated_clients + 1
        };
        ensure!(
            available_instances.len() >= minimum_instances,
            TestbedError::InsufficientCapacity(minimum_instances - available_instances.len())
        );

        // Group the instances by region.
        let mut instances_by_regions = HashMap::new();
        for instance in available_instances {
            instances_by_regions
                .entry(&instance.region)
                .or_insert_with(VecDeque::new)
                .push_back(instance);
        }

        // Select the instance to host the monitoring stack.
        let mut monitoring_instance = None;
        if !self.skip_monitoring {
            for region in &self.settings.regions {
                if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                    if let Some(instance) = regional_instances.pop_front() {
                        monitoring_instance = Some(instance.clone());
                    }
                    break;
                }
            }
        }

        // Select the instances to host exclusively load generators.
        let mut client_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if client_instances.len() == self.dedicated_clients {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                if let Some(instance) = regional_instances.pop_front() {
                    client_instances.push(instance.clone());
                }
            }
        }

        // Select the instances to host the nodes.
        let mut nodes_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if nodes_instances.len() == parameters.nodes {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                if let Some(instance) = regional_instances.pop_front() {
                    nodes_instances.push(instance.clone());
                }
            }
        }

        // Spawn a load generator collocated with each node if there are no
        // instances dedicated to exclusively running load generators.
        if client_instances.is_empty() {
            client_instances.clone_from(&nodes_instances);
        }

        Ok((client_instances, nodes_instances, monitoring_instance))
    }
}

impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
    /// Boot one node per instance.
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Run one node per instance.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all nodes are reachable.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone());
        self.ssh_manager.wait_for_success(commands).await;

        Ok(())
    }

    /// Install the codebase and its dependencies on the testbed.
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;
        let basic_commands = [
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Disable the "pending kernel upgrade" message.
            "sudo apt-get -y remove needrestart",
            // The following dependencies are needed:
            // * build-essential: prevents the error `linker 'cc' not found`.
            // * libssl-dev: required to compile the orchestrator (TODO: remove
            //   this dependency).
            "sudo apt-get -y install build-essential libssl-dev",
            // Install rust (non-interactive).
            "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
            "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
            "source $HOME/.cargo/env",
            "rustup default stable",
            // Create the working directory.
            &format!("mkdir -p {working_dir}"),
            // Clone the repo.
            &format!("(git clone {url} || true)"),
        ];

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        let command = [
            &basic_commands[..],
            &Monitor::dependencies()[..],
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");
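        // NOTE: joining with `&&` makes this a single shell invocation that
        // stops at the first failing step; the `|| true` on the clone keeps a
        // pre-existing checkout from aborting the install.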

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

    /// Start the monitoring stack (Prometheus and Grafana) on the dedicated
    /// monitoring instance, if any.
    pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        let (clients, nodes, instance) = self.select_instances(parameters)?;
        if let Some(instance) = instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(instance, clients, nodes, self.ssh_manager.clone());
            monitor.start_prometheus(&self.protocol_commands).await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }

    /// Update all instances to use the version of the codebase specified in
    /// the settings file.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        // Update all active instances. This requires compiling the codebase in
        // release mode (which may take a long time), so we run the command in
        // the background to avoid keeping many ssh connections alive for too
        // long.
        let commit = &self.settings.repository.commit;
        let command = [
            "git fetch -f",
            // Create a local branch pinned at the commit, or force-checkout
            // the commit if the branch already exists.
            &format!("(git checkout -b {commit} {commit} || git checkout -f {commit})"),
            "(git pull -f || true)",
            "source $HOME/.cargo/env",
            "cargo build --release",
        ]
        .join(" && ");

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();

        let id = "update";
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.into());
        self.ssh_manager
            .execute(active.clone(), command, context)
            .await?;

        // Wait until the command has finished running.
        self.ssh_manager
            .wait_for_command(active, id, CommandStatus::Terminated)
            .await?;

        display::done();
        Ok(())
    }

    /// Configure the instances with the appropriate configuration files.
    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Configuring instances");

        // Select instances to configure.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Generate the genesis configuration file and the keystore allowing
        // access to gas objects.
        let command = self.protocol_commands.genesis_command(nodes.iter());
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new().with_execute_from_path(repo_name.into());
        let all = clients.into_iter().chain(nodes);
        self.ssh_manager.execute(all, command, context).await?;

        display::done();
        Ok(())
    }

    /// Cleanup all instances and optionally delete their log files.
    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
        display::action("Cleaning up testbed");

        // Kill all tmux servers and delete the nodes' databases. Optionally
        // clear the logs.
        let mut command = vec!["(tmux kill-server || true)".into()];
        for path in self.protocol_commands.db_directories() {
            command.push(format!("(rm -rf {} || true)", path.display()));
        }
        if cleanup {
            command.push("(rm -rf ~/*log* || true)".into());
        }
        let command = command.join(" ; ");
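        // NOTE: the steps are joined with `;` rather than `&&` so that every
        // cleanup step runs even if an earlier one fails (the `|| true` guards
        // already make individual failures non-fatal).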

        // Execute the deletion on all machines.
        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

    /// Deploy the nodes.
    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Deploying validators");

        // Select the instances to run.
        let (_, nodes, _) = self.select_instances(parameters)?;

        // Boot one node per instance.
        self.boot_nodes(nodes, parameters).await?;

        display::done();
        Ok(())
    }

    /// Deploy the load generators.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Setting up load generators");

        // Select the instances to run.
        let (clients, _, _) = self.select_instances(parameters)?;

        // Deploy the load generators.
        let targets = self
            .protocol_commands
            .client_command(clients.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all load generators are reachable.
        let commands = self.protocol_commands.clients_metrics_command(clients);
        self.ssh_manager.wait_for_success(commands).await;

        display::done();
        Ok(())
    }

    /// Run the benchmark and collect metrics from the load generators.
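    ///
    /// A sketch of direct use (normally driven by `run_benchmarks`;
    /// `parameters` is a placeholder):
    /// ```ignore
    /// let measurements = orchestrator.run(&parameters).await?;
    /// measurements.display_summary();
    /// ```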
    pub async fn run(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<MeasurementsCollection<T>> {
        display::action(format!(
            "Scraping metrics (at least {}s)",
            parameters.duration.as_secs()
        ));

        // Select the instances to run.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Regularly scrape the clients' metrics.
        let mut metrics_commands = self.protocol_commands.clients_metrics_command(clients);

        // TODO: Remove this when consensus client latency metrics are
        // available. We will get latency metrics directly from the consensus
        // nodes instead of from the client.
        metrics_commands.append(&mut self.protocol_commands.nodes_metrics_command(nodes.clone()));

        let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
        let mut metrics_interval = time::interval(self.scrape_interval);
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule = CrashRecoverySchedule::new(faults_type, nodes.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

        let start = Instant::now();
        loop {
            tokio::select! {
                // Scrape metrics.
                now = metrics_interval.tick() => {
                    let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    let stdio = self
                        .ssh_manager
                        .execute_per_instance(metrics_commands.clone(), CommandContext::default())
                        .await?;
                    for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
                        let measurement = Measurement::from_prometheus::<P>(stdout);
                        aggregator.add(i, measurement);
                    }

                    if elapsed > parameters.duration.as_secs() {
                        break;
                    }
                },

                // Kill and recover nodes according to the input schedule.
                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        let results_directory = &self.settings.results_dir;
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [results_directory, &format!("results-{commit}").into()]
            .iter()
            .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");
        aggregator.save(path);

        display::done();
        Ok(aggregator)
    }

    /// Download the log files from the nodes and clients.
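    ///
    /// A usage sketch (`parameters` is a placeholder):
    /// ```ignore
    /// let analyzer = orchestrator.download_logs(&parameters).await?;
    /// analyzer.print_summary();
    /// ```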
    pub async fn download_logs(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<LogsAnalyzer> {
        // Select the instances to run.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Create a log sub-directory for this run.
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [
            &self.settings.logs_dir,
            &format!("logs-{commit}").into(),
            &format!("logs-{parameters:?}").into(),
        ]
        .iter()
        .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");

        // NOTE: Our ssh library does not seem to be able to transfer files in
        // parallel reliably.
        let mut log_parsers = Vec::new();

        // Download the clients' log files.
        display::action("Downloading client logs");
        for (i, instance) in clients.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, clients.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            let client_log_file = [path.clone(), format!("client-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser);
        }
        display::done();

        // Download the nodes' log files.
        display::action("Downloading node logs");
        for (i, instance) in nodes.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, nodes.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = [path.clone(), format!("node-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser);
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }

    /// Run all the benchmarks specified by the benchmark generator.
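    ///
    /// A minimal driver sketch (`generator` yields the `BenchmarkParameters`
    /// of each run and is a placeholder):
    /// ```ignore
    /// orchestrator.run_benchmarks(generator).await?;
    /// ```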
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Cleanup the testbed (in case the previous run was not completed).
        self.cleanup(true).await?;

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Run all benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // Cleanup the testbed (in case the previous run was not completed).
            self.cleanup(true).await?;
            // Start the instance monitoring tools.
            self.start_monitoring(&parameters).await?;

            // Configure all instances (if needed).
            if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                self.configure(&parameters).await?;
                latest_committee_size = parameters.nodes;
            }

            // Deploy the validators.
            self.run_nodes(&parameters).await?;

            // Deploy the load generators.
            self.run_clients(&parameters).await?;

            // Wait for the benchmark to terminate. Then save the results and
            // print a summary.
            let aggregator = self.run(&parameters).await?;
            aggregator.display_summary();
            generator.register_result(aggregator);

            // Kill the nodes and clients (without deleting the log files).
            self.cleanup(false).await?;

            // Download the log files.
            if self.log_processing {
                let error_counter = self.download_logs(&parameters).await?;
                error_counter.print_summary();
            }

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
}