// iota_aws_orchestrator/orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashSet,
7    fs::{self},
8    marker::PhantomData,
9    path::{Path, PathBuf},
10    time::Duration,
11};
12
13use chrono;
14use tokio::time::{self, Instant};
15
16use crate::{
17    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
18    build_cache::BuildCacheService,
19    client::Instance,
20    display,
21    error::TestbedResult,
22    faults::CrashRecoverySchedule,
23    logs::LogsAnalyzer,
24    measurement::{Measurement, MeasurementsCollection},
25    monitor::{Monitor, Prometheus},
26    net_latency::NetworkLatencyCommandBuilder,
27    protocol::{ProtocolCommands, ProtocolMetrics},
28    settings::{BuildGroups, Settings, build_cargo_command},
29    ssh::{CommandContext, CommandStatus, SshConnectionManager},
30};
31
/// An orchestrator to run benchmarks on a testbed.
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// Node instances
    node_instances: Vec<Instance>,
    /// Client (Load Generator) instances
    client_instances: Vec<Instance>,
    /// Dedicated Metrics instance
    metrics_instance: Option<Instance>,
    /// The type of the benchmark parameters.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to install on the instance.
    instance_setup_commands: Vec<String>,
    /// Protocol-specific commands generator to generate the protocol
    /// configuration files, boot clients and nodes, etc.
    protocol_commands: P,
    /// The interval between measurements collection.
    scrape_interval: Duration,
    /// The interval to crash nodes.
    crash_interval: Duration,
    /// Handle ssh connections to instances.
    ssh_manager: SshConnectionManager,
    /// Whether to skip testbed updates before running benchmarks.
    skip_testbed_update: bool,
    /// Whether to skip testbed configuration before running benchmarks.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the client and node log files.
    log_processing: bool,
    /// Number of instances running only load generators (not nodes). If this
    /// value is set to zero, the orchestrator runs a load generator
    /// collocated with each node.
    dedicated_clients: usize,
    /// Whether to forgo a grafana and prometheus instance and leave the testbed
    /// unmonitored.
    skip_monitoring: bool,
}
69
70impl<P, T> Orchestrator<P, T> {
71    /// The default interval between measurements collection.
72    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
73    /// The default interval to crash nodes.
74    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);
75
76    /// Make a new orchestrator.
77    pub fn new(
78        settings: Settings,
79        node_instances: Vec<Instance>,
80        client_instances: Vec<Instance>,
81        metrics_instance: Option<Instance>,
82        instance_setup_commands: Vec<String>,
83        protocol_commands: P,
84        ssh_manager: SshConnectionManager,
85    ) -> Self {
86        Self {
87            settings,
88            node_instances,
89            client_instances,
90            metrics_instance,
91            benchmark_type: PhantomData,
92            instance_setup_commands,
93            protocol_commands,
94            ssh_manager,
95            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
96            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
97            skip_testbed_update: false,
98            skip_testbed_configuration: false,
99            log_processing: false,
100            dedicated_clients: 0,
101            skip_monitoring: false,
102        }
103    }
104
105    /// Set interval between measurements collection.
106    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
107        self.scrape_interval = scrape_interval;
108        self
109    }
110
111    /// Set interval with which to crash nodes.
112    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
113        self.crash_interval = crash_interval;
114        self
115    }
116
117    /// Set whether to skip testbed updates before running benchmarks.
118    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
119        self.skip_testbed_update = skip_testbed_update;
120        self
121    }
122
123    /// Whether to skip testbed configuration before running benchmarks.
124    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
125        self.skip_testbed_configuration = skip_testbed_configuration;
126        self
127    }
128
129    /// Set whether to download and analyze the client and node log files.
130    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
131        self.log_processing = log_processing;
132        self
133    }
134
135    /// Set the number of instances running exclusively load generators.
136    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
137        self.dedicated_clients = dedicated_clients;
138        self
139    }
140
141    /// Set whether to boot grafana on the local machine to monitor the nodes.
142    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
143        self.skip_monitoring = skip_monitoring;
144        self
145    }
146
147    pub fn instances_without_metrics(&self) -> Vec<Instance> {
148        let mut instances = self.node_instances.clone();
149
150        if self.dedicated_clients > 0 {
151            instances.extend(self.client_instances.clone());
152        }
153        instances
154    }
155
156    /// Returns all the instances combined
157    pub fn instances(&self) -> Vec<Instance> {
158        let mut instances = self.instances_without_metrics();
159        if let Some(metrics_instance) = &self.metrics_instance {
160            instances.push(metrics_instance.clone());
161        }
162        instances
163    }
164}
165
166impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
    /// Boot one node per instance.
    ///
    /// When running over internal IP addresses and a latency topology is
    /// configured, first installs an artificial network-latency matrix on the
    /// instances. Then starts one node process per instance in the background
    /// (logging to `~/node.log`) and blocks until every node's metrics
    /// endpoint responds.
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Optionally emulate network latency between instances.
        // NOTE(review): only applied when using internal IP addresses —
        // presumably latency shaping is only meaningful on the private
        // network; confirm with NetworkLatencyCommandBuilder's docs.
        if parameters.use_internal_ip_address {
            if let Some(latency_topology) = parameters.latency_topology.clone() {
                let latency_commands = NetworkLatencyCommandBuilder::new(&instances)
                    .with_perturbation_spec(parameters.perturbation_spec.clone())
                    .with_topology_layout(latency_topology)
                    .with_max_latency(parameters.maximum_latency)
                    .build_network_latency_matrix();
                self.ssh_manager
                    .execute_per_instance(latency_commands, CommandContext::default())
                    .await?;
            }
        }

        // Run one node per instance.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        // Nodes run in the background so the ssh call returns immediately;
        // their output is captured in ~/node.log on each instance.
        let repo = self.settings.repository_name();
        let node_context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, node_context)
            .await?;

        // Wait until all nodes are reachable.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone(), parameters);
        self.ssh_manager.wait_for_success(commands).await;

        Ok(())
    }
208
    /// Install the codebase and its dependencies on the testbed.
    ///
    /// Builds one long `&&`-chained shell command out of: base apt packages
    /// and file-descriptor limits, (optionally) the rust toolchain, the
    /// prometheus node-exporter dependencies, cloud-provider specific setup,
    /// and protocol-specific dependencies — then runs it on every instance.
    /// When the build cache is enabled, the rust toolchain install is skipped
    /// since precompiled binaries will be shipped instead.
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;

        // With the build cache enabled the instances never compile, so no
        // rust toolchain is installed on them.
        let use_precompiled_binaries = self.settings.build_cache_enabled();

        let working_dir_cmd = format!("mkdir -p {working_dir}");
        // `|| true`: cloning an already-present repo must not fail the chain.
        let git_clone_cmd = format!("(git clone --depth=1 {url} || true)");

        let mut basic_commands = vec![
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Disable "pending kernel upgrade" message.
            "sudo apt-get -y remove needrestart",
            "sudo apt-get -y install curl git ca-certificates",
            // Increase open file limits to prevent "Too many open files" errors
            "echo '* soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo '* hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            // Set system-wide file descriptor limits
            "echo 'fs.file-max = 2097152' | sudo tee -a /etc/sysctl.conf",
            "sudo sysctl -p",
            // Set limits for current session
            "ulimit -n 1048576 || true",
            // Create the working directory.
            working_dir_cmd.as_str(),
            // Clone the repo.
            git_clone_cmd.as_str(),
        ];

        // Collect all unique non-"stable" rust toolchains from build configs
        let toolchain_cmds: Vec<String> = if !use_precompiled_binaries {
            self.settings
                .build_configs
                .values()
                .filter_map(|config| {
                    config
                        .toolchain
                        .as_ref()
                        .filter(|t| t.as_str() != "stable")
                        .cloned()
                })
                // HashSet de-duplicates toolchains shared by several configs.
                .collect::<HashSet<String>>()
                .into_iter()
                .map(|toolchain| format!("rustup toolchain install {toolchain}"))
                .collect()
        } else {
            vec![]
        };

        if !use_precompiled_binaries {
            // If not using precompiled binaries, install rustup.
            basic_commands.extend([
                // The following dependencies:
                // * build-essential: prevent the error: [error: linker `cc` not found].
                "sudo apt-get -y install build-essential cmake clang lld protobuf-compiler pkg-config nvme-cli",
                // Install rust (non-interactive).
                "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
                "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
                "source $HOME/.cargo/env",
                "rustup default stable",
            ]);

            // Add the toolchain install commands to basic_commands
            for cmd in &toolchain_cmds {
                basic_commands.push(cmd.as_str());
            }
        } else {
            // Create cargo env file if using precompiled binaries, so that the source
            // commands don't fail.
            basic_commands.push("mkdir -p $HOME/.cargo/ && touch $HOME/.cargo/env");
        }

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        // `&&`-chain everything so the first failing step aborts the install.
        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        self.ssh_manager
            .execute(self.instances(), command, CommandContext::default())
            .await?;

        // The monitoring stack has its own dependencies, installed only on
        // the dedicated metrics instance.
        if !self.skip_monitoring {
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(
                    vec![metrics_instance],
                    monitor_command,
                    CommandContext::default(),
                )
                .await?;
        }

        display::done();
        Ok(())
    }
326
327    /// Reload prometheus on all instances.
328    pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
329        if let Some(instance) = &self.metrics_instance {
330            display::action("Configuring monitoring instance");
331
332            let monitor = Monitor::new(
333                instance.clone(),
334                self.client_instances.clone(),
335                self.node_instances.clone(),
336                self.ssh_manager.clone(),
337            );
338            monitor
339                .start_prometheus(&self.protocol_commands, parameters)
340                .await?;
341            monitor.start_grafana().await?;
342
343            display::done();
344            display::config("Grafana address", monitor.grafana_address());
345            display::newline();
346        }
347
348        Ok(())
349    }
350
    /// Update all instances to use the version of the codebase specified in the
    /// setting file.
    ///
    /// First checks out the configured commit on every instance (metrics
    /// included), then either distributes precompiled binaries via the build
    /// cache or falls back to compiling locally on each instance.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        let commit = &self.settings.repository.commit;
        let repo_name = self.settings.repository_name();
        let build_groups = self.settings.build_groups();

        // we need to fetch and checkout the commit even if using precompiled binaries
        // because the iota-framework submodule, the examples/move folder, or the
        // dev-tools/grafana-local folder might be used.
        let git_update_command = [
            &format!("git fetch origin {commit} --force"),
            // `reset --hard origin/…` works for branch names; the
            // `checkout --force` fallback covers raw commit hashes and tags.
            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
            // Keep the cargo `target` dir so incremental builds stay warm.
            "git clean -fd -e target",
        ]
        .join(" && ");

        let id = "git update";
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.clone().into());

        // Execute and wait for the git update command on all instances (including
        // metrics)
        display::action(format!("update command: {git_update_command}"));
        self.ssh_manager
            .execute(self.instances(), git_update_command, context)
            .await?;
        self.ssh_manager
            .wait_for_command(self.instances(), id, CommandStatus::Terminated)
            .await?;

        // Check if build cache is enabled
        if self.settings.build_cache_enabled() {
            display::action("Using build cache for binary distribution");
            let build_cache_service = BuildCacheService::new(&self.settings, &self.ssh_manager);
            build_cache_service
                .update_with_build_cache(
                    commit,
                    &build_groups,
                    self.instances_without_metrics(),
                    repo_name.clone(),
                )
                .await?;
        } else {
            self.update_with_local_build(build_groups).await?;
        }

        display::done();
        Ok(())
    }
404
    /// Update instances with local build (fallback, if build cache is not used)
    /// Execute and wait for the cargo build command on all instances except the
    /// metrics one. This requires compiling the codebase in release
    /// (which may take a long time) so we run the command in the background
    /// to avoid keeping alive many ssh connections for too long.
    async fn update_with_local_build(&self, build_groups: BuildGroups) -> TestbedResult<()> {
        let without_metrics = self.instances_without_metrics();
        let repo_name = self.settings.repository_name();

        // Build each group separately: groups may differ in toolchain and
        // feature flags, so one cargo invocation per group is required.
        for (i, (group, binary_names)) in build_groups.iter().enumerate() {
            // Build arguments (no extra env vars / args: the empty slices).
            let build_command = build_cargo_command(
                "build",
                group.toolchain.clone(),
                group.features.clone(),
                binary_names,
                &[] as &[&str],
                &[] as &[&str],
            );

            // print the full command for logging
            display::action(format!(
                "Running build command {}/{}: \"{build_command}\" in \"{repo_name}\"",
                i + 1,
                build_groups.len()
            ));

            let context = CommandContext::new().with_execute_from_path(repo_name.clone().into());

            self.ssh_manager
                .execute(without_metrics.clone(), build_command, context)
                .await?;
        }

        Ok(())
    }
442
443    /// Configure the instances with the appropriate configuration files.
444    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
445        display::action("Configuring instances");
446
447        // Generate the genesis configuration file and the keystore allowing access to
448        // gas objects.
449        let command = self
450            .protocol_commands
451            .genesis_command(self.node_instances.iter(), parameters);
452        display::action(format!("Genesis command: {command}"));
453        let repo_name = self.settings.repository_name();
454        let context = CommandContext::new().with_execute_from_path(repo_name.into());
455        self.ssh_manager
456            .execute(self.instances_without_metrics(), command, context)
457            .await?;
458
459        display::done();
460        Ok(())
461    }
462
463    /// Cleanup all instances and optionally delete their log files.
464    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
465        display::action("Cleaning up testbed");
466
467        // Kill all tmux servers and delete the nodes dbs. Optionally clear logs.
468        let mut command = vec!["(tmux kill-server || true)".into()];
469        for path in self.protocol_commands.db_directories() {
470            command.push(format!("(rm -rf {} || true)", path.display()));
471        }
472        if cleanup {
473            command.push("(rm -rf ~/*log* || true)".into());
474        }
475        let command = command.join(" ; ");
476
477        // Execute the deletion on all machines.
478        let active = self.instances().into_iter().filter(|x| x.is_active());
479        let context = CommandContext::default();
480        self.ssh_manager.execute(active, command, context).await?;
481
482        display::done();
483        Ok(())
484    }
485
486    /// Deploy the nodes.
487    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
488        display::action("Deploying validators");
489
490        // Boot one node per instance.
491        self.boot_nodes(self.node_instances.clone(), parameters)
492            .await?;
493
494        display::done();
495        Ok(())
496    }
497
    /// Deploy the load generators.
    ///
    /// Optionally deploys fullnodes first (when the settings route client
    /// execution through fullnodes) and waits for them to answer JSON-RPC,
    /// then starts one load generator per client instance (background,
    /// logging to `~/client.log`) and waits until their metrics endpoints
    /// respond.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        if self.settings.use_fullnode_for_execution {
            display::action("Setting up full nodes");

            // Deploy the fullnodes.
            let targets = self
                .protocol_commands
                .fullnode_command(self.client_instances.clone(), parameters);

            let repo = self.settings.repository_name();
            let context = CommandContext::new()
                .run_background("fullnode".into())
                .with_log_file("~/fullnode.log".into())
                .with_execute_from_path(repo.into());
            self.ssh_manager
                .execute_per_instance(targets, context)
                .await?;

            // Wait until all fullnodes are fully started by querying the latest checkpoint
            // (otherwise clients might fail when a fullnode is not listening yet).
            // NOTE(review): assumes the fullnode JSON-RPC endpoint listens on
            // 127.0.0.1:9000 — confirm against the fullnode configuration.
            display::action("Await fullnode ready...");
            let commands = self
                .client_instances
                .iter()
                .cloned()
                .map(|i| (i, "curl http://127.0.0.1:9000 -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"iota_getLatestCheckpointSequenceNumber\",\"params\":[],\"id\":1}'".to_owned()));
            self.ssh_manager.wait_for_success(commands).await;

            display::done();
        }

        display::action("Setting up load generators");

        // Deploy the load generators.
        let targets = self
            .protocol_commands
            .client_command(self.client_instances.clone(), parameters);

        // Clients run in the background, capturing output in ~/client.log.
        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all load generators are reachable.
        let commands = self
            .protocol_commands
            .clients_metrics_command(self.client_instances.clone(), parameters);
        self.ssh_manager.wait_for_success(commands).await;

        display::done();
        Ok(())
    }
555
    /// Collect metrics from the load generators.
    ///
    /// Runs the main measurement loop: every `scrape_interval` it scrapes
    /// each client's metrics endpoint and folds the samples into a
    /// [`MeasurementsCollection`]; every `crash_interval` it applies the
    /// configured crash/recovery schedule to the nodes. The loop exits once
    /// the benchmark duration has elapsed; the collection is saved to
    /// `benchmark_dir` and, when enabled, flamegraphs are downloaded too.
    pub async fn run(
        &self,
        benchmark_dir: &Path,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<MeasurementsCollection<T>> {
        display::action(format!(
            "Scraping metrics (at least {}s)",
            parameters.duration.as_secs()
        ));

        // Regularly scrape the client
        let metrics_commands = self
            .protocol_commands
            .clients_metrics_command(self.client_instances.clone(), parameters);

        let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
        let mut metrics_interval = time::interval(self.scrape_interval);
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule =
            CrashRecoverySchedule::new(faults_type, self.node_instances.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

        let start = Instant::now();
        loop {
            tokio::select! {
                // Scrape metrics.
                now = metrics_interval.tick() => {
                    let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    // One (stdout, stderr) pair per client instance, in order.
                    let stdio = self
                        .ssh_manager
                        .execute_per_instance(metrics_commands.clone(), CommandContext::default())
                        .await?;
                    for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
                        display::action(format!("Processing metrics from client {}\n", i));
                        let measurement = Measurement::from_prometheus::<P>(stdout);
                        aggregator.add(i, measurement);
                    }

                    // Stop once the requested benchmark duration has passed.
                    if elapsed > parameters.duration .as_secs() {
                        break;
                    }
                },

                // Kill and recover nodes according to the input schedule.
                _ = faults_interval.tick() => {
                    let  action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        aggregator.save(benchmark_dir);

        // Optionally pull CPU (and, when the feature is enabled, allocation)
        // flamegraphs from the nodes into `<benchmark_dir>/flamegraphs`.
        if self.settings.enable_flamegraph {
            let flamegraphs_dir = benchmark_dir.join("flamegraphs");
            fs::create_dir_all(&flamegraphs_dir).expect("Failed to create flamegraphs directory");

            self.fetch_flamegraphs(
                parameters,
                self.instances_without_metrics().clone(),
                &flamegraphs_dir,
                "?svg=true",
                "flamegraph",
            )
            .await?;

            // Allocation flamegraphs only exist when the iota-node binary was
            // built with the "flamegraph-alloc" feature.
            if self
                .settings
                .build_configs
                .get("iota-node")
                .is_some_and(|config| config.features.iter().any(|f| f == "flamegraph-alloc"))
            {
                self.fetch_flamegraphs(
                    parameters,
                    self.instances_without_metrics().clone(),
                    &flamegraphs_dir,
                    "?svg=true&mem=true",
                    "flamegraph-alloc",
                )
                .await?;
            }
        }

        display::done();
        Ok(aggregator)
    }
657
658    async fn fetch_flamegraphs(
659        &self,
660        parameters: &BenchmarkParameters<T>,
661        nodes: Vec<Instance>,
662        path: &Path,
663        query: &str,
664        file_prefix: &str,
665    ) -> TestbedResult<()> {
666        let flamegraph_commands = self
667            .protocol_commands
668            .nodes_flamegraph_command(nodes, parameters, query);
669        let stdio = self
670            .ssh_manager
671            .execute_per_instance(flamegraph_commands, CommandContext::default())
672            .await?;
673        for (i, (stdout, stderr)) in stdio.into_iter().enumerate() {
674            if !stdout.is_empty() {
675                let file = path.join(format!("{file_prefix}-{i}.svg"));
676                fs::write(file, stdout).unwrap();
677            }
678            if !stderr.is_empty() {
679                let file = path.join(format!("{file_prefix}-{i}.log"));
680                fs::write(file, stderr).unwrap();
681            }
682        }
683        Ok(())
684    }
685
    /// Download the log files from the nodes and clients.
    ///
    /// Copies `client.log` / `node.log` from every instance into a `logs/`
    /// sub-directory of `benchmark_dir` (as `client-{i}.log` /
    /// `node-{i}.log`), scans each file for errors, and returns the
    /// aggregated [`LogsAnalyzer`].
    pub async fn download_logs(&self, benchmark_dir: &Path) -> TestbedResult<LogsAnalyzer> {
        // Create a logs sub-directory for this run.
        let path = benchmark_dir.join("logs");
        fs::create_dir_all(&path).expect("Failed to create logs directory");

        // NOTE: Our ssh library does not seem to be able to transfers files in parallel
        // reliably.
        let mut log_parsers = Vec::new();

        // Download the clients log files.
        display::action("Downloading clients logs");
        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            let client_log_file = path.join(format!("client-{i}.log"));
            fs::write(client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            // Scan the log for client-side errors.
            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        display::action("Downloading nodes logs");
        for (i, instance) in self.node_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.node_instances.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = path.join(format!("node-{i}.log"));
            fs::write(node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            // Scan the log for node-side errors.
            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }
732
    /// Run all the benchmarks specified by the benchmark generator.
    ///
    /// Prepares the testbed (cleanup, optional install + update), then for
    /// each parameter set produced by `generator`: sets up monitoring,
    /// (re)configures the instances when the committee size changed, deploys
    /// nodes and clients, measures, saves results under
    /// `results_dir/<commit>/<timestamp>-<parameters>`, and optionally
    /// downloads and analyzes the logs. The first error aborts the whole run.
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Cleanup the testbed (in case the previous run was not completed).
        self.cleanup(true).await?;

        // `/` in branch names would create nested result directories.
        let commit: PathBuf = self.settings.repository.commit.replace("/", "_").into();
        let timestamp = chrono::Local::now().format("%y%m%d_%H%M%S");

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Run all benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // Per-run results directory: results_dir/<commit>/<timestamp>-<params>.
            let benchmark_dir: PathBuf = [
                &self.settings.results_dir,
                &commit,
                &format!("{timestamp}-{parameters:?}").into(),
            ]
            .iter()
            .collect();

            // Cleanup the testbed (in case the previous run was not completed).
            self.cleanup(true).await?;
            // Create benchmark directory.
            fs::create_dir_all(&benchmark_dir).expect("Failed to create benchmark directory");

            // Initialize logger for this benchmark run
            let log_file = benchmark_dir.join("logs.txt");
            crate::logger::init_logger(&log_file).expect("Failed to initialize logger");

            // The run body is an async block so the logger is always closed
            // below, even when any step fails.
            let benchmark_result = async {
                // Start the instance monitoring tools.
                self.start_monitoring(&parameters).await?;

                // Configure all instances (if needed). Reconfiguration is
                // only necessary when the committee size changed.
                if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                    self.configure(&parameters).await?;
                    latest_committee_size = parameters.nodes;
                }

                // Deploy the validators.
                self.run_nodes(&parameters).await?;

                // Deploy the load generators.
                self.run_clients(&parameters).await?;

                // Wait for the benchmark to terminate. Then save the results and print a
                // summary.
                let aggregator = self.run(&benchmark_dir, &parameters).await?;
                aggregator.display_summary();
                generator.register_result(aggregator);

                // Kill the nodes and clients (without deleting the log files).
                self.cleanup(false).await?;

                // Download the log files.
                if self.log_processing {
                    let error_counter = self.download_logs(&benchmark_dir).await?;
                    error_counter.print_summary();
                }

                TestbedResult::Ok(())
            }
            .await;

            // Close the logger for this benchmark run
            crate::logger::close_logger();

            // Propagate any error that occurred
            benchmark_result?;

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
828}