1use std::{
6 collections::HashSet,
7 fs::{self, OpenOptions},
8 future::Future,
9 io::Write,
10 marker::PhantomData,
11 path::Path,
12 time::Duration,
13};
14
15use chrono;
16use futures;
17use tokio::time::{self, Instant};
18use tracing::info;
19
20use crate::{
21 benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType, RunInterval},
22 build_cache::BuildCacheService,
23 client::Instance,
24 display,
25 error::{TestbedError, TestbedResult},
26 faults::CrashRecoverySchedule,
27 logger::{IS_LOOP, SwappableWriter},
28 logs::LogsAnalyzer,
29 measurement::MeasurementsCollection,
30 monitor::{Monitor, Prometheus},
31 net_latency::NetworkLatencyCommandBuilder,
32 protocol::{ProtocolCommands, ProtocolMetrics},
33 settings::{BuildGroups, Settings, build_cargo_command},
34 ssh::{CommandContext, CommandStatus, SshConnectionManager},
35};
36
/// Orchestrates the deployment and execution of benchmarks across a testbed
/// of remote instances: protocol nodes, load-generating clients, and an
/// optional metrics machine hosting the monitoring stack.
pub struct Orchestrator<P, T> {
    // The testbed settings (repository, working dir, build configs, ...).
    settings: Settings,
    // Instances running the protocol nodes (validators).
    node_instances: Vec<Instance>,
    // Instances running the load generators (clients).
    client_instances: Vec<Instance>,
    // Optional instance hosting Prometheus/Tempo/Grafana.
    metrics_instance: Option<Instance>,
    // Marker tying this orchestrator to a concrete benchmark type `T`.
    benchmark_type: PhantomData<T>,
    // Cloud-provider specific setup commands run on every instance.
    instance_setup_commands: Vec<String>,
    // Protocol-specific command generator (genesis, node, client, metrics).
    protocol_commands: P,
    // Interval between two metrics scrapes during a run.
    scrape_interval: Duration,
    // Interval between two crash/recovery fault-schedule updates.
    crash_interval: Duration,
    // Manager for SSH connections to all instances.
    ssh_manager: SshConnectionManager,
    // Whether to skip dependency installation and repository updates.
    skip_testbed_update: bool,
    // Whether to skip (re-)generating the genesis configuration.
    skip_testbed_configuration: bool,
    // Whether to download and analyze node/client logs after each run.
    log_processing: bool,
    // Number of instances dedicated exclusively to clients; 0 presumably
    // means clients share the node machines (see `instances_without_metrics`).
    dedicated_clients: usize,
    // Whether to skip starting the monitoring stack.
    skip_monitoring: bool,
    // Local directory where benchmark results are stored.
    benchmark_dir: std::path::PathBuf,
    // Writer swapped per-benchmark to redirect log output into the run's dir.
    benchmark_writer: crate::logger::SwappableWriter,
}
78
79impl<P, T> Orchestrator<P, T> {
80 const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
82 const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);
84
85 pub fn new(
87 settings: Settings,
88 node_instances: Vec<Instance>,
89 client_instances: Vec<Instance>,
90 metrics_instance: Option<Instance>,
91 instance_setup_commands: Vec<String>,
92 protocol_commands: P,
93 ssh_manager: SshConnectionManager,
94 ) -> Self {
95 Self {
96 settings,
97 node_instances,
98 client_instances,
99 metrics_instance,
100 benchmark_type: PhantomData,
101 instance_setup_commands,
102 protocol_commands,
103 ssh_manager,
104 scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
105 crash_interval: Self::DEFAULT_CRASH_INTERVAL,
106 skip_testbed_update: false,
107 skip_testbed_configuration: false,
108 log_processing: false,
109 dedicated_clients: 0,
110 skip_monitoring: false,
111 benchmark_dir: Path::new("benchmark_results").to_path_buf(),
112 benchmark_writer: SwappableWriter::new(),
113 }
114 }
115
116 pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
118 self.scrape_interval = scrape_interval;
119 self
120 }
121
122 pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
124 self.crash_interval = crash_interval;
125 self
126 }
127
128 pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
130 self.skip_testbed_update = skip_testbed_update;
131 self
132 }
133
134 pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
136 self.skip_testbed_configuration = skip_testbed_configuration;
137 self
138 }
139
140 pub fn with_log_processing(mut self, log_processing: bool) -> Self {
142 self.log_processing = log_processing;
143 self
144 }
145
146 pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
148 self.dedicated_clients = dedicated_clients;
149 self
150 }
151
152 pub fn with_benchmark_dir_and_writer<F: AsRef<Path>>(
153 mut self,
154 benchmark_dir: F,
155 writer: crate::logger::SwappableWriter,
156 ) -> Self {
157 self.benchmark_dir = benchmark_dir.as_ref().to_path_buf();
158 self.benchmark_writer = writer;
159 self
160 }
161
162 pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
164 self.skip_monitoring = skip_monitoring;
165 self
166 }
167
168 pub fn instances_without_metrics(&self) -> Vec<Instance> {
169 let mut instances = self.node_instances.clone();
170
171 if self.dedicated_clients > 0 {
172 instances.extend(self.client_instances.clone());
173 }
174 instances
175 }
176
177 pub fn instances(&self) -> Vec<Instance> {
179 let mut instances = self.instances_without_metrics();
180 if let Some(metrics_instance) = &self.metrics_instance {
181 instances.push(metrics_instance.clone());
182 }
183 instances
184 }
185
186 fn effective_scrape_interval(&self, parameters: &BenchmarkParameters<T>) -> Duration {
187 const MIN: Duration = Duration::from_secs(1);
188 let max = self.scrape_interval;
190
191 match parameters.run_interval {
192 RunInterval::Time(_) => self.scrape_interval,
193
194 RunInterval::Count(tx_count) => {
195 let qps = parameters.load.max(1) as u64; let est_secs = tx_count.div_ceil(qps);
200
201 let target_samples = 30u64;
202 let raw = (est_secs / target_samples).max(1);
203 let candidate = Duration::from_secs(raw);
204
205 candidate.clamp(MIN, max)
206 }
207 }
208 }
209}
210
211impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
212 async fn boot_nodes(
214 &self,
215 instances: Vec<Instance>,
216 parameters: &BenchmarkParameters<T>,
217 ) -> TestbedResult<()> {
218 if parameters.use_internal_ip_address {
219 if let Some(latency_topology) = parameters.latency_topology.clone() {
220 let latency_commands = NetworkLatencyCommandBuilder::new(&instances)
221 .with_perturbation_spec(parameters.perturbation_spec.clone())
222 .with_topology_layout(latency_topology)
223 .with_max_latency(parameters.maximum_latency)
224 .build_network_latency_matrix();
225 self.ssh_manager
226 .execute_per_instance(latency_commands, CommandContext::default())
227 .await?;
228 }
229 }
230
231 let targets = self
233 .protocol_commands
234 .node_command(instances.clone(), parameters);
235
236 let repo = self.settings.repository_name();
237 let node_context = CommandContext::new()
238 .run_background("node".into())
239 .with_log_file("~/node.log".into())
240 .with_execute_from_path(repo.into());
241 self.ssh_manager
242 .execute_per_instance(targets, node_context)
243 .await?;
244
245 let commands = self
247 .protocol_commands
248 .nodes_metrics_command(instances.clone(), parameters.use_internal_ip_address);
249 self.wait_for_success(commands, ¶meters.benchmark_dir)
250 .await;
251
252 Ok(())
253 }
254
    /// Installs all software dependencies on every machine of the testbed
    /// (and the monitoring stack on the metrics instance, unless skipped).
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;

        // When precompiled binaries are distributed via the build cache, no
        // Rust toolchain or build tooling is installed on the instances.
        let use_precompiled_binaries = self.settings.build_cache_enabled();

        let working_dir_cmd = format!("mkdir -p {working_dir}");
        let git_clone_cmd = format!("(git clone --depth=1 {url} || true)");

        // Base system setup: packages, file-descriptor limits, and network
        // buffers sized for high-throughput benchmarking.
        let mut basic_commands = vec![
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Removed presumably to avoid interactive restart prompts during
            // unattended upgrades — TODO confirm.
            "sudo apt-get -y remove needrestart",
            "sudo apt-get -y install curl git ca-certificates",
            // Raise open-file limits for all users and root explicitly.
            "echo '* soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo '* hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'fs.file-max = 2097152' | sudo tee -a /etc/sysctl.conf",
            "sudo sysctl -p",
            "ulimit -n 1048576 || true",
            // Enlarge TCP receive/send buffers.
            "sudo sysctl -w net.core.rmem_max=104857600",
            "sudo sysctl -w net.core.wmem_max=104857600",
            "sudo sysctl -w net.ipv4.tcp_rmem=\"8192 262144 104857600\"",
            "sudo sysctl -w net.ipv4.tcp_wmem=\"8192 262144 104857600\"",
            working_dir_cmd.as_str(),
            git_clone_cmd.as_str(),
        ];

        // Extra toolchains required by the build configs beyond `stable`,
        // deduplicated through a HashSet.
        let toolchain_cmds: Vec<String> = if !use_precompiled_binaries {
            self.settings
                .build_configs
                .values()
                .filter_map(|config| {
                    config
                        .toolchain
                        .as_ref()
                        .filter(|t| t.as_str() != "stable")
                        .cloned()
                })
                .collect::<HashSet<String>>()
                .into_iter()
                .map(|toolchain| format!("rustup toolchain install {toolchain}"))
                .collect()
        } else {
            vec![]
        };

        if !use_precompiled_binaries {
            // Full build environment: system build deps plus rustup.
            basic_commands.extend([
                "sudo apt-get -y install build-essential cmake clang lld protobuf-compiler pkg-config nvme-cli",
                "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
                "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
                "source $HOME/.cargo/env",
                "rustup default stable",
            ]);

            for cmd in &toolchain_cmds {
                basic_commands.push(cmd.as_str());
            }
        } else {
            // Create an empty cargo env file so later steps that `source` it
            // do not fail on a machine without rustup.
            basic_commands.push("mkdir -p $HOME/.cargo/ && touch $HOME/.cargo/env");
        }

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        // Chain everything into a single `&&`-joined command per instance.
        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        self.ssh_manager
            .execute(self.instances(), command, CommandContext::default())
            .await?;

        // The metrics instance additionally needs the monitoring stack.
        if !self.skip_monitoring {
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(
                    vec![metrics_instance],
                    monitor_command,
                    CommandContext::default(),
                )
                .await?;
        }

        display::done();
        Ok(())
    }
377
378 pub async fn start_monitoring(
380 &self,
381 use_internal_ip_address: bool,
382 timestamp: &str,
383 ) -> TestbedResult<()> {
384 if self.skip_monitoring {
385 display::warn("Monitoring is skipped, not starting Prometheus, Tempo and Grafana");
386 return Ok(());
387 }
388 if let Some(instance) = &self.metrics_instance {
389 display::action("Configuring monitoring instance");
390
391 let monitor = Monitor::new(
392 instance.clone(),
393 self.client_instances.clone(),
394 self.node_instances.clone(),
395 self.ssh_manager.clone(),
396 );
397 let snapshot_dir = self
400 .settings
401 .enable_prometheus_snapshots
402 .then_some(timestamp);
403 monitor.start_tempo().await?;
404 monitor
405 .start_prometheus(
406 &self.protocol_commands,
407 use_internal_ip_address,
408 self.settings.use_fullnode_for_execution,
409 snapshot_dir,
410 )
411 .await?;
412 monitor.start_grafana().await?;
413
414 display::done();
415 display::config("Grafana address", monitor.grafana_address());
416 display::newline();
417 }
418
419 Ok(())
420 }
421
    /// Updates every instance to the configured commit, then either
    /// distributes precompiled binaries from the build cache or compiles
    /// locally on each instance.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        let commit = &self.settings.repository.commit;
        let repo_name = self.settings.repository_name();
        let build_groups = self.settings.build_groups();

        // Fetch and hard-reset to the target commit; `git clean` keeps the
        // `target` directory to preserve incremental build artifacts.
        let git_update_command = [
            &format!("git fetch origin {commit} --force"),
            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
            "git clean -fd -e target",
        ]
        .join(" && ");

        // Run the update as a named background command and wait for it to
        // terminate on every instance.
        let id = "git update";
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.clone().into());

        display::action(format!("update command: {git_update_command}"));
        self.ssh_manager
            .execute(self.instances(), git_update_command, context)
            .await?;
        self.ssh_manager
            .wait_for_command(self.instances(), id, CommandStatus::Terminated)
            .await?;

        if self.settings.build_cache_enabled() {
            display::action("Using build cache for binary distribution");
            let build_cache_service = BuildCacheService::new(&self.settings, &self.ssh_manager);
            build_cache_service
                .update_with_build_cache(
                    commit,
                    &build_groups,
                    self.instances_without_metrics(),
                    repo_name.clone(),
                )
                .await?;
        } else {
            self.update_with_local_build(build_groups).await?;
        }

        display::done();
        Ok(())
    }
475
476 async fn update_with_local_build(&self, build_groups: BuildGroups) -> TestbedResult<()> {
482 let without_metrics = self.instances_without_metrics();
483 let repo_name = self.settings.repository_name();
484
485 for (i, (group, binary_names)) in build_groups.iter().enumerate() {
487 let build_command = build_cargo_command(
489 "build",
490 group.toolchain.clone(),
491 group.features.clone(),
492 binary_names,
493 &[] as &[&str],
494 &[] as &[&str],
495 );
496
497 display::action(format!(
499 "Running build command {}/{}: \"{build_command}\" in \"{repo_name}\"",
500 i + 1,
501 build_groups.len()
502 ));
503
504 let context = CommandContext::new().with_execute_from_path(repo_name.clone().into());
505
506 self.ssh_manager
507 .execute(without_metrics.clone(), build_command, context)
508 .await?;
509 }
510
511 Ok(())
512 }
513
514 pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
516 display::action("Configuring instances");
517
518 let command = self
521 .protocol_commands
522 .genesis_command(self.node_instances.iter(), parameters);
523 display::action(format!("\nGenesis command: {command}\n\n"));
524 let repo_name = self.settings.repository_name();
525 let context = CommandContext::new().with_execute_from_path(repo_name.into());
526 self.ssh_manager
527 .execute(self.instances_without_metrics(), command, context)
528 .await?;
529
530 display::action("Configuration of all instances completed");
531 display::done();
532 Ok(())
533 }
534
535 pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
537 display::action("Cleaning up testbed");
538
539 let mut command = vec!["(tmux kill-server || true)".into()];
541 for path in self.protocol_commands.db_directories() {
542 command.push(format!("(rm -rf {} || true)", path.display()));
543 }
544 if cleanup {
545 command.push("(rm -rf ~/*log* || true)".into());
546 }
547 let command = command.join(" ; ");
548
549 let active = self.instances().into_iter().filter(|x| x.is_active());
551 let context = CommandContext::default();
552 self.ssh_manager.execute(active, command, context).await?;
553
554 display::action("Cleanup of all instances completed");
555 display::done();
556 Ok(())
557 }
558
559 pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
561 display::action("Deploying validators");
562
563 self.boot_nodes(self.node_instances.clone(), parameters)
565 .await?;
566
567 display::action("Deployment of validators completed");
568 display::done();
569 Ok(())
570 }
571
    /// Deploys the load generators (and, when execution goes through full
    /// nodes, the full nodes first), then starts a background service that
    /// periodically scrapes client metrics into `~/metrics.log`.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Starting deployment of load generators");
        if self.settings.use_fullnode_for_execution {
            display::action("Setting up full nodes");

            // Launch the fullnode processes in the background.
            let targets = self
                .protocol_commands
                .fullnode_command(self.client_instances.clone(), parameters);

            let repo = self.settings.repository_name();
            let context = CommandContext::new()
                .run_background("fullnode".into())
                .with_log_file("~/fullnode.log".into())
                .with_execute_from_path(repo.into());
            self.ssh_manager
                .execute_per_instance(targets, context)
                .await?;

            // Poll the local JSON-RPC endpoint until each fullnode answers.
            display::action("Await fullnode ready...");
            let commands = self
                .client_instances
                .iter()
                .cloned()
                .map(|i| (i, "curl http://127.0.0.1:9000 -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"iota_getLatestCheckpointSequenceNumber\",\"params\":[],\"id\":1}'".to_owned()));
            self.ssh_manager.wait_for_success(commands).await;

            display::done();
        }

        display::action("Setting up load generators");

        // Start the client processes in the background, logging to
        // ~/client.log.
        let targets = self
            .protocol_commands
            .client_command(self.client_instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Block until every client's metrics endpoint responds.
        let commands = self.protocol_commands.clients_metrics_command(
            self.client_instances.clone(),
            parameters.use_internal_ip_address,
        );
        self.wait_for_success(commands, &parameters.benchmark_dir)
            .await;

        // Launch the periodic metrics scraper as a background command.
        display::action("\n\nStarting background metrics collection service");
        let metrics_script =
            self.metrics_collection_script_command(parameters.use_internal_ip_address);
        let metrics_context = CommandContext::new().run_background("metrics-collector".into());
        self.ssh_manager
            .execute_per_instance(metrics_script.clone(), metrics_context)
            .await?;

        display::action("Background metrics collection service started");
        display::action("Deployment of load generators completed");
        display::done();
        Ok(())
    }
643
    /// Builds, for each client instance, a shell loop that scrapes the
    /// client's metrics endpoint every 15 seconds and appends stdout and
    /// stderr to `~/metrics.log`.
    fn metrics_collection_script_command(
        &self,
        use_internal_ip_address: bool,
    ) -> Vec<(Instance, String)> {
        self.protocol_commands
            .clients_metrics_command(self.client_instances.clone(), use_internal_ip_address)
            .into_iter()
            .map(|(instance, cmd)| {
                (
                    instance,
                    // Endless loop; the raw string becomes the remote script.
                    format!(
                        r#"while true; do
    {cmd} >> ~/metrics.log 2>&1
    sleep 15
done"#
                    ),
                )
            })
            .collect::<Vec<_>>()
    }
667
    /// Runs the benchmark: waits for the configured duration (or until the
    /// clients executed the requested transaction count), periodically
    /// reporting progress and applying the crash/recovery fault schedule.
    /// Optionally collects flamegraphs from the nodes at the end of the run.
    pub async fn run(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        let run_label = match parameters.run_interval {
            RunInterval::Time(d) => format!("at least {}s", d.as_secs()),
            RunInterval::Count(n) => format!("until {n} tx executed"),
        };
        display::action(format!("Running benchmark ({run_label})"));

        // Progress ticker; the first tick of a tokio interval fires
        // immediately, so consume it before entering the loop.
        let scrape_every = self.effective_scrape_interval(parameters);
        let mut metrics_interval = time::interval(scrape_every);
        metrics_interval.tick().await;
        // Crash/recovery schedule driven by its own ticker.
        let faults_type = parameters.faults.clone();
        let mut faults_schedule =
            CrashRecoverySchedule::new(faults_type, self.node_instances.clone());

        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await;

        // In count mode the run ends when all client processes terminate; in
        // time mode this future never resolves and the time limit applies.
        let is_count_mode = matches!(parameters.run_interval, RunInterval::Count(_));
        let clients = self.client_instances.clone();
        let ssh = self.ssh_manager.clone();

        let wait_clients_future: std::pin::Pin<Box<dyn Future<Output = TestbedResult<()>> + Send>> =
            if is_count_mode && !clients.is_empty() {
                Box::pin(async move {
                    ssh.wait_for_command(clients, "client", CommandStatus::Terminated)
                        .await
                        .map_err(Into::into)
                })
            } else {
                Box::pin(std::future::pending::<TestbedResult<()>>())
            };
        tokio::pin!(wait_clients_future);

        let start = Instant::now();

        loop {
            tokio::select! {
                // Periodic progress report; also enforces the time limit.
                _ = metrics_interval.tick() => {
                    let elapsed = Instant::now().duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    if let Some(limit) = parameters.run_interval.time_limit_secs() {
                        if elapsed >= limit {
                            break;
                        }
                    }
                }

                // All clients terminated (count mode only).
                res = &mut wait_clients_future => {
                    res?;
                    break;
                }

                // Apply the next step of the fault schedule.
                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        // Optionally download CPU (and allocation) flamegraphs.
        if self.settings.enable_flamegraph {
            let flamegraphs_dir = parameters.benchmark_dir.join("flamegraphs");
            fs::create_dir_all(&flamegraphs_dir).expect("Failed to create flamegraphs directory");

            self.fetch_flamegraphs(
                self.instances_without_metrics().clone(),
                &flamegraphs_dir,
                "?svg=true",
                "flamegraph",
            )
            .await?;

            // Allocation flamegraphs only exist when the node was built with
            // the `flamegraph-alloc` feature.
            if self
                .settings
                .build_configs
                .get("iota-node")
                .is_some_and(|config| config.features.iter().any(|f| f == "flamegraph-alloc"))
            {
                self.fetch_flamegraphs(
                    self.instances_without_metrics().clone(),
                    &flamegraphs_dir,
                    "?svg=true&mem=true",
                    "flamegraph-alloc",
                )
                .await?;
            }
        }

        display::action("Benchmark run completed");
        display::done();
        Ok(())
    }
773
774 async fn fetch_flamegraphs(
775 &self,
776 nodes: Vec<Instance>,
777 path: &Path,
778 query: &str,
779 file_prefix: &str,
780 ) -> TestbedResult<()> {
781 let flamegraph_commands = self
782 .protocol_commands
783 .nodes_flamegraph_command(nodes, query);
784 let stdio = self
785 .ssh_manager
786 .execute_per_instance(flamegraph_commands, CommandContext::default())
787 .await?;
788 for (i, (stdout, stderr)) in stdio.into_iter().enumerate() {
789 if !stdout.is_empty() {
790 let file = path.join(format!("{file_prefix}-{i}.svg"));
791 fs::write(file, stdout).unwrap();
792 }
793 if !stderr.is_empty() {
794 let file = path.join(format!("{file_prefix}-{i}.log"));
795 fs::write(file, stderr).unwrap();
796 }
797 }
798 Ok(())
799 }
800
801 pub async fn wait_for_success<I, S>(&self, instances: I, _benchmark_dir: &Path)
802 where
803 I: IntoIterator<Item = (Instance, S)> + Clone,
804 S: Into<String> + Send + 'static + Clone,
805 {
806 match self
807 .ssh_manager
808 .execute_per_instance(
809 instances.clone(),
810 CommandContext::default().with_retries(10),
811 )
812 .await
813 {
814 Ok(_) => {}
815 Err(e) => {
816 panic!("Command execution failed on one or more instances: {e}");
818 }
819 }
820 }
821
    /// Downloads per-client benchmark statistics files.
    ///
    /// `benchmark_stats_path` is a remote path template in which `{i}` is
    /// replaced by the client index. Files are stored locally under
    /// `<benchmark_dir>/benchmark-stats/client-<i>-<file name>`. Per-client
    /// failures are reported as warnings and do not abort the loop; the
    /// method still returns `Ok` even when nothing was downloaded.
    pub async fn download_benchmark_stats(
        &self,
        benchmark_stats_path: &str,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        let path = parameters.benchmark_dir.join("benchmark-stats");
        fs::create_dir_all(&path).expect("Failed to create benchmark-stats directory");

        display::action("Downloading benchmark stats");

        let mut downloaded = 0usize;

        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            // Substitute the client index into the path template.
            let remote_path_raw = benchmark_stats_path.replace("{i}", &i.to_string());

            // Expand a leading `~/`. NOTE(review): this hardcodes the
            // `ubuntu` home directory — confirm against the instance images.
            let remote_path = if let Some(rest) = remote_path_raw.strip_prefix("~/") {
                format!("/home/ubuntu/{rest}")
            } else {
                remote_path_raw
            };

            // Prefix the local file name with the client index to avoid
            // collisions between clients.
            let local_file_name = Path::new(&remote_path)
                .file_name()
                .and_then(|s| s.to_str())
                .map(|name| format!("client-{i}-{name}"))
                .unwrap_or_else(|| format!("client-{i}-benchmark-stats.json"));

            let result: TestbedResult<()> = async {
                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
                let content = connection.download(&remote_path).await?;

                let local_file = path.join(local_file_name);
                fs::write(local_file, content.as_bytes())
                    .expect("Cannot write benchmark stats file");
                Ok(())
            }
            .await;

            // A single failing client should not abort the whole download.
            match result {
                Ok(_) => {
                    downloaded += 1;
                }
                Err(e) => {
                    display::warn(format!(
                        "Failed to download benchmark stats from client {i} ({}) at '{}': {e}",
                        instance.ssh_address(),
                        remote_path
                    ));
                }
            }
        }

        if downloaded == 0 {
            display::warn(format!(
                "No benchmark stats files downloaded (remote path template: '{}')",
                benchmark_stats_path
            ));
        } else {
            display::config("Downloaded benchmark stats files", downloaded);
        }

        display::done();
        Ok(())
    }
891
892 pub async fn download_metrics_logs(&self, benchmark_dir: &Path) -> TestbedResult<()> {
894 let path = benchmark_dir.join("logs");
895 fs::create_dir_all(&path).expect("Failed to create logs directory");
896
897 display::action("Downloading metrics logs");
899 for (i, instance) in self.client_instances.iter().enumerate() {
900 display::status(format!("{}/{}", i + 1, self.client_instances.len()));
901
902 let _: TestbedResult<()> = async {
903 let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
904
905 match connection.download("metrics.log").await {
907 Ok(metrics_content) => {
908 let metrics_file = path.join(format!("metrics-{i}.log"));
909 fs::write(metrics_file, metrics_content.as_bytes())
910 .expect("Cannot write metrics file");
911 }
912 Err(_) => {
913 display::warn(format!("Metrics file not found for client {i}"));
914 }
915 }
916 Ok(())
917 }
918 .await;
919 }
920 display::done();
921
922 Ok(())
923 }
924
    /// Takes a Prometheus snapshot on the metrics instance and downloads it
    /// via `rsync` into `<benchmark_dir>/snapshot`. No-op when there is no
    /// metrics instance.
    pub async fn download_prometheus_snapshot(
        &self,
        benchmark_dir: &Path,
        timestamp: &str,
    ) -> TestbedResult<()> {
        if let Some(instance) = &self.metrics_instance {
            display::action("Taking prometheus snapshot");
            let command = Prometheus::take_snapshot_command();

            // Minimal schema for the Prometheus snapshot API response; only
            // the snapshot name is needed.
            #[derive(serde::Deserialize)]
            struct ResponseData {
                name: String,
            }
            #[derive(serde::Deserialize)]
            struct Response {
                #[allow(dead_code)]
                status: String,
                data: ResponseData,
            }

            // Trigger the snapshot; take the first (only) instance's stdout.
            let response = self
                .ssh_manager
                .execute(
                    std::iter::once(instance.clone()),
                    command.clone(),
                    CommandContext::default(),
                )
                .await?
                .into_iter()
                .next()
                .ok_or_else(|| {
                    TestbedError::SshCommandFailed(
                        instance.clone(),
                        command.clone(),
                        "No response from command".into(),
                    )
                })?
                .0;
            let response: Response = serde_json::from_str(&response).map_err(|e| {
                TestbedError::SshCommandFailed(
                    instance.clone(),
                    command.clone(),
                    format!("Failed to parse response: {e}"),
                )
            })?;
            display::done();

            let snapshot_name = response.data.name;
            display::config("Created prometheus snapshot", &snapshot_name);
            display::newline();

            display::action("Downloading prometheus snapshot");
            let snapshot_dir = benchmark_dir.join("snapshot").display().to_string();
            // rsync over ssh with host-key checking disabled (fresh testbed
            // machines are not in known_hosts).
            let rsync_args = vec![
                "-rvze".to_string(),
                format!(
                    "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i {}",
                    self.settings.ssh_private_key_file.display()
                ),
                format!(
                    "ubuntu@{}:/var/lib/prometheus/{}/snapshots/{}/",
                    instance.main_ip, timestamp, snapshot_name
                ),
                snapshot_dir,
            ];

            // `rsync` is a blocking subprocess: run it off the async runtime's
            // worker threads.
            let instance = instance.clone();
            tokio::task::spawn_blocking(move || -> TestbedResult<()> {
                match std::process::Command::new("rsync")
                    .args(&rsync_args)
                    .status()
                {
                    Ok(status) if status.success() => Ok(()),
                    Ok(status) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with status: {}", status),
                    )),
                    Err(e) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with error: {}", e),
                    )),
                }
            })
            .await
            .unwrap()?;

            display::done();
            display::status("Downloaded prometheus snapshot");
            display::newline();
        }

        Ok(())
    }
1027
    /// Downloads client (and, when fullnodes are used, fullnode) logs
    /// sequentially, then node logs concurrently, scanning each for errors.
    /// Returns the aggregated `LogsAnalyzer`.
    pub async fn download_logs(&self, benchmark_dir: &Path) -> TestbedResult<LogsAnalyzer> {
        let path = benchmark_dir.join("logs");
        fs::create_dir_all(&path).expect("Failed to create logs directory");

        let mut log_parsers = Vec::new();

        display::action("Downloading clients logs");
        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            // Best effort: a failing client download is silently skipped
            // (the result is deliberately discarded).
            let _: TestbedResult<()> = async {
                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;

                if self.settings.use_fullnode_for_execution {
                    let fullnode_log_content = connection.download("fullnode.log").await?;
                    let fullnode_log_file = path.join(format!("fullnode-{i}.log"));
                    fs::write(fullnode_log_file, fullnode_log_content.as_bytes())
                        .expect("Cannot write log file");
                }

                let client_log_content = connection.download("client.log").await?;

                let client_log_file = path.join(format!("client-{i}.log"));
                fs::write(client_log_file, client_log_content.as_bytes())
                    .expect("Cannot write log file");

                // Scan the client log for errors.
                let mut log_parser = LogsAnalyzer::default();
                log_parser.set_client_errors(&client_log_content);
                log_parsers.push(log_parser);
                Ok(())
            }
            .await;
        }

        display::done();

        display::action("Downloading nodes logs");
        // Node logs are fetched concurrently, one future per node.
        let download_tasks: Vec<_> = self
            .node_instances
            .iter()
            .enumerate()
            .map(|(i, instance)| {
                let ssh_manager = self.ssh_manager.clone();
                let path = path.clone();
                let ssh_address = instance.ssh_address();

                async move {
                    let connection = ssh_manager.connect(ssh_address).await?;
                    let node_log_content = connection.download("node.log").await?;

                    let node_log_file = path.join(format!("node-{i}.log"));
                    fs::write(node_log_file, node_log_content.as_bytes())
                        .expect("Cannot write log file");

                    // Scan the node log for errors.
                    let mut log_parser = LogsAnalyzer::default();
                    log_parser.set_node_errors(&node_log_content);
                    Ok::<LogsAnalyzer, TestbedError>(log_parser)
                }
            })
            .collect();

        let results = futures::future::join_all(download_tasks).await;
        for (idx, result) in results.into_iter().enumerate() {
            display::status(format!("{}/{}", idx + 1, self.node_instances.len()));
            match result {
                Ok(log_parser) => log_parsers.push(log_parser),
                Err(e) => display::warn(format!("Failed to download node log: {e}")),
            }
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }
1106
    /// Runs the full benchmark campaign: prepares the testbed (cleanup,
    /// install/update, monitoring), executes every parameter combination
    /// produced by `generator` — collecting measurements, stats, and logs per
    /// run — and finally downloads the Prometheus snapshot when enabled.
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Start from a clean slate; the timestamp labels this campaign.
        self.cleanup(true).await?;
        let timestamp = chrono::Local::now().format("%y%m%d_%H%M%S").to_string();

        display::config("dedicated_clients", self.dedicated_clients);
        display::config("nodes", self.node_instances.len());
        display::config("clients", self.client_instances.len());
        display::config("metrics", self.metrics_instance.is_some());

        display::config(
            "nodes",
            self.node_instances
                .iter()
                .map(|i| i.ssh_address().to_string())
                .collect::<Vec<_>>()
                .join(", "),
        );
        display::config(
            "clients",
            self.client_instances
                .iter()
                .map(|i| i.ssh_address().to_string())
                .collect::<Vec<_>>()
                .join(", "),
        );
        display::config(
            "metrics",
            self.metrics_instance
                .as_ref()
                .map(|i| i.ssh_address().to_string())
                .unwrap_or("<none>".to_string()),
        );

        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        self.start_monitoring(generator.use_internal_ip_address, &timestamp)
            .await?;

        display::action("Testbed ready!");

        let mut i = 1;
        // NOTE(review): `latest_committee_size` is written below but never
        // read — confirm whether it can be removed.
        let mut latest_committee_size = 0;
        // IS_LOOP is a task-local from the logger module; presumably it marks
        // output produced inside the benchmark loop — verify in `logger`.
        let result: TestbedResult<()> = IS_LOOP
            .scope(true, async {
                while let Some(mut parameters) = generator.next() {
                    display::header(format!("Starting benchmark {i}"));
                    display::config("Benchmark type", &parameters.benchmark_type);
                    display::config("Parameters", &parameters);
                    display::newline();

                    // Each run gets its own results directory.
                    parameters.benchmark_dir = self.benchmark_dir.join(format!("{parameters:?}"));

                    // Point the OTLP exporter at the Tempo instance unless an
                    // OTel config was provided explicitly.
                    if !self.skip_monitoring {
                        if let Some(metrics) = &self.metrics_instance {
                            let host_ip = if generator.use_internal_ip_address {
                                metrics.private_ip
                            } else {
                                metrics.main_ip
                            };

                            parameters.otel.get_or_insert(crate::benchmark::OtelConfig {
                                otlp_endpoint: format!(
                                    "http://{}:{}",
                                    host_ip,
                                    crate::monitor::Tempo::OTLP_GRPC_PORT
                                ),
                                protocol: "grpc".to_string(),
                                sampler: "parentbased_traceidratio".to_string(),
                                sampler_arg: "0.1".to_string(),
                            });
                        }
                    }

                    self.cleanup(true).await?;
                    fs::create_dir_all(&parameters.benchmark_dir)
                        .expect("Failed to create benchmark directory");

                    // Redirect log output into this run's logs.log file.
                    let log_file_path = parameters.benchmark_dir.join("logs.log");
                    let new_file = OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(log_file_path)
                        .expect("Failed to open log file");
                    self.benchmark_writer
                        .swap(Box::new(new_file))
                        .expect("Failed to swap log writer");

                    crate::logger::log(
                        &chrono::Local::now()
                            .format("Started %y-%m-%d:%H-%M-%S\n")
                            .to_string(),
                    );

                    // Execute the run; the error is deferred so the stats
                    // download, cleanup, and log processing below still run.
                    let benchmark_result = async {
                        if !self.skip_testbed_configuration {
                            self.configure(&parameters).await?;
                            latest_committee_size = parameters.nodes;
                        }

                        self.run_nodes(&parameters).await?;

                        self.run_clients(&parameters).await?;

                        self.run(&parameters).await?;

                        // Collect, display, and persist this run's metrics.
                        let mut aggregator =
                            MeasurementsCollection::new(&self.settings, parameters.clone());
                        self.download_metrics_logs(&parameters.benchmark_dir)
                            .await?;

                        aggregator.aggregates_metrics_from_files::<P>(
                            self.client_instances.len(),
                            &parameters.benchmark_dir.join("logs"),
                        );

                        aggregator.display_summary();
                        aggregator.save(&parameters.benchmark_dir);
                        generator.register_result(aggregator);

                        // Flush errors are deliberately ignored.
                        let _ = self.benchmark_writer.flush();

                        TestbedResult::Ok(())
                    }
                    .await;

                    if let Some(benchmark_stats_path) = &parameters.benchmark_stats_path {
                        self.download_benchmark_stats(benchmark_stats_path, &parameters)
                            .await?;
                    }

                    self.cleanup(false).await?;

                    if self.log_processing {
                        let error_counter = self.download_logs(&parameters.benchmark_dir).await?;
                        error_counter.print_summary();
                    }

                    crate::logger::log(
                        &chrono::Local::now()
                            .format("Finished %y-%m-%d:%H-%M-%S\n")
                            .to_string(),
                    );

                    // Propagate a deferred benchmark error only after the
                    // post-run steps above completed.
                    benchmark_result?;

                    i += 1;
                }

                Ok(())
            })
            .await;

        result?;

        // Snapshot download failures are reported but do not fail the
        // campaign.
        if self.settings.enable_prometheus_snapshots {
            info!("Downloading prometheus snapshot");
            if let Err(e) = self
                .download_prometheus_snapshot(&self.benchmark_dir, &timestamp)
                .await
            {
                display::error(format!("Failed to download prometheus snapshot: {}", e));
            }
            info!("Prometheus snapshot download completed");
        }

        display::header("Benchmark completed");
        Ok(())
    }
1306}