iota_aws_orchestrator/
orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashSet,
7    fs::{self, OpenOptions},
8    future::Future,
9    io::Write,
10    marker::PhantomData,
11    path::Path,
12    time::Duration,
13};
14
15use chrono;
16use futures;
17use tokio::time::{self, Instant};
18use tracing::info;
19
20use crate::{
21    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType, RunInterval},
22    build_cache::BuildCacheService,
23    client::Instance,
24    display,
25    error::{TestbedError, TestbedResult},
26    faults::CrashRecoverySchedule,
27    logger::{IS_LOOP, SwappableWriter},
28    logs::LogsAnalyzer,
29    measurement::MeasurementsCollection,
30    monitor::{Monitor, Prometheus},
31    net_latency::NetworkLatencyCommandBuilder,
32    protocol::{ProtocolCommands, ProtocolMetrics},
33    settings::{BuildGroups, Settings, build_cargo_command},
34    ssh::{CommandContext, CommandStatus, SshConnectionManager},
35};
36
/// An orchestrator to run benchmarks on a testbed.
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// Node (validator) instances.
    node_instances: Vec<Instance>,
    /// Client (load generator) instances.
    client_instances: Vec<Instance>,
    /// Dedicated metrics instance (hosts the monitoring stack when
    /// monitoring is enabled).
    metrics_instance: Option<Instance>,
    /// The type of the benchmark parameters.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to install on the instance.
    instance_setup_commands: Vec<String>,
    /// Protocol-specific commands generator to generate the protocol
    /// configuration files, boot clients and nodes, etc.
    protocol_commands: P,
    /// The interval between measurements collection.
    scrape_interval: Duration,
    /// The interval to crash nodes.
    crash_interval: Duration,
    /// Handle ssh connections to instances.
    ssh_manager: SshConnectionManager,
    /// Whether to skip testbed updates before running benchmarks.
    skip_testbed_update: bool,
    /// Whether to skip testbed configuration before running benchmarks.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the client and node log files.
    log_processing: bool,
    /// Number of instances running only load generators (not nodes). If this
    /// value is set to zero, the orchestrator runs a load generator
    /// collocated with each node.
    dedicated_clients: usize,
    /// Whether to forgo a grafana and prometheus instance and leave the testbed
    /// unmonitored.
    skip_monitoring: bool,
    /// Directory to store benchmark results.
    benchmark_dir: std::path::PathBuf,
    /// Writer for benchmark logs.
    benchmark_writer: crate::logger::SwappableWriter,
}
78
79impl<P, T> Orchestrator<P, T> {
80    /// The default interval between measurements collection.
81    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
82    /// The default interval to crash nodes.
83    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);
84
85    /// Make a new orchestrator.
86    pub fn new(
87        settings: Settings,
88        node_instances: Vec<Instance>,
89        client_instances: Vec<Instance>,
90        metrics_instance: Option<Instance>,
91        instance_setup_commands: Vec<String>,
92        protocol_commands: P,
93        ssh_manager: SshConnectionManager,
94    ) -> Self {
95        Self {
96            settings,
97            node_instances,
98            client_instances,
99            metrics_instance,
100            benchmark_type: PhantomData,
101            instance_setup_commands,
102            protocol_commands,
103            ssh_manager,
104            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
105            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
106            skip_testbed_update: false,
107            skip_testbed_configuration: false,
108            log_processing: false,
109            dedicated_clients: 0,
110            skip_monitoring: false,
111            benchmark_dir: Path::new("benchmark_results").to_path_buf(),
112            benchmark_writer: SwappableWriter::new(),
113        }
114    }
115
116    /// Set interval between measurements collection.
117    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
118        self.scrape_interval = scrape_interval;
119        self
120    }
121
122    /// Set interval with which to crash nodes.
123    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
124        self.crash_interval = crash_interval;
125        self
126    }
127
128    /// Set whether to skip testbed updates before running benchmarks.
129    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
130        self.skip_testbed_update = skip_testbed_update;
131        self
132    }
133
134    /// Whether to skip testbed configuration before running benchmarks.
135    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
136        self.skip_testbed_configuration = skip_testbed_configuration;
137        self
138    }
139
140    /// Set whether to download and analyze the client and node log files.
141    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
142        self.log_processing = log_processing;
143        self
144    }
145
146    /// Set the number of instances running exclusively load generators.
147    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
148        self.dedicated_clients = dedicated_clients;
149        self
150    }
151
152    pub fn with_benchmark_dir_and_writer<F: AsRef<Path>>(
153        mut self,
154        benchmark_dir: F,
155        writer: crate::logger::SwappableWriter,
156    ) -> Self {
157        self.benchmark_dir = benchmark_dir.as_ref().to_path_buf();
158        self.benchmark_writer = writer;
159        self
160    }
161
162    /// Set whether to boot grafana on the local machine to monitor the nodes.
163    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
164        self.skip_monitoring = skip_monitoring;
165        self
166    }
167
168    pub fn instances_without_metrics(&self) -> Vec<Instance> {
169        let mut instances = self.node_instances.clone();
170
171        if self.dedicated_clients > 0 {
172            instances.extend(self.client_instances.clone());
173        }
174        instances
175    }
176
177    /// Returns all the instances combined
178    pub fn instances(&self) -> Vec<Instance> {
179        let mut instances = self.instances_without_metrics();
180        if let Some(metrics_instance) = &self.metrics_instance {
181            instances.push(metrics_instance.clone());
182        }
183        instances
184    }
185
186    fn effective_scrape_interval(&self, parameters: &BenchmarkParameters<T>) -> Duration {
187        const MIN: Duration = Duration::from_secs(1);
188        // upper bound: current self.scrape_interval (e.g., 15s)
189        let max = self.scrape_interval;
190
191        match parameters.run_interval {
192            RunInterval::Time(_) => self.scrape_interval,
193
194            RunInterval::Count(tx_count) => {
195                // Evaluate the scrape interval based on the estimated benchmark duration,
196                // aiming for around 30 samples, but never less than 1s and never more than
197                // self.scrape_interval.
198                let qps = parameters.load.max(1) as u64; // protect against division by zero, even if load is set to 0
199                let est_secs = tx_count.div_ceil(qps);
200
201                let target_samples = 30u64;
202                let raw = (est_secs / target_samples).max(1);
203                let candidate = Duration::from_secs(raw);
204
205                candidate.clamp(MIN, max)
206            }
207        }
208    }
209}
210
211impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
    /// Boot one node per instance.
    ///
    /// When the benchmark uses internal IP addresses and a latency topology is
    /// configured, the pairwise latency commands are applied first. Nodes are
    /// then started in the background (logging to `~/node.log`) and the
    /// function blocks until every node's metrics endpoint is reachable.
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Inject the artificial network latency (if any) before booting nodes.
        if parameters.use_internal_ip_address {
            if let Some(latency_topology) = parameters.latency_topology.clone() {
                let latency_commands = NetworkLatencyCommandBuilder::new(&instances)
                    .with_perturbation_spec(parameters.perturbation_spec.clone())
                    .with_topology_layout(latency_topology)
                    .with_max_latency(parameters.maximum_latency)
                    .build_network_latency_matrix();
                self.ssh_manager
                    .execute_per_instance(latency_commands, CommandContext::default())
                    .await?;
            }
        }

        // Run one node per instance, in the background, from within the repo
        // checkout, logging to ~/node.log.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let node_context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, node_context)
            .await?;

        // Wait until all nodes are reachable (their metrics command succeeds).
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone(), parameters.use_internal_ip_address);
        self.wait_for_success(commands, &parameters.benchmark_dir)
            .await;

        Ok(())
    }
254
    /// Install the codebase and its dependencies on the testbed.
    ///
    /// Installs OS packages, raises file-descriptor and network-buffer limits,
    /// creates the working directory, and clones the repository on every
    /// instance. When the build cache is disabled, rustup and build
    /// dependencies are installed too so the instances can compile the code
    /// themselves. Finally, monitoring dependencies are installed on the
    /// dedicated metrics instance (unless monitoring is skipped).
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;

        // With the build cache enabled, precompiled binaries are distributed
        // and no rust toolchain is installed on the instances.
        let use_precompiled_binaries = self.settings.build_cache_enabled();

        let working_dir_cmd = format!("mkdir -p {working_dir}");
        let git_clone_cmd = format!("(git clone --depth=1 {url} || true)");

        let mut basic_commands = vec![
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Disable "pending kernel upgrade" message.
            "sudo apt-get -y remove needrestart",
            "sudo apt-get -y install curl git ca-certificates",
            // Increase open file limits to prevent "Too many open files" errors
            "echo '* soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo '* hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            // Set system-wide file descriptor limits
            "echo 'fs.file-max = 2097152' | sudo tee -a /etc/sysctl.conf",
            "sudo sysctl -p",
            // Set limits for current session
            "ulimit -n 1048576 || true",
            // set network buffer sizes
            "sudo sysctl -w net.core.rmem_max=104857600",
            "sudo sysctl -w net.core.wmem_max=104857600",
            "sudo sysctl -w net.ipv4.tcp_rmem=\"8192 262144 104857600\"",
            "sudo sysctl -w net.ipv4.tcp_wmem=\"8192 262144 104857600\"",
            // Create the working directory.
            working_dir_cmd.as_str(),
            // Clone the repo.
            git_clone_cmd.as_str(),
        ];

        // Collect all unique non-"stable" rust toolchains from build configs
        // (deduplicated via a HashSet); only needed when building locally.
        let toolchain_cmds: Vec<String> = if !use_precompiled_binaries {
            self.settings
                .build_configs
                .values()
                .filter_map(|config| {
                    config
                        .toolchain
                        .as_ref()
                        .filter(|t| t.as_str() != "stable")
                        .cloned()
                })
                .collect::<HashSet<String>>()
                .into_iter()
                .map(|toolchain| format!("rustup toolchain install {toolchain}"))
                .collect()
        } else {
            vec![]
        };

        if !use_precompiled_binaries {
            // If not using precompiled binaries, install rustup.
            basic_commands.extend([
                // The following dependencies:
                // * build-essential: prevent the error: [error: linker `cc` not found].
                "sudo apt-get -y install build-essential cmake clang lld protobuf-compiler pkg-config nvme-cli",
                // Install rust (non-interactive).
                "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
                "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
                "source $HOME/.cargo/env",
                "rustup default stable",
            ]);

            // Add the toolchain install commands to basic_commands
            for cmd in &toolchain_cmds {
                basic_commands.push(cmd.as_str());
            }
        } else {
            // Create cargo env file if using precompiled binaries, so that the source
            // commands don't fail.
            basic_commands.push("mkdir -p $HOME/.cargo/ && touch $HOME/.cargo/env");
        }

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        // Chain everything into a single shell command so a single ssh session
        // per instance suffices.
        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        self.ssh_manager
            .execute(self.instances(), command, CommandContext::default())
            .await?;

        // The monitoring stack has its own dependencies, installed only on the
        // dedicated metrics instance.
        if !self.skip_monitoring {
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(
                    vec![metrics_instance],
                    monitor_command,
                    CommandContext::default(),
                )
                .await?;
        }

        display::done();
        Ok(())
    }
377
    /// Start the monitoring stack (Tempo, Prometheus, Grafana) on the
    /// dedicated metrics instance.
    ///
    /// This is a no-op when monitoring is skipped or no metrics instance is
    /// configured. `timestamp` is used as the Prometheus snapshot directory
    /// name when snapshots are enabled in the settings.
    pub async fn start_monitoring(
        &self,
        use_internal_ip_address: bool,
        timestamp: &str,
    ) -> TestbedResult<()> {
        if self.skip_monitoring {
            display::warn("Monitoring is skipped, not starting Prometheus, Tempo and Grafana");
            return Ok(());
        }
        if let Some(instance) = &self.metrics_instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(
                instance.clone(),
                self.client_instances.clone(),
                self.node_instances.clone(),
                self.ssh_manager.clone(),
            );
            // When prometheus snapshots are enabled, pass the timestamp as snapshot
            // directory
            let snapshot_dir = self
                .settings
                .enable_prometheus_snapshots
                .then_some(timestamp);
            monitor.start_tempo().await?;
            monitor
                .start_prometheus(
                    &self.protocol_commands,
                    use_internal_ip_address,
                    self.settings.use_fullnode_for_execution,
                    snapshot_dir,
                )
                .await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }
421
    /// Update all instances to use the version of the codebase specified in the
    /// setting file.
    ///
    /// First fetches and checks out the configured commit on every instance
    /// (including the metrics one), then distributes the binaries either via
    /// the build cache (precompiled) or by compiling locally on the instances.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        let commit = &self.settings.repository.commit;
        let repo_name = self.settings.repository_name();
        let build_groups = self.settings.build_groups();

        // we need to fetch and checkout the commit even if using precompiled binaries
        // because the iota-framework submodule, the examples/move folder, or the
        // dev-tools/grafana-local folder might be used.
        let git_update_command = [
            &format!("git fetch origin {commit} --force"),
            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
            "git clean -fd -e target",
        ]
        .join(" && ");

        // Run the git update in the background under a named session so we can
        // poll for its termination below.
        let id = "git update";
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.clone().into());

        // Execute and wait for the git update command on all instances (including
        // metrics)
        display::action(format!("update command: {git_update_command}"));
        self.ssh_manager
            .execute(self.instances(), git_update_command, context)
            .await?;
        self.ssh_manager
            .wait_for_command(self.instances(), id, CommandStatus::Terminated)
            .await?;

        // Check if build cache is enabled
        if self.settings.build_cache_enabled() {
            display::action("Using build cache for binary distribution");
            let build_cache_service = BuildCacheService::new(&self.settings, &self.ssh_manager);
            build_cache_service
                .update_with_build_cache(
                    commit,
                    &build_groups,
                    self.instances_without_metrics(),
                    repo_name.clone(),
                )
                .await?;
        } else {
            // Fall back to compiling the codebase on the instances themselves.
            self.update_with_local_build(build_groups).await?;
        }

        display::done();
        Ok(())
    }
475
    /// Update instances with a local build (fallback when the build cache is
    /// not used): run the cargo build command for each build group on all
    /// instances except the metrics one. Compiling the codebase may take a
    /// long time.
    ///
    /// NOTE(review): an earlier comment mentioned running the build in the
    /// background, but the command context below does not call
    /// `run_background` — confirm whether foreground execution is intended.
    async fn update_with_local_build(&self, build_groups: BuildGroups) -> TestbedResult<()> {
        let without_metrics = self.instances_without_metrics();
        let repo_name = self.settings.repository_name();

        // Build each group separately
        for (i, (group, binary_names)) in build_groups.iter().enumerate() {
            // Build arguments
            let build_command = build_cargo_command(
                "build",
                group.toolchain.clone(),
                group.features.clone(),
                binary_names,
                &[] as &[&str],
                &[] as &[&str],
            );

            // print the full command for logging
            display::action(format!(
                "Running build command {}/{}: \"{build_command}\" in \"{repo_name}\"",
                i + 1,
                build_groups.len()
            ));

            let context = CommandContext::new().with_execute_from_path(repo_name.clone().into());

            self.ssh_manager
                .execute(without_metrics.clone(), build_command, context)
                .await?;
        }

        Ok(())
    }
513
514    /// Configure the instances with the appropriate configuration files.
515    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
516        display::action("Configuring instances");
517
518        // Generate the genesis configuration file and the keystore allowing access to
519        // gas objects.
520        let command = self
521            .protocol_commands
522            .genesis_command(self.node_instances.iter(), parameters);
523        display::action(format!("\nGenesis command: {command}\n\n"));
524        let repo_name = self.settings.repository_name();
525        let context = CommandContext::new().with_execute_from_path(repo_name.into());
526        self.ssh_manager
527            .execute(self.instances_without_metrics(), command, context)
528            .await?;
529
530        display::action("Configuration of all instances completed");
531        display::done();
532        Ok(())
533    }
534
535    /// Cleanup all instances and optionally delete their log files.
536    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
537        display::action("Cleaning up testbed");
538
539        // Kill all tmux servers and delete the nodes dbs. Optionally clear logs.
540        let mut command = vec!["(tmux kill-server || true)".into()];
541        for path in self.protocol_commands.db_directories() {
542            command.push(format!("(rm -rf {} || true)", path.display()));
543        }
544        if cleanup {
545            command.push("(rm -rf ~/*log* || true)".into());
546        }
547        let command = command.join(" ; ");
548
549        // Execute the deletion on all machines.
550        let active = self.instances().into_iter().filter(|x| x.is_active());
551        let context = CommandContext::default();
552        self.ssh_manager.execute(active, command, context).await?;
553
554        display::action("Cleanup of all instances completed");
555        display::done();
556        Ok(())
557    }
558
559    /// Deploy the nodes.
560    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
561        display::action("Deploying validators");
562
563        // Boot one node per instance.
564        self.boot_nodes(self.node_instances.clone(), parameters)
565            .await?;
566
567        display::action("Deployment of validators completed");
568        display::done();
569        Ok(())
570    }
571
    /// Deploy the load generators.
    ///
    /// When fullnode execution is enabled, first boots one fullnode per client
    /// instance and waits until each responds to a JSON-RPC checkpoint query.
    /// Then starts the load generators in the background (logging to
    /// `~/client.log`), waits until their metrics endpoints are reachable, and
    /// finally starts a background metrics-collection loop on every client.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Starting deployment of load generators");
        if self.settings.use_fullnode_for_execution {
            display::action("Setting up full nodes");

            // Deploy the fullnodes.
            let targets = self
                .protocol_commands
                .fullnode_command(self.client_instances.clone(), parameters);

            let repo = self.settings.repository_name();
            let context = CommandContext::new()
                .run_background("fullnode".into())
                .with_log_file("~/fullnode.log".into())
                .with_execute_from_path(repo.into());
            self.ssh_manager
                .execute_per_instance(targets, context)
                .await?;

            // Wait until all fullnodes are fully started by querying the latest checkpoint
            // (otherwise clients might fail when a fullnode is not listening yet).
            display::action("Await fullnode ready...");
            let commands = self
                .client_instances
                .iter()
                .cloned()
                .map(|i| (i, "curl http://127.0.0.1:9000 -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"iota_getLatestCheckpointSequenceNumber\",\"params\":[],\"id\":1}'".to_owned()));
            self.ssh_manager.wait_for_success(commands).await;

            display::done();
        }

        display::action("Setting up load generators");

        // Deploy the load generators in the background, logging to ~/client.log.
        let targets = self
            .protocol_commands
            .client_command(self.client_instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all load generators are reachable.
        let commands = self.protocol_commands.clients_metrics_command(
            self.client_instances.clone(),
            parameters.use_internal_ip_address,
        );
        self.wait_for_success(commands, &parameters.benchmark_dir)
            .await;

        // Start background metrics collection service on each client instance.
        display::action("\n\nStarting background metrics collection service");
        let metrics_script =
            self.metrics_collection_script_command(parameters.use_internal_ip_address);
        let metrics_context = CommandContext::new().run_background("metrics-collector".into());
        self.ssh_manager
            .execute_per_instance(metrics_script.clone(), metrics_context)
            .await?;

        display::action("Background metrics collection service started");
        display::action("Deployment of load generators completed");
        display::done();
        Ok(())
    }
643
644    /// Create a background metrics collection script that runs on each
645    /// client instance.
646    fn metrics_collection_script_command(
647        &self,
648        use_internal_ip_address: bool,
649    ) -> Vec<(Instance, String)> {
650        // We need to get the metrics path from clients_metrics_command
651        self.protocol_commands
652            .clients_metrics_command(self.client_instances.clone(), use_internal_ip_address)
653            .into_iter()
654            .map(|(instance, cmd)| {
655                (
656                    instance,
657                    format!(
658                        r#"while true; do
659    {cmd} >> ~/metrics.log 2>&1
660    sleep 15
661done"#
662                    ),
663                )
664            })
665            .collect::<Vec<_>>()
666    }
667
    /// Run the benchmark and collect metrics from the load generators.
    ///
    /// Drives the main benchmark loop: periodically reports elapsed time,
    /// applies the crash/recovery fault schedule, and stops either once the
    /// configured duration has elapsed (time-based runs) or when the client
    /// tmux session terminates (count-based runs). Afterwards, optionally
    /// fetches flamegraphs from the nodes.
    pub async fn run(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        let run_label = match parameters.run_interval {
            RunInterval::Time(d) => format!("at least {}s", d.as_secs()),
            RunInterval::Count(n) => format!("until {n} tx executed"),
        };
        display::action(format!("Running benchmark ({run_label})"));

        // Consume the immediate first tick of each interval so the loop below
        // measures full intervals.
        let scrape_every = self.effective_scrape_interval(parameters);
        let mut metrics_interval = time::interval(scrape_every);
        metrics_interval.tick().await;
        let faults_type = parameters.faults.clone();
        let mut faults_schedule =
            CrashRecoverySchedule::new(faults_type, self.node_instances.clone());

        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await;

        // In Count-mode we should stop when the client tmux session terminates,
        // not when "elapsed seconds" reaches some value.
        let is_count_mode = matches!(parameters.run_interval, RunInterval::Count(_));
        let clients = self.client_instances.clone();
        let ssh = self.ssh_manager.clone();

        // For count-based runs, wait for the "client" command to terminate on
        // all client instances; otherwise use a never-completing future so the
        // select! below is driven purely by the timers.
        let wait_clients_future: std::pin::Pin<Box<dyn Future<Output = TestbedResult<()>> + Send>> =
            if is_count_mode && !clients.is_empty() {
                Box::pin(async move {
                    ssh.wait_for_command(clients, "client", CommandStatus::Terminated)
                        .await
                        .map_err(Into::into)
                })
            } else {
                Box::pin(std::future::pending::<TestbedResult<()>>())
            };
        tokio::pin!(wait_clients_future);

        let start = Instant::now();

        loop {
            tokio::select! {
                _ = metrics_interval.tick() => {
                    let elapsed = Instant::now().duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    // Time-based runs stop once the configured duration elapsed.
                    if let Some(limit) = parameters.run_interval.time_limit_secs() {
                        if elapsed >= limit {
                            break;
                        }
                    }
                }

                res = &mut wait_clients_future => {
                    // Only fires for count-based benchmarks; for time-based runs
                    // the future is pending and never completes.
                    res?;
                    break;
                }

                _ = faults_interval.tick() => {
                    // Apply the next step of the crash/recovery schedule.
                    let  action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                 }
            }
        }

        // Optionally fetch CPU flamegraphs and — when the "flamegraph-alloc"
        // feature is enabled for iota-node — allocation flamegraphs.
        if self.settings.enable_flamegraph {
            let flamegraphs_dir = parameters.benchmark_dir.join("flamegraphs");
            fs::create_dir_all(&flamegraphs_dir).expect("Failed to create flamegraphs directory");

            self.fetch_flamegraphs(
                self.instances_without_metrics().clone(),
                &flamegraphs_dir,
                "?svg=true",
                "flamegraph",
            )
            .await?;

            if self
                .settings
                .build_configs
                .get("iota-node")
                .is_some_and(|config| config.features.iter().any(|f| f == "flamegraph-alloc"))
            {
                self.fetch_flamegraphs(
                    self.instances_without_metrics().clone(),
                    &flamegraphs_dir,
                    "?svg=true&mem=true",
                    "flamegraph-alloc",
                )
                .await?;
            }
        }

        display::action("Benchmark run completed");
        display::done();
        Ok(())
    }
773
774    async fn fetch_flamegraphs(
775        &self,
776        nodes: Vec<Instance>,
777        path: &Path,
778        query: &str,
779        file_prefix: &str,
780    ) -> TestbedResult<()> {
781        let flamegraph_commands = self
782            .protocol_commands
783            .nodes_flamegraph_command(nodes, query);
784        let stdio = self
785            .ssh_manager
786            .execute_per_instance(flamegraph_commands, CommandContext::default())
787            .await?;
788        for (i, (stdout, stderr)) in stdio.into_iter().enumerate() {
789            if !stdout.is_empty() {
790                let file = path.join(format!("{file_prefix}-{i}.svg"));
791                fs::write(file, stdout).unwrap();
792            }
793            if !stderr.is_empty() {
794                let file = path.join(format!("{file_prefix}-{i}.log"));
795                fs::write(file, stderr).unwrap();
796            }
797        }
798        Ok(())
799    }
800
801    pub async fn wait_for_success<I, S>(&self, instances: I, _benchmark_dir: &Path)
802    where
803        I: IntoIterator<Item = (Instance, S)> + Clone,
804        S: Into<String> + Send + 'static + Clone,
805    {
806        match self
807            .ssh_manager
808            .execute_per_instance(
809                instances.clone(),
810                CommandContext::default().with_retries(10),
811            )
812            .await
813        {
814            Ok(_) => {}
815            Err(e) => {
816                // Handle failure case
817                panic!("Command execution failed on one or more instances: {e}");
818            }
819        }
820    }
821
    /// Download benchmark stats files from every client instance into
    /// `<benchmark_dir>/benchmark-stats`.
    ///
    /// `benchmark_stats_path` is a remote path template; the literal `{i}`
    /// is replaced with the client's index, and a leading `~/` is rewritten
    /// to `/home/ubuntu/`. Per-client download failures are reported as
    /// warnings and do not fail the call; this function only returns `Err`
    /// if it cannot even start (it currently always returns `Ok`).
    ///
    /// # Panics
    /// Panics if the local output directory or a stats file cannot be
    /// written.
    pub async fn download_benchmark_stats(
        &self,
        benchmark_stats_path: &str,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        let path = parameters.benchmark_dir.join("benchmark-stats");
        fs::create_dir_all(&path).expect("Failed to create benchmark-stats directory");

        display::action("Downloading benchmark stats");

        // Count successful downloads so we can warn if nothing was fetched.
        let mut downloaded = 0usize;

        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            // Support per-client template path, e.g.
            // "/home/ubuntu/benchmark_stats_{i}.json"
            let remote_path_raw = benchmark_stats_path.replace("{i}", &i.to_string());

            // SFTP/download often does not expand "~", so normalize it.
            let remote_path = if let Some(rest) = remote_path_raw.strip_prefix("~/") {
                format!("/home/ubuntu/{rest}")
            } else {
                remote_path_raw
            };

            // Prefix the local file with the client index so files from
            // different clients do not collide; fall back to a generic name
            // if the remote path has no usable file name.
            let local_file_name = Path::new(&remote_path)
                .file_name()
                .and_then(|s| s.to_str())
                .map(|name| format!("client-{i}-{name}"))
                .unwrap_or_else(|| format!("client-{i}-benchmark-stats.json"));

            // Run the fallible connect+download in an async block so a `?`
            // failure is captured per client instead of aborting the loop.
            let result: TestbedResult<()> = async {
                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
                let content = connection.download(&remote_path).await?;

                let local_file = path.join(local_file_name);
                fs::write(local_file, content.as_bytes())
                    .expect("Cannot write benchmark stats file");
                Ok(())
            }
            .await;

            match result {
                Ok(_) => {
                    downloaded += 1;
                }
                Err(e) => {
                    // A failed client is a warning, not a fatal error.
                    display::warn(format!(
                        "Failed to download benchmark stats from client {i} ({}) at '{}': {e}",
                        instance.ssh_address(),
                        remote_path
                    ));
                }
            }
        }

        if downloaded == 0 {
            display::warn(format!(
                "No benchmark stats files downloaded (remote path template: '{}')",
                benchmark_stats_path
            ));
        } else {
            display::config("Downloaded benchmark stats files", downloaded);
        }

        display::done();
        Ok(())
    }
891
892    /// Download the metrics logs from clients.
893    pub async fn download_metrics_logs(&self, benchmark_dir: &Path) -> TestbedResult<()> {
894        let path = benchmark_dir.join("logs");
895        fs::create_dir_all(&path).expect("Failed to create logs directory");
896
897        // Download the clients log files and metrics.
898        display::action("Downloading metrics logs");
899        for (i, instance) in self.client_instances.iter().enumerate() {
900            display::status(format!("{}/{}", i + 1, self.client_instances.len()));
901
902            let _: TestbedResult<()> = async {
903                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
904
905                // Download metrics file if it exists
906                match connection.download("metrics.log").await {
907                    Ok(metrics_content) => {
908                        let metrics_file = path.join(format!("metrics-{i}.log"));
909                        fs::write(metrics_file, metrics_content.as_bytes())
910                            .expect("Cannot write metrics file");
911                    }
912                    Err(_) => {
913                        display::warn(format!("Metrics file not found for client {i}"));
914                    }
915                }
916                Ok(())
917            }
918            .await;
919        }
920        display::done();
921
922        Ok(())
923    }
924
    /// Take a prometheus snapshot on the metrics instance and download it
    /// into `<benchmark_dir>/snapshot` via `rsync`.
    ///
    /// `timestamp` selects the prometheus data directory on the remote host
    /// (`/var/lib/prometheus/<timestamp>/...`). If no metrics instance is
    /// configured, this is a no-op returning `Ok`.
    ///
    /// # Errors
    /// Returns `TestbedError::SshCommandFailed` if the snapshot command
    /// yields no output, its JSON response cannot be parsed, or the rsync
    /// transfer fails.
    pub async fn download_prometheus_snapshot(
        &self,
        benchmark_dir: &Path,
        timestamp: &str,
    ) -> TestbedResult<()> {
        if let Some(instance) = &self.metrics_instance {
            display::action("Taking prometheus snapshot");
            let command = Prometheus::take_snapshot_command();

            // prometheus snapshot response structure
            #[derive(serde::Deserialize)]
            struct ResponseData {
                // snapshot directory name
                name: String,
            }
            #[derive(serde::Deserialize)]
            struct Response {
                #[allow(dead_code)]
                status: String,
                data: ResponseData,
            }

            // Run the snapshot command on the metrics instance and take the
            // stdout of the single expected result.
            let response = self
                .ssh_manager
                .execute(
                    std::iter::once(instance.clone()),
                    command.clone(),
                    CommandContext::default(),
                )
                .await?
                .into_iter()
                .next()
                .ok_or_else(|| {
                    TestbedError::SshCommandFailed(
                        instance.clone(),
                        command.clone(),
                        "No response from command".into(),
                    )
                })?
                .0;
            // Parse the JSON response to learn the snapshot directory name.
            let response: Response = serde_json::from_str(&response).map_err(|e| {
                TestbedError::SshCommandFailed(
                    instance.clone(),
                    command.clone(),
                    format!("Failed to parse response: {e}"),
                )
            })?;
            display::done();

            let snapshot_name = response.data.name;
            display::config("Created prometheus snapshot", &snapshot_name);
            display::newline();

            display::action("Downloading prometheus snapshot");
            let snapshot_dir = benchmark_dir.join("snapshot").display().to_string();
            let rsync_args = vec![
                // options: recursive, verbose, compress, override ssh to use key file and disable
                // host key checking
                "-rvze".to_string(),
                // let rsync use ssh with the specified private key file
                format!(
                    "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i {}",
                    self.settings.ssh_private_key_file.display()
                ),
                // remote snapshot path: /var/lib/prometheus/<timestamp>/snapshots/<snapshot_name>
                format!(
                    "ubuntu@{}:/var/lib/prometheus/{}/snapshots/{}/",
                    instance.main_ip, timestamp, snapshot_name
                ),
                // local snapshot path: <benchmark_dir>/snapshot
                snapshot_dir,
            ];

            // rsync is a blocking child process; run it on the blocking
            // thread pool so the async runtime is not stalled.
            let instance = instance.clone();
            tokio::task::spawn_blocking(move || -> TestbedResult<()> {
                match std::process::Command::new("rsync")
                    .args(&rsync_args)
                    .status()
                {
                    Ok(status) if status.success() => Ok(()),
                    Ok(status) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with status: {}", status),
                    )),
                    Err(e) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with error: {}", e),
                    )),
                }
            })
            .await
            // The unwrap propagates a panic from the blocking task; the
            // inner `?` propagates the rsync error itself.
            .unwrap()?;

            display::done();
            display::status("Downloaded prometheus snapshot");
            display::newline();
        }

        Ok(())
    }
1027
1028    /// Download the log files from the nodes and clients.
1029    pub async fn download_logs(&self, benchmark_dir: &Path) -> TestbedResult<LogsAnalyzer> {
1030        // Create a logs sub-directory for this run.
1031        let path = benchmark_dir.join("logs");
1032        fs::create_dir_all(&path).expect("Failed to create logs directory");
1033
1034        // NOTE: Our ssh library does not seem to be able to transfers files in parallel
1035        // reliably.
1036        let mut log_parsers = Vec::new();
1037
1038        // Download the clients log files.
1039        display::action("Downloading clients logs");
1040        for (i, instance) in self.client_instances.iter().enumerate() {
1041            display::status(format!("{}/{}", i + 1, self.client_instances.len()));
1042
1043            let _: TestbedResult<()> = async {
1044                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
1045
1046                if self.settings.use_fullnode_for_execution {
1047                    let fullnode_log_content = connection.download("fullnode.log").await?;
1048                    let fullnode_log_file = path.join(format!("fullnode-{i}.log"));
1049                    fs::write(fullnode_log_file, fullnode_log_content.as_bytes())
1050                        .expect("Cannot write log file");
1051                }
1052
1053                let client_log_content = connection.download("client.log").await?;
1054
1055                let client_log_file = path.join(format!("client-{i}.log"));
1056                fs::write(client_log_file, client_log_content.as_bytes())
1057                    .expect("Cannot write log file");
1058
1059                let mut log_parser = LogsAnalyzer::default();
1060                log_parser.set_client_errors(&client_log_content);
1061                log_parsers.push(log_parser);
1062                Ok(())
1063            }
1064            .await;
1065        }
1066
1067        display::done();
1068
1069        display::action("Downloading nodes logs");
1070        let download_tasks: Vec<_> = self
1071            .node_instances
1072            .iter()
1073            .enumerate()
1074            .map(|(i, instance)| {
1075                let ssh_manager = self.ssh_manager.clone();
1076                let path = path.clone();
1077                let ssh_address = instance.ssh_address();
1078
1079                async move {
1080                    let connection = ssh_manager.connect(ssh_address).await?;
1081                    let node_log_content = connection.download("node.log").await?;
1082
1083                    let node_log_file = path.join(format!("node-{i}.log"));
1084                    fs::write(node_log_file, node_log_content.as_bytes())
1085                        .expect("Cannot write log file");
1086
1087                    let mut log_parser = LogsAnalyzer::default();
1088                    log_parser.set_node_errors(&node_log_content);
1089                    Ok::<LogsAnalyzer, TestbedError>(log_parser)
1090                }
1091            })
1092            .collect();
1093
1094        let results = futures::future::join_all(download_tasks).await;
1095        for (idx, result) in results.into_iter().enumerate() {
1096            display::status(format!("{}/{}", idx + 1, self.node_instances.len()));
1097            match result {
1098                Ok(log_parser) => log_parsers.push(log_parser),
1099                Err(e) => display::warn(format!("Failed to download node log: {e}")),
1100            }
1101        }
1102        display::done();
1103
1104        Ok(LogsAnalyzer::aggregate(log_parsers))
1105    }
1106
    /// Run all the benchmarks specified by the benchmark generator.
    ///
    /// Prepares the testbed (cleanup, optional install/update, monitoring),
    /// then runs each parameter set produced by `generator` in sequence:
    /// configure, boot nodes, boot clients, wait for completion, collect and
    /// save metrics, download stats and logs. After the loop, optionally
    /// downloads a prometheus snapshot.
    ///
    /// # Errors
    /// Propagates the first `TestbedError` raised by any phase; snapshot
    /// download failures are only reported, not propagated.
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Cleanup the testbed (in case the previous run was not completed).
        self.cleanup(true).await?;
        // Timestamp identifying this run; passed to monitoring startup and
        // to the prometheus snapshot download below.
        let timestamp = chrono::Local::now().format("%y%m%d_%H%M%S").to_string();

        display::config("dedicated_clients", self.dedicated_clients);
        display::config("nodes", self.node_instances.len());
        display::config("clients", self.client_instances.len());
        display::config("metrics", self.metrics_instance.is_some());

        // Print the ssh addresses of all instances for reference.
        display::config(
            "nodes",
            self.node_instances
                .iter()
                .map(|i| i.ssh_address().to_string())
                .collect::<Vec<_>>()
                .join(", "),
        );
        display::config(
            "clients",
            self.client_instances
                .iter()
                .map(|i| i.ssh_address().to_string())
                .collect::<Vec<_>>()
                .join(", "),
        );
        display::config(
            "metrics",
            self.metrics_instance
                .as_ref()
                .map(|i| i.ssh_address().to_string())
                .unwrap_or("<none>".to_string()),
        );

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Start the instance monitoring tools.
        self.start_monitoring(generator.use_internal_ip_address, &timestamp)
            .await?;

        display::action("Testbed ready!");

        // Run all benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        // The whole loop runs inside the IS_LOOP task-local scope so the
        // logger knows it is inside a benchmark loop.
        let result: TestbedResult<()> = IS_LOOP
            .scope(true, async {
                while let Some(mut parameters) = generator.next() {
                    display::header(format!("Starting benchmark {i}"));
                    display::config("Benchmark type", &parameters.benchmark_type);
                    display::config("Parameters", &parameters);
                    display::newline();

                    // Each benchmark gets its own sub-directory named after
                    // its parameters.
                    parameters.benchmark_dir = self.benchmark_dir.join(format!("{parameters:?}"));

                    if !self.skip_monitoring {
                        if let Some(metrics) = &self.metrics_instance {
                            let host_ip = if generator.use_internal_ip_address {
                                metrics.private_ip
                            } else {
                                metrics.main_ip
                            };

                            // Default the OTEL config to point at the metrics
                            // instance, unless the parameters already set one.
                            parameters.otel.get_or_insert(crate::benchmark::OtelConfig {
                                otlp_endpoint: format!(
                                    "http://{}:{}",
                                    host_ip,
                                    crate::monitor::Tempo::OTLP_GRPC_PORT
                                ),
                                protocol: "grpc".to_string(),
                                sampler: "parentbased_traceidratio".to_string(),
                                sampler_arg: "0.1".to_string(),
                            });
                        }
                    }

                    // Cleanup the testbed (in case the previous run was not completed).
                    self.cleanup(true).await?;
                    // Create benchmark directory.
                    fs::create_dir_all(&parameters.benchmark_dir)
                        .expect("Failed to create benchmark directory");

                    // Swap the loop logger to write to this benchmark's log file
                    let log_file_path = parameters.benchmark_dir.join("logs.log");
                    let new_file = OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(log_file_path)
                        .expect("Failed to open log file");
                    self.benchmark_writer
                        .swap(Box::new(new_file))
                        .expect("Failed to swap log writer");

                    crate::logger::log(
                        &chrono::Local::now()
                            .format("Started %y-%m-%d:%H-%M-%S\n")
                            .to_string(),
                    );

                    // The benchmark phases run in an async block so a failure
                    // is captured here and only propagated after the
                    // post-benchmark cleanup/download steps below.
                    let benchmark_result = async {
                        // Configure all instances (if not skipped).
                        if !self.skip_testbed_configuration {
                            self.configure(&parameters).await?;
                            latest_committee_size = parameters.nodes;
                        }

                        // Deploy the validators.
                        self.run_nodes(&parameters).await?;

                        // Deploy the load generators.
                        self.run_clients(&parameters).await?;

                        // Wait for the benchmark to terminate. Then save the results and
                        // print a summary.
                        self.run(&parameters).await?;

                        // Collect and aggregate metrics
                        let mut aggregator =
                            MeasurementsCollection::new(&self.settings, parameters.clone());
                        self.download_metrics_logs(&parameters.benchmark_dir)
                            .await?;

                        // Parse and aggregate metrics from downloaded files
                        aggregator.aggregates_metrics_from_files::<P>(
                            self.client_instances.len(),
                            &parameters.benchmark_dir.join("logs"),
                        );

                        aggregator.display_summary();
                        aggregator.save(&parameters.benchmark_dir);
                        generator.register_result(aggregator);

                        // Flush any remaining logs to the benchmark log file
                        let _ = self.benchmark_writer.flush();

                        TestbedResult::Ok(())
                    }
                    .await;

                    // Download benchmark stats if metadata path is provided
                    if let Some(benchmark_stats_path) = &parameters.benchmark_stats_path {
                        self.download_benchmark_stats(benchmark_stats_path, &parameters)
                            .await?;
                    }

                    // Kill the nodes and clients (without deleting the log files).
                    self.cleanup(false).await?;

                    // Download the log files.
                    if self.log_processing {
                        let error_counter = self.download_logs(&parameters.benchmark_dir).await?;
                        error_counter.print_summary();
                    }

                    // Close the per-benchmark log file
                    crate::logger::log(
                        &chrono::Local::now()
                            .format("Finished %y-%m-%d:%H-%M-%S\n")
                            .to_string(),
                    );

                    // Propagate any error that occurred
                    benchmark_result?;

                    i += 1;
                }

                Ok(())
            })
            .await;

        result?;

        if self.settings.enable_prometheus_snapshots {
            info!("Downloading prometheus snapshot");
            // Snapshot failures are reported but do not fail the whole run.
            if let Err(e) = self
                .download_prometheus_snapshot(&self.benchmark_dir, &timestamp)
                .await
            {
                display::error(format!("Failed to download prometheus snapshot: {}", e));
            }
            info!("Prometheus snapshot download completed");
        }

        display::header("Benchmark completed");
        Ok(())
    }
1306}