iota_aws_orchestrator/orchestrator.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashSet,
    fs::{self},
    marker::PhantomData,
    path::Path,
    time::Duration,
};

use chrono;
use futures;
use tokio::time::{self, Instant};

use crate::{
    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
    build_cache::BuildCacheService,
    client::Instance,
    display,
    error::{TestbedError, TestbedResult},
    faults::CrashRecoverySchedule,
    logs::LogsAnalyzer,
    measurement::MeasurementsCollection,
    monitor::{Monitor, Prometheus},
    net_latency::NetworkLatencyCommandBuilder,
    protocol::{ProtocolCommands, ProtocolMetrics},
    settings::{BuildGroups, Settings, build_cargo_command},
    ssh::{CommandContext, CommandStatus, SshConnectionManager},
};

/// An orchestrator to run benchmarks on a testbed.
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// The node instances.
    node_instances: Vec<Instance>,
    /// The client (load generator) instances.
    client_instances: Vec<Instance>,
    /// The dedicated metrics instance, if any.
    metrics_instance: Option<Instance>,
    /// The type of the benchmark parameters.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to install on the instance.
    instance_setup_commands: Vec<String>,
    /// Protocol-specific commands generator to generate the protocol
    /// configuration files, boot clients and nodes, etc.
    protocol_commands: P,
    /// The interval between measurements collection.
    scrape_interval: Duration,
    /// The interval to crash nodes.
    crash_interval: Duration,
    /// Handles ssh connections to instances.
    ssh_manager: SshConnectionManager,
    /// Whether to skip testbed updates before running benchmarks.
    skip_testbed_update: bool,
    /// Whether to skip testbed configuration before running benchmarks.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the client and node log files.
    log_processing: bool,
    /// Number of instances running only load generators (not nodes). If this
    /// value is set to zero, the orchestrator runs a load generator
    /// collocated with each node.
    dedicated_clients: usize,
    /// Whether to forgo a Grafana and Prometheus instance and leave the testbed
    /// unmonitored.
    skip_monitoring: bool,
}

impl<P, T> Orchestrator<P, T> {
    /// The default interval between measurements collection.
    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
    /// The default interval to crash nodes.
    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);

    /// Make a new orchestrator.
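    ///
    /// A minimal usage sketch (assuming the settings, instance lists, protocol
    /// commands, and ssh manager have already been constructed elsewhere):
    ///
    /// ```ignore
    /// let orchestrator = Orchestrator::new(
    ///     settings,
    ///     node_instances,
    ///     client_instances,
    ///     Some(metrics_instance),
    ///     setup_commands,
    ///     protocol_commands,
    ///     ssh_manager,
    /// )
    /// .with_scrape_interval(Duration::from_secs(30))
    /// .with_dedicated_clients(4)
    /// .skip_monitoring(false);
    /// ```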
    pub fn new(
        settings: Settings,
        node_instances: Vec<Instance>,
        client_instances: Vec<Instance>,
        metrics_instance: Option<Instance>,
        instance_setup_commands: Vec<String>,
        protocol_commands: P,
        ssh_manager: SshConnectionManager,
    ) -> Self {
        Self {
            settings,
            node_instances,
            client_instances,
            metrics_instance,
            benchmark_type: PhantomData,
            instance_setup_commands,
            protocol_commands,
            ssh_manager,
            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
            skip_testbed_update: false,
            skip_testbed_configuration: false,
            log_processing: false,
            dedicated_clients: 0,
            skip_monitoring: false,
        }
    }

    /// Set the interval between measurements collection.
    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
        self.scrape_interval = scrape_interval;
        self
    }

    /// Set the interval with which to crash nodes.
    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
        self.crash_interval = crash_interval;
        self
    }

    /// Set whether to skip testbed updates before running benchmarks.
    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
        self.skip_testbed_update = skip_testbed_update;
        self
    }

    /// Set whether to skip testbed configuration before running benchmarks.
    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
        self.skip_testbed_configuration = skip_testbed_configuration;
        self
    }

    /// Set whether to download and analyze the client and node log files.
    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
        self.log_processing = log_processing;
        self
    }

    /// Set the number of instances running exclusively load generators.
    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
        self.dedicated_clients = dedicated_clients;
        self
    }

    /// Set whether to forgo the Grafana and Prometheus instance and leave the
    /// testbed unmonitored.
    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
        self.skip_monitoring = skip_monitoring;
        self
    }

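    /// Return the node instances and, when dedicated client instances are used,
    /// the client instances as well; the metrics instance is never included.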
    pub fn instances_without_metrics(&self) -> Vec<Instance> {
        let mut instances = self.node_instances.clone();

        if self.dedicated_clients > 0 {
            instances.extend(self.client_instances.clone());
        }
        instances
    }

    /// Return all instances combined, including the metrics instance (if any).
    pub fn instances(&self) -> Vec<Instance> {
        let mut instances = self.instances_without_metrics();
        if let Some(metrics_instance) = &self.metrics_instance {
            instances.push(metrics_instance.clone());
        }
        instances
    }
}

impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
    /// Boot one node per instance.
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
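        // Apply the artificial network latency matrix (if a latency topology is
        // configured) before booting the nodes; this only applies when the nodes
        // communicate over their internal IP addresses.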
        if parameters.use_internal_ip_address {
            if let Some(latency_topology) = parameters.latency_topology.clone() {
                let latency_commands = NetworkLatencyCommandBuilder::new(&instances)
                    .with_perturbation_spec(parameters.perturbation_spec.clone())
                    .with_topology_layout(latency_topology)
                    .with_max_latency(parameters.maximum_latency)
                    .build_network_latency_matrix();
                self.ssh_manager
                    .execute_per_instance(latency_commands, CommandContext::default())
                    .await?;
            }
        }

        // Run one node per instance.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let node_context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, node_context)
            .await?;

        // Wait until all nodes are reachable.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone(), parameters.use_internal_ip_address);
        self.wait_for_success(commands, &parameters.benchmark_dir)
            .await;

        Ok(())
    }

    /// Install the codebase and its dependencies on the testbed.
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;

        let use_precompiled_binaries = self.settings.build_cache_enabled();

        let working_dir_cmd = format!("mkdir -p {working_dir}");
        let git_clone_cmd = format!("(git clone --depth=1 {url} || true)");

        let mut basic_commands = vec![
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Disable "pending kernel upgrade" message.
            "sudo apt-get -y remove needrestart",
            "sudo apt-get -y install curl git ca-certificates",
            // Increase open file limits to prevent "Too many open files" errors.
            "echo '* soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo '* hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            // Set system-wide file descriptor limits.
            "echo 'fs.file-max = 2097152' | sudo tee -a /etc/sysctl.conf",
            "sudo sysctl -p",
            // Set limits for the current session.
            "ulimit -n 1048576 || true",
            // Set network buffer sizes.
            "sudo sysctl -w net.core.rmem_max=104857600",
            "sudo sysctl -w net.core.wmem_max=104857600",
            "sudo sysctl -w net.ipv4.tcp_rmem=\"8192 262144 104857600\"",
            "sudo sysctl -w net.ipv4.tcp_wmem=\"8192 262144 104857600\"",
            // Create the working directory.
            working_dir_cmd.as_str(),
            // Clone the repo.
            git_clone_cmd.as_str(),
        ];

        // Collect all unique non-"stable" Rust toolchains from the build configs.
        let toolchain_cmds: Vec<String> = if !use_precompiled_binaries {
            self.settings
                .build_configs
                .values()
                .filter_map(|config| {
                    config
                        .toolchain
                        .as_ref()
                        .filter(|t| t.as_str() != "stable")
                        .cloned()
                })
                .collect::<HashSet<String>>()
                .into_iter()
                .map(|toolchain| format!("rustup toolchain install {toolchain}"))
                .collect()
        } else {
            vec![]
        };

        if !use_precompiled_binaries {
            // If not using precompiled binaries, install the build dependencies and Rust.
            basic_commands.extend([
                // The following dependencies are needed:
                // * build-essential: prevents the error [error: linker `cc` not found].
                "sudo apt-get -y install build-essential cmake clang lld protobuf-compiler pkg-config nvme-cli",
                // Install Rust (non-interactive).
                "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
                "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
                "source $HOME/.cargo/env",
                "rustup default stable",
            ]);

            // Add the toolchain install commands to basic_commands.
            for cmd in &toolchain_cmds {
                basic_commands.push(cmd.as_str());
            }
        } else {
            // Create cargo env file if using precompiled binaries, so that the source
            // commands don't fail.
            basic_commands.push("mkdir -p $HOME/.cargo/ && touch $HOME/.cargo/env");
        }

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

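        // Chain all setup commands with '&&' so the installation stops at the
        // first failing command.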
        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        self.ssh_manager
            .execute(self.instances(), command, CommandContext::default())
            .await?;

        if !self.skip_monitoring {
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(
                    vec![metrics_instance],
                    monitor_command,
                    CommandContext::default(),
                )
                .await?;
        }

        display::done();
        Ok(())
    }

    /// Start Prometheus and Grafana on the dedicated metrics instance (if any).
    pub async fn start_monitoring(
        &self,
        use_internal_ip_address: bool,
        timestamp: &str,
    ) -> TestbedResult<()> {
        if let Some(instance) = &self.metrics_instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(
                instance.clone(),
                self.client_instances.clone(),
                self.node_instances.clone(),
                self.ssh_manager.clone(),
            );
            // When Prometheus snapshots are enabled, pass the timestamp as the snapshot
            // directory.
            let snapshot_dir = self
                .settings
                .enable_prometheus_snapshots
                .then_some(timestamp);
            monitor
                .start_prometheus(
                    &self.protocol_commands,
                    use_internal_ip_address,
                    snapshot_dir,
                )
                .await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }

    /// Update all instances to use the version of the codebase specified in the
    /// settings file.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        let commit = &self.settings.repository.commit;
        let repo_name = self.settings.repository_name();
        let build_groups = self.settings.build_groups();

        // We need to fetch and checkout the commit even if using precompiled binaries
        // because the iota-framework submodule, the examples/move folder, or the
        // dev-tools/grafana-local folder might be used.
        let git_update_command = [
            &format!("git fetch origin {commit} --force"),
            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
            "git clean -fd -e target",
        ]
        .join(" && ");

        let id = "git update";
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.clone().into());

        // Execute and wait for the git update command on all instances (including
        // metrics).
        display::action(format!("update command: {git_update_command}"));
        self.ssh_manager
            .execute(self.instances(), git_update_command, context)
            .await?;
        self.ssh_manager
            .wait_for_command(self.instances(), id, CommandStatus::Terminated)
            .await?;

        // Check if the build cache is enabled.
        if self.settings.build_cache_enabled() {
            display::action("Using build cache for binary distribution");
            let build_cache_service = BuildCacheService::new(&self.settings, &self.ssh_manager);
            build_cache_service
                .update_with_build_cache(
                    commit,
                    &build_groups,
                    self.instances_without_metrics(),
                    repo_name.clone(),
                )
                .await?;
        } else {
            self.update_with_local_build(build_groups).await?;
        }

        display::done();
        Ok(())
    }

    /// Update instances with a local build (fallback if the build cache is not used).
    /// Execute and wait for the cargo build command on all instances except the
    /// metrics one. This requires compiling the codebase in release
    /// (which may take a long time) so we run the command in the background
    /// to avoid keeping many ssh connections alive for too long.
    async fn update_with_local_build(&self, build_groups: BuildGroups) -> TestbedResult<()> {
        let without_metrics = self.instances_without_metrics();
        let repo_name = self.settings.repository_name();

        // Build each group separately.
        for (i, (group, binary_names)) in build_groups.iter().enumerate() {
            // Build arguments.
            let build_command = build_cargo_command(
                "build",
                group.toolchain.clone(),
                group.features.clone(),
                binary_names,
                &[] as &[&str],
                &[] as &[&str],
            );

            // Print the full command for logging.
            display::action(format!(
                "Running build command {}/{}: \"{build_command}\" in \"{repo_name}\"",
                i + 1,
                build_groups.len()
            ));

            let context = CommandContext::new().with_execute_from_path(repo_name.clone().into());

            self.ssh_manager
                .execute(without_metrics.clone(), build_command, context)
                .await?;
        }

        Ok(())
    }

    /// Configure the instances with the appropriate configuration files.
    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Configuring instances");

        // Generate the genesis configuration file and the keystore allowing access to
        // gas objects.
        let command = self
            .protocol_commands
            .genesis_command(self.node_instances.iter(), parameters);
        display::action(format!("\nGenesis command: {command}\n\n"));
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new().with_execute_from_path(repo_name.into());
        self.ssh_manager
            .execute(self.instances_without_metrics(), command, context)
            .await?;

        display::done();
        Ok(())
    }

    /// Clean up all instances and optionally delete their log files.
    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
        display::action("Cleaning up testbed");

        // Kill all tmux servers and delete the nodes' dbs. Optionally clear logs.
        let mut command = vec!["(tmux kill-server || true)".into()];
        for path in self.protocol_commands.db_directories() {
            command.push(format!("(rm -rf {} || true)", path.display()));
        }
        if cleanup {
            command.push("(rm -rf ~/*log* || true)".into());
        }
        let command = command.join(" ; ");

        // Execute the deletion on all machines.
        let active = self.instances().into_iter().filter(|x| x.is_active());
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

    /// Deploy the nodes.
    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Deploying validators");

        // Boot one node per instance.
        self.boot_nodes(self.node_instances.clone(), parameters)
            .await?;

        display::done();
        Ok(())
    }

    /// Deploy the load generators.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        if self.settings.use_fullnode_for_execution {
            display::action("Setting up full nodes");

            // Deploy the fullnodes.
            let targets = self
                .protocol_commands
                .fullnode_command(self.client_instances.clone(), parameters);

            let repo = self.settings.repository_name();
            let context = CommandContext::new()
                .run_background("fullnode".into())
                .with_log_file("~/fullnode.log".into())
                .with_execute_from_path(repo.into());
            self.ssh_manager
                .execute_per_instance(targets, context)
                .await?;

            // Wait until all fullnodes are fully started by querying the latest checkpoint
            // (otherwise clients might fail when a fullnode is not listening yet).
            display::action("Awaiting fullnodes to be ready...");
            let commands = self
                .client_instances
                .iter()
                .cloned()
                .map(|i| (i, "curl http://127.0.0.1:9000 -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"iota_getLatestCheckpointSequenceNumber\",\"params\":[],\"id\":1}'".to_owned()));
            self.ssh_manager.wait_for_success(commands).await;

            display::done();
        }

        display::action("Setting up load generators");

        // Deploy the load generators.
        let targets = self
            .protocol_commands
            .client_command(self.client_instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all load generators are reachable.
        let commands = self.protocol_commands.clients_metrics_command(
            self.client_instances.clone(),
            parameters.use_internal_ip_address,
        );
        self.wait_for_success(commands, &parameters.benchmark_dir)
            .await;

        // Start the background metrics collection service on each client instance.
        display::action("\n\nStarting background metrics collection service");
        let metrics_script =
            self.metrics_collection_script_command(parameters.use_internal_ip_address);
        let metrics_context = CommandContext::new().run_background("metrics-collector".into());
        self.ssh_manager
            .execute_per_instance(metrics_script.clone(), metrics_context)
            .await?;

        display::done();
        Ok(())
    }

    /// Create a background metrics collection script that runs on each
    /// client instance.
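    ///
    /// The script polls the client's metrics endpoint in an infinite loop,
    /// appending the output to `~/metrics.log` on the instance every 15 seconds.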
    fn metrics_collection_script_command(
        &self,
        use_internal_ip_address: bool,
    ) -> Vec<(Instance, String)> {
        // We need to get the metrics path from clients_metrics_command.
        self.protocol_commands
            .clients_metrics_command(self.client_instances.clone(), use_internal_ip_address)
            .into_iter()
            .map(|(instance, cmd)| {
                (
                    instance,
                    format!(
                        r#"while true; do
    {cmd} >> ~/metrics.log 2>&1
    sleep 15
done"#
                    ),
                )
            })
            .collect::<Vec<_>>()
    }

    /// Run the benchmark for (at least) the configured duration, applying the
    /// crash-recovery schedule and, if enabled, collecting flamegraphs at the end.
    pub async fn run(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action(format!(
            "Running benchmark (at least {}s)",
            parameters.duration.as_secs()
        ));

        let mut metrics_interval = time::interval(Duration::from_secs(5));
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule =
            CrashRecoverySchedule::new(faults_type, self.node_instances.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

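        // Poll two timers concurrently: one to refresh the elapsed-time display
        // (and detect the end of the run), and one to apply the crash-recovery
        // schedule.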
        let start = Instant::now();
        loop {
            tokio::select! {
                // Update elapsed time display.
                _ = metrics_interval.tick() => {
                    let elapsed = Instant::now().duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    if elapsed > parameters.duration.as_secs() {
                        break;
                    }
                },

                // Kill and recover nodes according to the input schedule.
                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        if self.settings.enable_flamegraph {
            let flamegraphs_dir = parameters.benchmark_dir.join("flamegraphs");
            fs::create_dir_all(&flamegraphs_dir).expect("Failed to create flamegraphs directory");

            self.fetch_flamegraphs(
                self.instances_without_metrics().clone(),
                &flamegraphs_dir,
                "?svg=true",
                "flamegraph",
            )
            .await?;

            if self
                .settings
                .build_configs
                .get("iota-node")
                .is_some_and(|config| config.features.iter().any(|f| f == "flamegraph-alloc"))
            {
                self.fetch_flamegraphs(
                    self.instances_without_metrics().clone(),
                    &flamegraphs_dir,
                    "?svg=true&mem=true",
                    "flamegraph-alloc",
                )
                .await?;
            }
        }

        display::done();
        Ok(())
    }

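    /// Query the flamegraph endpoint of each node and write the returned SVGs
    /// (stdout) and any errors (stderr) to files under `path`, prefixed with
    /// `file_prefix`.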
    async fn fetch_flamegraphs(
        &self,
        nodes: Vec<Instance>,
        path: &Path,
        query: &str,
        file_prefix: &str,
    ) -> TestbedResult<()> {
        let flamegraph_commands = self
            .protocol_commands
            .nodes_flamegraph_command(nodes, query);
        let stdio = self
            .ssh_manager
            .execute_per_instance(flamegraph_commands, CommandContext::default())
            .await?;
        for (i, (stdout, stderr)) in stdio.into_iter().enumerate() {
            if !stdout.is_empty() {
                let file = path.join(format!("{file_prefix}-{i}.svg"));
                fs::write(file, stdout).unwrap();
            }
            if !stderr.is_empty() {
                let file = path.join(format!("{file_prefix}-{i}.log"));
                fs::write(file, stderr).unwrap();
            }
        }
        Ok(())
    }

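    /// Execute the given per-instance commands, retrying until they succeed.
    /// Panics if a command keeps failing after all retries.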
    pub async fn wait_for_success<I, S>(&self, commands: I, _benchmark_dir: &Path)
    where
        I: IntoIterator<Item = (Instance, S)> + Clone,
        S: Into<String> + Send + 'static + Clone,
    {
        match self
            .ssh_manager
            .execute_per_instance(
                commands.clone(),
                CommandContext::default().with_retries(10),
            )
            .await
        {
            Ok(_) => {}
            Err(e) => {
                // Handle the failure case.
                panic!("Command execution failed on one or more instances: {e}");
            }
        }
    }

    /// Download the metrics logs from the clients.
    pub async fn download_metrics_logs(&self, benchmark_dir: &Path) -> TestbedResult<()> {
        let path = benchmark_dir.join("logs");
        fs::create_dir_all(&path).expect("Failed to create logs directory");

        // Download the clients' log files and metrics.
        display::action("Downloading metrics logs");
        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            let _: TestbedResult<()> = async {
                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;

                // Download the metrics file if it exists.
                match connection.download("metrics.log").await {
                    Ok(metrics_content) => {
                        let metrics_file = path.join(format!("metrics-{i}.log"));
                        fs::write(metrics_file, metrics_content.as_bytes())
                            .expect("Cannot write metrics file");
                    }
                    Err(_) => {
                        display::warn(format!("Metrics file not found for client {i}"));
                    }
                }
                Ok(())
            }
            .await;
        }
        display::done();

        Ok(())
    }

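    /// Take a Prometheus snapshot on the metrics instance (if any) and download
    /// it into `<benchmark_dir>/snapshot` via rsync.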
    pub async fn download_prometheus_snapshot(
        &self,
        benchmark_dir: &Path,
        timestamp: &str,
    ) -> TestbedResult<()> {
        if let Some(instance) = &self.metrics_instance {
            display::action("Taking prometheus snapshot");
            let command = Prometheus::take_snapshot_command();

            // Prometheus snapshot response structure.
            #[derive(serde::Deserialize)]
            struct ResponseData {
                // Snapshot directory name.
                name: String,
            }
            #[derive(serde::Deserialize)]
            struct Response {
                #[allow(dead_code)]
                status: String,
                data: ResponseData,
            }

            let response = self
                .ssh_manager
                .execute(
                    std::iter::once(instance.clone()),
                    command.clone(),
                    CommandContext::default(),
                )
                .await?
                .into_iter()
                .next()
                .ok_or_else(|| {
                    TestbedError::SshCommandFailed(
                        instance.clone(),
                        command.clone(),
                        "No response from command".into(),
                    )
                })?
                .0;
            let response: Response = serde_json::from_str(&response).map_err(|e| {
                TestbedError::SshCommandFailed(
                    instance.clone(),
                    command.clone(),
                    format!("Failed to parse response: {e}"),
                )
            })?;
            display::done();

            let snapshot_name = response.data.name;
            display::config("Created prometheus snapshot", &snapshot_name);
            display::newline();

            display::action("Downloading prometheus snapshot");
            let snapshot_dir = benchmark_dir.join("snapshot").display().to_string();
            let rsync_args = vec![
                // Options: recursive, verbose, compress, and override ssh to use the key
                // file and disable host key checking.
                "-rvze".to_string(),
                // Let rsync use ssh with the specified private key file.
                format!(
                    "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i {}",
                    self.settings.ssh_private_key_file.display()
                ),
                // Remote snapshot path: /var/lib/prometheus/<timestamp>/snapshots/<snapshot_name>
                format!(
                    "ubuntu@{}:/var/lib/prometheus/{}/snapshots/{}/",
                    instance.main_ip, timestamp, snapshot_name
                ),
                // Local snapshot path: <benchmark_dir>/snapshot
                snapshot_dir,
            ];

            let instance = instance.clone();
            tokio::task::spawn_blocking(move || -> TestbedResult<()> {
                match std::process::Command::new("rsync")
                    .args(&rsync_args)
                    .status()
                {
                    Ok(status) if status.success() => Ok(()),
                    Ok(status) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with status: {}", status),
                    )),
                    Err(e) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with error: {}", e),
                    )),
                }
            })
            .await
            .unwrap()?;

            display::done();
            display::status("Downloaded prometheus snapshot");
            display::newline();
        }

        Ok(())
    }

    /// Download the log files from the nodes and clients.
    pub async fn download_logs(&self, benchmark_dir: &Path) -> TestbedResult<LogsAnalyzer> {
        // Create a logs sub-directory for this run.
        let path = benchmark_dir.join("logs");
        fs::create_dir_all(&path).expect("Failed to create logs directory");

        // NOTE: Our ssh library does not seem to be able to transfer files in parallel
        // reliably.
        let mut log_parsers = Vec::new();

        // Download the clients' log files.
        display::action("Downloading clients logs");
        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            let _: TestbedResult<()> = async {
                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;

                if self.settings.use_fullnode_for_execution {
                    let fullnode_log_content = connection.download("fullnode.log").await?;
                    let fullnode_log_file = path.join(format!("fullnode-{i}.log"));
                    fs::write(fullnode_log_file, fullnode_log_content.as_bytes())
                        .expect("Cannot write log file");
                }

                let client_log_content = connection.download("client.log").await?;

                let client_log_file = path.join(format!("client-{i}.log"));
                fs::write(client_log_file, client_log_content.as_bytes())
                    .expect("Cannot write log file");

                let mut log_parser = LogsAnalyzer::default();
                log_parser.set_client_errors(&client_log_content);
                log_parsers.push(log_parser);
                Ok(())
            }
            .await;
        }

        display::done();

        display::action("Downloading nodes logs");
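        // Node logs are downloaded concurrently, unlike the client logs above,
        // which are fetched sequentially.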
        let download_tasks: Vec<_> = self
            .node_instances
            .iter()
            .enumerate()
            .map(|(i, instance)| {
                let ssh_manager = self.ssh_manager.clone();
                let path = path.clone();
                let ssh_address = instance.ssh_address();

                async move {
                    let connection = ssh_manager.connect(ssh_address).await?;
                    let node_log_content = connection.download("node.log").await?;

                    let node_log_file = path.join(format!("node-{i}.log"));
                    fs::write(node_log_file, node_log_content.as_bytes())
                        .expect("Cannot write log file");

                    let mut log_parser = LogsAnalyzer::default();
                    log_parser.set_node_errors(&node_log_content);
                    Ok::<LogsAnalyzer, TestbedError>(log_parser)
                }
            })
            .collect();

        let results = futures::future::join_all(download_tasks).await;
        for (idx, result) in results.into_iter().enumerate() {
            display::status(format!("{}/{}", idx + 1, self.node_instances.len()));
            match result {
                Ok(log_parser) => log_parsers.push(log_parser),
                Err(e) => display::warn(format!("Failed to download node log: {e}")),
            }
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }

    /// Run all the benchmarks specified by the benchmark generator.
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Clean up the testbed (in case the previous run was not completed).
        self.cleanup(true).await?;

        let commit = self.settings.repository.commit.replace("/", "_");
        let timestamp = chrono::Local::now().format("%y%m%d_%H%M%S").to_string();
        let benchmark_dir = self.settings.results_dir.join(&commit).join(&timestamp);

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Start the instance monitoring tools.
        self.start_monitoring(generator.use_internal_ip_address, &timestamp)
            .await?;

        // Run all benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(mut parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            parameters.benchmark_dir = benchmark_dir.join(format!("{parameters:?}"));

            // Clean up the testbed (in case the previous run was not completed).
            self.cleanup(true).await?;
            // Create the benchmark directory.
            fs::create_dir_all(&parameters.benchmark_dir)
                .expect("Failed to create benchmark directory");

            // Initialize the logger for this benchmark run.
            let log_file = parameters.benchmark_dir.join("logs.txt");
            crate::logger::init_logger(&log_file).expect("Failed to initialize logger");
            crate::logger::log(
                chrono::Local::now()
                    .format("Started %y-%m-%d:%H-%M-%S")
                    .to_string()
                    .as_str(),
            );

            let benchmark_result = async {
                // Configure all instances (if not skipped).
                if !self.skip_testbed_configuration {
                    self.configure(&parameters).await?;
                    latest_committee_size = parameters.nodes;
                }

                // Deploy the validators.
                self.run_nodes(&parameters).await?;

                // Deploy the load generators.
                self.run_clients(&parameters).await?;

                // Wait for the benchmark to terminate. Then save the results and print a
                // summary.
                self.run(&parameters).await?;

                // Collect and aggregate metrics.
                let mut aggregator =
                    MeasurementsCollection::new(&self.settings, parameters.clone());
                self.download_metrics_logs(&parameters.benchmark_dir)
                    .await?;

                // Parse and aggregate metrics from the downloaded files.
                aggregator.aggregates_metrics_from_files::<P>(
                    self.client_instances.len(),
                    &parameters.benchmark_dir.join("logs"),
                );

                aggregator.display_summary();
                aggregator.save(&parameters.benchmark_dir);
                generator.register_result(aggregator);

                TestbedResult::Ok(())
            }
            .await;

            // Kill the nodes and clients (without deleting the log files).
            self.cleanup(false).await?;

            // Download the log files.
            if self.log_processing {
                let error_counter = self.download_logs(&parameters.benchmark_dir).await?;
                error_counter.print_summary();
            }

            // Close the logger for this benchmark run.
            crate::logger::log(
                chrono::Local::now()
                    .format("Finished %y-%m-%d:%H-%M-%S")
                    .to_string()
                    .as_str(),
            );
            crate::logger::close_logger();

            // Propagate any error that occurred.
            benchmark_result?;

            i += 1;
        }

        if self.settings.enable_prometheus_snapshots {
            if let Err(e) = self
                .download_prometheus_snapshot(&benchmark_dir, &timestamp)
                .await
            {
                display::error(format!("Failed to download prometheus snapshot: {}", e));
            }
        }

        display::header("Benchmark completed");
        Ok(())
    }
}