iota_aws_orchestrator/
orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, VecDeque},
7    fs::{self},
8    marker::PhantomData,
9    path::PathBuf,
10    time::Duration,
11};
12
13use tokio::time::{self, Instant};
14
15use crate::{
16    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
17    client::{Instance, InstanceRole},
18    display, ensure,
19    error::{TestbedError, TestbedResult},
20    faults::CrashRecoverySchedule,
21    logs::LogsAnalyzer,
22    measurement::{Measurement, MeasurementsCollection},
23    monitor::{Monitor, Prometheus},
24    protocol::{ProtocolCommands, ProtocolMetrics},
25    settings::Settings,
26    ssh::{CommandContext, CommandStatus, SshConnectionManager},
27};
28
/// An orchestrator to run benchmarks on a testbed.
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// Instances hosting the nodes (validators).
    node_instances: Vec<Instance>,
    /// Instances dedicated to running only load generators, if any.
    client_instances: Option<Vec<Instance>>,
    /// Instance dedicated to hosting the metrics stack, if any.
    metrics_instance: Option<Instance>,
    /// The type of the benchmark parameters.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to install on the instance.
    instance_setup_commands: Vec<String>,
    /// Protocol-specific commands generator to generate the protocol
    /// configuration files, boot clients and nodes, etc.
    protocol_commands: P,
    /// The interval between measurements collection.
    scrape_interval: Duration,
    /// The interval to crash nodes.
    crash_interval: Duration,
    /// Handle ssh connections to instances.
    ssh_manager: SshConnectionManager,
    /// Whether to skip testbed updates before running benchmarks.
    skip_testbed_update: bool,
    /// Whether to skip testbed configuration before running benchmarks.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the client and node log files.
    log_processing: bool,
    /// Number of instances running only load generators (not nodes). If this
    /// value is set to zero, the orchestrator runs a load generator
    /// collocated with each node.
    dedicated_clients: usize,
    /// Whether to forgo a grafana and prometheus instance and leave the testbed
    /// unmonitored.
    skip_monitoring: bool,
}
66
67impl<P, T> Orchestrator<P, T> {
68    /// The default interval between measurements collection.
69    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
70    /// The default interval to crash nodes.
71    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);
72
73    /// Make a new orchestrator.
74    pub fn new(
75        settings: Settings,
76        node_instances: Vec<Instance>,
77        client_instances: Option<Vec<Instance>>,
78        metrics_instance: Option<Instance>,
79        instance_setup_commands: Vec<String>,
80        protocol_commands: P,
81        ssh_manager: SshConnectionManager,
82    ) -> Self {
83        Self {
84            settings,
85            node_instances,
86            client_instances,
87            metrics_instance,
88            benchmark_type: PhantomData,
89            instance_setup_commands,
90            protocol_commands,
91            ssh_manager,
92            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
93            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
94            skip_testbed_update: false,
95            skip_testbed_configuration: false,
96            log_processing: false,
97            dedicated_clients: 0,
98            skip_monitoring: false,
99        }
100    }
101
102    /// Set interval between measurements collection.
103    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
104        self.scrape_interval = scrape_interval;
105        self
106    }
107
108    /// Set interval with which to crash nodes.
109    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
110        self.crash_interval = crash_interval;
111        self
112    }
113
114    /// Set whether to skip testbed updates before running benchmarks.
115    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
116        self.skip_testbed_update = skip_testbed_update;
117        self
118    }
119
120    /// Whether to skip testbed configuration before running benchmarks.
121    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
122        self.skip_testbed_configuration = skip_testbed_configuration;
123        self
124    }
125
126    /// Set whether to download and analyze the client and node log files.
127    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
128        self.log_processing = log_processing;
129        self
130    }
131
132    /// Set the number of instances running exclusively load generators.
133    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
134        self.dedicated_clients = dedicated_clients;
135        self
136    }
137
138    /// Set whether to boot grafana on the local machine to monitor the nodes.
139    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
140        self.skip_monitoring = skip_monitoring;
141        self
142    }
143
144    /// Returns all the instances combined
145    pub fn instances(&self) -> Vec<Instance> {
146        let mut instances = self.node_instances.clone();
147        if let Some(client_instances) = &self.client_instances {
148            instances.extend(client_instances.clone());
149        }
150        if let Some(metrics_instance) = &self.metrics_instance {
151            instances.push(metrics_instance.clone());
152        }
153        instances
154    }
155
156    /// Select on which instances of the testbed to run the benchmarks. This
157    /// function returns two vector of instances; the first contains the
158    /// instances on which to run the load generators and the second
159    /// contains the instances on which to run the nodes.
160    pub fn select_instances(
161        &self,
162        parameters: &BenchmarkParameters<T>,
163    ) -> TestbedResult<(Vec<Instance>, Vec<Instance>, Option<Instance>)> {
164        // Ensure there are enough active instances.
165        let available_nodes: Vec<_> = self
166            .node_instances
167            .iter()
168            .filter(|x| x.is_active())
169            .collect();
170        ensure!(
171            available_nodes.len() >= parameters.nodes,
172            TestbedError::InsufficientCapacity(parameters.nodes - available_nodes.len())
173        );
174        let mut available_metrics_node = None;
175        if !self.skip_monitoring {
176            ensure!(
177                self.metrics_instance
178                    .as_ref()
179                    .is_some_and(|m| m.is_active()),
180                TestbedError::MetricsServerMissing()
181            );
182            available_metrics_node = self.metrics_instance.clone();
183        }
184        let mut available_dedicated_client_nodes = vec![];
185        if self.dedicated_clients > 0 {
186            ensure!(
187                self.client_instances.as_ref().is_some_and(|m| m
188                    .iter()
189                    .filter(|x| {
190                        if x.is_active() {
191                            available_dedicated_client_nodes.push(*x);
192                            true
193                        } else {
194                            false
195                        }
196                    })
197                    .count()
198                    >= self.dedicated_clients),
199                TestbedError::InsufficientDedicatedClientCapacity(
200                    self.dedicated_clients - available_dedicated_client_nodes.len()
201                )
202            );
203        }
204
205        // Sort the node instances by region.
206        let mut node_instances_by_regions = HashMap::new();
207        for instance in available_nodes {
208            node_instances_by_regions
209                .entry(&instance.region)
210                .or_insert_with(VecDeque::new)
211                .push_back(instance);
212        }
213
214        // Sort dedicated client instances by region.
215        let mut client_instances_by_regions = HashMap::new();
216        if self.dedicated_clients > 0 {
217            for instance in available_dedicated_client_nodes {
218                client_instances_by_regions
219                    .entry(&instance.region)
220                    .or_insert_with(VecDeque::new)
221                    .push_back(instance);
222            }
223        }
224
225        // Select the instances to host exclusively load generators.
226        let mut client_instances = Vec::new();
227        for region in self.settings.regions.iter().cycle() {
228            if client_instances.len() == self.dedicated_clients {
229                break;
230            }
231            if let Some(regional_instances) = client_instances_by_regions.get_mut(region) {
232                if let Some(instance) = regional_instances.pop_front() {
233                    client_instances.push(instance.clone());
234                }
235            }
236        }
237
238        // Select the instances to host the nodes.
239        let mut nodes_instances = Vec::new();
240        for region in self.settings.regions.iter().cycle() {
241            if nodes_instances.len() == parameters.nodes {
242                break;
243            }
244            if let Some(regional_instances) = node_instances_by_regions.get_mut(region) {
245                if let Some(instance) = regional_instances.pop_front() {
246                    nodes_instances.push(instance.clone());
247                }
248            }
249        }
250
251        // Spawn a load generate collocated with each node if there are no instances
252        // dedicated to excursively run load generators.
253        if client_instances.is_empty() {
254            client_instances.clone_from(&nodes_instances);
255        }
256
257        Ok((client_instances, nodes_instances, available_metrics_node))
258    }
259}
260
261impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
262    /// Boot one node per instance.
263    async fn boot_nodes(
264        &self,
265        instances: Vec<Instance>,
266        parameters: &BenchmarkParameters<T>,
267    ) -> TestbedResult<()> {
268        // Run one node per instance.
269        let targets = self
270            .protocol_commands
271            .node_command(instances.clone(), parameters);
272
273        let repo = self.settings.repository_name();
274        let context = CommandContext::new()
275            .run_background("node".into())
276            .with_log_file("~/node.log".into())
277            .with_execute_from_path(repo.into());
278
279        self.ssh_manager
280            .execute_per_instance(targets, context)
281            .await?;
282
283        // Wait until all nodes are reachable.
284        let commands = self
285            .protocol_commands
286            .nodes_metrics_command(instances.clone(), parameters);
287        self.ssh_manager.wait_for_success(commands).await;
288
289        Ok(())
290    }
291
    /// Install the codebase and its dependencies on the testbed.
    ///
    /// Non-metrics instances get the system packages, the rust toolchain, a
    /// clone of the repository, plus provider- and protocol-specific
    /// dependencies; the metrics instance (unless monitoring is skipped) only
    /// gets the monitoring stack's dependencies.
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;
        let basic_commands = [
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Disable "pending kernel upgrade" message.
            "sudo apt-get -y remove needrestart",
            // The following dependencies:
            // * build-essential: prevent the error: [error: linker `cc` not found].
            // * libssl-dev - Required to compile the orchestrator, todo remove this dependency
            "sudo apt-get -y install build-essential libssl-dev cmake clang lld protobuf-compiler libudev-dev libpq5 libpq-dev ca-certificates",
            // Install rust (non-interactive).
            "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
            "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
            "source $HOME/.cargo/env",
            "rustup default stable",
            // Create the working directory.
            &format!("mkdir -p {working_dir}"),
            // Clone the repo (tolerate failure if it is already cloned).
            &format!("(git clone {url} || true)"),
        ];

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        // Chain everything into one `&&`-joined shell command so installation
        // stops at the first failing step.
        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        let context = CommandContext::default();
        // Install on every instance except the dedicated metrics one.
        let without_metrics = self
            .instances()
            .iter()
            .filter(|i| i.role != InstanceRole::Metrics)
            .cloned()
            .collect::<Vec<_>>();
        self.ssh_manager
            .execute(without_metrics, command, context.clone())
            .await?;
        if !self.skip_monitoring {
            // The metrics instance only needs the monitoring dependencies.
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(vec![metrics_instance], monitor_command, context)
                .await?;
        }

        display::done();
        Ok(())
    }
360
361    /// Reload prometheus on all instances.
362    pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
363        let (clients, nodes, instance) = self.select_instances(parameters)?;
364        if let Some(instance) = instance {
365            display::action("Configuring monitoring instance");
366
367            let monitor = Monitor::new(instance, clients, nodes, self.ssh_manager.clone());
368            monitor
369                .start_prometheus(&self.protocol_commands, parameters)
370                .await?;
371            monitor.start_grafana().await?;
372
373            display::done();
374            display::config("Grafana address", monitor.grafana_address());
375            display::newline();
376        }
377
378        Ok(())
379    }
380
381    /// Update all instances to use the version of the codebase specified in the
382    /// setting file.
383    pub async fn update(&self) -> TestbedResult<()> {
384        display::action("Updating all instances");
385
386        // Update all active instances. This requires compiling the codebase in release
387        // (which may take a long time) so we run the command in the background
388        // to avoid keeping alive many ssh connections for too long.
389        let commit = &self.settings.repository.commit;
390        let command = [
391            &format!("git fetch origin {commit} --force"),
392            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
393            "git clean -fd -e target",
394            "source \"$HOME/.cargo/env\"",
395            "cargo build --release --bin iota --bin iota-node --bin stress",
396        ]
397        .join(" && ");
398
399        display::action(format!("update command: {command}"));
400
401        let id = "update";
402        let repo_name = self.settings.repository_name();
403        let context = CommandContext::new()
404            .run_background(id.into())
405            .with_execute_from_path(repo_name.into());
406        let without_metrics = self
407            .instances()
408            .iter()
409            .filter(|i| i.role != InstanceRole::Metrics)
410            .cloned()
411            .collect::<Vec<_>>();
412
413        self.ssh_manager
414            .execute(without_metrics.clone(), command, context)
415            .await?;
416
417        // Wait until the command finished running.
418        self.ssh_manager
419            .wait_for_command(without_metrics, id, CommandStatus::Terminated)
420            .await?;
421
422        display::done();
423        Ok(())
424    }
425
426    /// Configure the instances with the appropriate configuration files.
427    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
428        display::action("Configuring instances");
429
430        // Select instances to configure.
431        let (clients, nodes, _) = self.select_instances(parameters)?;
432
433        // Generate the genesis configuration file and the keystore allowing access to
434        // gas objects.
435        let command = self
436            .protocol_commands
437            .genesis_command(nodes.iter(), parameters);
438        display::action(format!("Genesis command: {command}"));
439        let repo_name = self.settings.repository_name();
440        let context = CommandContext::new().with_execute_from_path(repo_name.into());
441        let all = clients.into_iter().chain(nodes);
442        self.ssh_manager.execute(all, command, context).await?;
443
444        display::done();
445        Ok(())
446    }
447
448    /// Cleanup all instances and optionally delete their log files.
449    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
450        display::action("Cleaning up testbed");
451
452        // Kill all tmux servers and delete the nodes dbs. Optionally clear logs.
453        let mut command = vec!["(tmux kill-server || true)".into()];
454        for path in self.protocol_commands.db_directories() {
455            command.push(format!("(rm -rf {} || true)", path.display()));
456        }
457        if cleanup {
458            command.push("(rm -rf ~/*log* || true)".into());
459        }
460        let command = command.join(" ; ");
461
462        // Execute the deletion on all machines.
463        let active = self.instances().into_iter().filter(|x| x.is_active());
464        let context = CommandContext::default();
465        self.ssh_manager.execute(active, command, context).await?;
466
467        display::done();
468        Ok(())
469    }
470
471    /// Deploy the nodes.
472    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
473        display::action("Deploying validators");
474
475        // Select the instances to run.
476        let (_, nodes, _) = self.select_instances(parameters)?;
477
478        // Boot one node per instance.
479        self.boot_nodes(nodes, parameters).await?;
480
481        display::done();
482        Ok(())
483    }
484
485    /// Deploy the load generators.
486    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
487        display::action("Setting up load generators");
488
489        // Select the instances to run.
490        let (clients, _, _) = self.select_instances(parameters)?;
491
492        // Deploy the load generators.
493        let targets = self
494            .protocol_commands
495            .client_command(clients.clone(), parameters);
496
497        let repo = self.settings.repository_name();
498        let context = CommandContext::new()
499            .run_background("client".into())
500            .with_log_file("~/client.log".into())
501            .with_execute_from_path(repo.into());
502        self.ssh_manager
503            .execute_per_instance(targets, context)
504            .await?;
505
506        // Wait until all load generators are reachable.
507        let commands = self
508            .protocol_commands
509            .clients_metrics_command(clients, parameters);
510        self.ssh_manager.wait_for_success(commands).await;
511
512        display::done();
513        Ok(())
514    }
515
    /// Run the benchmark: periodically scrape the client and node metrics
    /// while killing and recovering nodes according to the faults schedule,
    /// until the configured benchmark duration has elapsed. Returns the
    /// collected measurements, which are also saved to the results directory.
    pub async fn run(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<MeasurementsCollection<T>> {
        display::action(format!(
            "Scraping metrics (at least {}s)",
            parameters.duration.as_secs()
        ));

        // Select the instances to run.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Commands to regularly scrape the clients' metrics endpoints.
        let mut metrics_commands = self
            .protocol_commands
            .clients_metrics_command(clients, parameters);

        // TODO: Remove this when consensus client latency metrics are available.
        // We will be getting latency metrics directly from consensus nodes instead from
        // the nw client
        metrics_commands.append(
            &mut self
                .protocol_commands
                .nodes_metrics_command(nodes.clone(), parameters),
        );

        // Measurements are keyed by the scrape target's index in
        // `metrics_commands`.
        let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
        let mut metrics_interval = time::interval(self.scrape_interval);
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule = CrashRecoverySchedule::new(faults_type, nodes.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

        let start = Instant::now();
        loop {
            tokio::select! {
                // Scrape metrics.
                now = metrics_interval.tick() => {
                    let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    let stdio = self
                        .ssh_manager
                        .execute_per_instance(metrics_commands.clone(), CommandContext::default())
                        .await?;
                    for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
                        let measurement = Measurement::from_prometheus::<P>(stdout);
                        aggregator.add(i, measurement);
                    }

                    // Exit only after the benchmark duration elapsed; the
                    // scrape above is still recorded for the final interval.
                    if elapsed > parameters.duration .as_secs() {
                        break;
                    }
                },

                // Kill and recover nodes according to the input schedule.
                _ = faults_interval.tick() => {
                    let  action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        // Persist the measurements under `results-<commit>/` inside the
        // results directory.
        let results_directory = &self.settings.results_dir;
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [results_directory, &format!("results-{commit}").into()]
            .iter()
            .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");
        aggregator.save(path);

        display::done();
        Ok(aggregator)
    }
602
    /// Download the log files from the nodes and clients, save local copies,
    /// and return an aggregated analysis of the errors they contain.
    pub async fn download_logs(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<LogsAnalyzer> {
        // Select the instances the benchmark ran on.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Create a log sub-directory for this run.
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [
            &self.settings.logs_dir,
            &format!("logs-{commit}").into(),
            &format!("logs-{parameters:?}").into(),
        ]
        .iter()
        .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");

        // NOTE: Our ssh library does not seem to be able to transfers files in parallel
        // reliably.
        let mut log_parsers = Vec::new();

        // Download the clients log files (sequentially, see the NOTE above).
        display::action("Downloading clients logs");
        for (i, instance) in clients.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, clients.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            // Save a local copy as `client-<i>.log` before scanning it for
            // errors.
            let client_log_file = [path.clone(), format!("client-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        // Download the nodes log files, same procedure as for the clients.
        display::action("Downloading nodes logs");
        for (i, instance) in nodes.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, nodes.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = [path.clone(), format!("node-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }
666
    /// Run all the benchmarks specified by the benchmark generator: prepare
    /// the testbed once (cleanup, install, update), then for each parameter
    /// set deploy monitoring, nodes and clients, collect measurements, and
    /// optionally download and analyze the logs.
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Cleanup the testbed (in case the previous run was not completed).
        self.cleanup(true).await?;

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Run all benchmarks.
        let mut i = 1;
        // Track the committee size of the previous run so instances are only
        // reconfigured when the number of nodes changes.
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // Cleanup the testbed (in case the previous run was not completed).
            self.cleanup(true).await?;
            // Start the instance monitoring tools.
            self.start_monitoring(&parameters).await?;

            // Configure all instances (if needed).
            if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                self.configure(&parameters).await?;
                latest_committee_size = parameters.nodes;
            }

            // Deploy the validators.
            self.run_nodes(&parameters).await?;

            // Deploy the load generators.
            self.run_clients(&parameters).await?;

            // Wait for the benchmark to terminate. Then save the results and print a
            // summary.
            let aggregator = self.run(&parameters).await?;
            aggregator.display_summary();
            generator.register_result(aggregator);

            // Kill the nodes and clients (without deleting the log files).
            self.cleanup(false).await?;

            // Download the log files.
            if self.log_processing {
                let error_counter = self.download_logs(&parameters).await?;
                error_counter.print_summary();
            }

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
733}