1use std::{
6 collections::HashSet,
7 fs::{self},
8 marker::PhantomData,
9 path::Path,
10 time::Duration,
11};
12
13use chrono;
14use futures;
15use tokio::time::{self, Instant};
16
17use crate::{
18 benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
19 build_cache::BuildCacheService,
20 client::Instance,
21 display,
22 error::{TestbedError, TestbedResult},
23 faults::CrashRecoverySchedule,
24 logs::LogsAnalyzer,
25 measurement::MeasurementsCollection,
26 monitor::{Monitor, Prometheus},
27 net_latency::NetworkLatencyCommandBuilder,
28 protocol::{ProtocolCommands, ProtocolMetrics},
29 settings::{BuildGroups, Settings, build_cargo_command},
30 ssh::{CommandContext, CommandStatus, SshConnectionManager},
31};
32
/// Orchestrates a distributed benchmark run: installs dependencies on remote
/// instances over SSH, deploys protocol nodes and load-generating clients,
/// optionally injects crash faults, and collects metrics and logs.
pub struct Orchestrator<P, T> {
    // Testbed-wide settings (repository, build configs, result paths, ...).
    settings: Settings,
    // Instances that run the protocol nodes (validators).
    node_instances: Vec<Instance>,
    // Instances that run the benchmark clients (load generators).
    client_instances: Vec<Instance>,
    // Optional dedicated instance hosting the monitoring stack
    // (Prometheus/Grafana); `None` when no metrics instance exists.
    metrics_instance: Option<Instance>,
    // Ties this orchestrator to a benchmark type `T` without storing one.
    benchmark_type: PhantomData<T>,
    // Cloud-provider specific setup commands run during `install`.
    instance_setup_commands: Vec<String>,
    // Protocol-specific command builder (genesis/node/client commands, ...).
    protocol_commands: P,
    // Interval between two Prometheus scrapes.
    scrape_interval: Duration,
    // Interval between two crash/recovery schedule updates.
    crash_interval: Duration,
    // Manager for SSH connections to all instances.
    ssh_manager: SshConnectionManager,
    // Skip dependency installation and repository/binary updates.
    skip_testbed_update: bool,
    // Skip (re-)generating the genesis configuration.
    skip_testbed_configuration: bool,
    // Download and analyze node/client logs after each run.
    log_processing: bool,
    // Number of dedicated client instances; when non-zero, the client
    // instances are included in `instances_without_metrics` (see that
    // method). NOTE(review): the exact provisioning semantics are decided
    // by the caller — confirm against the testbed setup code.
    dedicated_clients: usize,
    // Skip deploying the monitoring stack entirely.
    skip_monitoring: bool,
}
70
71impl<P, T> Orchestrator<P, T> {
72 const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
74 const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);
76
77 pub fn new(
79 settings: Settings,
80 node_instances: Vec<Instance>,
81 client_instances: Vec<Instance>,
82 metrics_instance: Option<Instance>,
83 instance_setup_commands: Vec<String>,
84 protocol_commands: P,
85 ssh_manager: SshConnectionManager,
86 ) -> Self {
87 Self {
88 settings,
89 node_instances,
90 client_instances,
91 metrics_instance,
92 benchmark_type: PhantomData,
93 instance_setup_commands,
94 protocol_commands,
95 ssh_manager,
96 scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
97 crash_interval: Self::DEFAULT_CRASH_INTERVAL,
98 skip_testbed_update: false,
99 skip_testbed_configuration: false,
100 log_processing: false,
101 dedicated_clients: 0,
102 skip_monitoring: false,
103 }
104 }
105
106 pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
108 self.scrape_interval = scrape_interval;
109 self
110 }
111
112 pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
114 self.crash_interval = crash_interval;
115 self
116 }
117
118 pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
120 self.skip_testbed_update = skip_testbed_update;
121 self
122 }
123
124 pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
126 self.skip_testbed_configuration = skip_testbed_configuration;
127 self
128 }
129
130 pub fn with_log_processing(mut self, log_processing: bool) -> Self {
132 self.log_processing = log_processing;
133 self
134 }
135
136 pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
138 self.dedicated_clients = dedicated_clients;
139 self
140 }
141
142 pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
144 self.skip_monitoring = skip_monitoring;
145 self
146 }
147
148 pub fn instances_without_metrics(&self) -> Vec<Instance> {
149 let mut instances = self.node_instances.clone();
150
151 if self.dedicated_clients > 0 {
152 instances.extend(self.client_instances.clone());
153 }
154 instances
155 }
156
157 pub fn instances(&self) -> Vec<Instance> {
159 let mut instances = self.instances_without_metrics();
160 if let Some(metrics_instance) = &self.metrics_instance {
161 instances.push(metrics_instance.clone());
162 }
163 instances
164 }
165}
166
167impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
    /// Boot the given node instances for a benchmark run.
    ///
    /// When an internal-IP run specifies a latency topology, artificial
    /// network-latency rules are installed on the instances first. The
    /// nodes are then started in background sessions (logging to
    /// `~/node.log`), and this function blocks until every node's metrics
    /// endpoint answers — i.e. the node is considered up.
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Optionally shape the network before starting the nodes.
        if parameters.use_internal_ip_address {
            if let Some(latency_topology) = parameters.latency_topology.clone() {
                let latency_commands = NetworkLatencyCommandBuilder::new(&instances)
                    .with_perturbation_spec(parameters.perturbation_spec.clone())
                    .with_topology_layout(latency_topology)
                    .with_max_latency(parameters.maximum_latency)
                    .build_network_latency_matrix();
                self.ssh_manager
                    .execute_per_instance(latency_commands, CommandContext::default())
                    .await?;
            }
        }

        // Start one node per instance as a background session, executing
        // from the repository checkout.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let node_context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, node_context)
            .await?;

        // Wait until each node's metrics endpoint is reachable.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone(), parameters.use_internal_ip_address);
        self.wait_for_success(commands, &parameters.benchmark_dir)
            .await;

        Ok(())
    }
210
    /// Install all software dependencies on every instance, and the
    /// monitoring-stack dependencies on the metrics instance (unless
    /// monitoring is skipped).
    ///
    /// When the build cache is enabled, precompiled binaries are used and
    /// the Rust toolchain plus native build dependencies are omitted.
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;

        let use_precompiled_binaries = self.settings.build_cache_enabled();

        let working_dir_cmd = format!("mkdir -p {working_dir}");
        // `|| true` keeps the `&&` chain alive if the repo already exists.
        let git_clone_cmd = format!("(git clone --depth=1 {url} || true)");

        let mut basic_commands = vec![
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // needrestart interrupts unattended apt runs with prompts.
            "sudo apt-get -y remove needrestart",
            "sudo apt-get -y install curl git ca-certificates",
            // Raise file-descriptor limits for benchmark workloads.
            "echo '* soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo '* hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'fs.file-max = 2097152' | sudo tee -a /etc/sysctl.conf",
            "sudo sysctl -p",
            "ulimit -n 1048576 || true",
            // Enlarge kernel network buffers for high-throughput traffic.
            "sudo sysctl -w net.core.rmem_max=104857600",
            "sudo sysctl -w net.core.wmem_max=104857600",
            "sudo sysctl -w net.ipv4.tcp_rmem=\"8192 262144 104857600\"",
            "sudo sysctl -w net.ipv4.tcp_wmem=\"8192 262144 104857600\"",
            working_dir_cmd.as_str(),
            git_clone_cmd.as_str(),
        ];

        // The set of non-stable toolchains required by the build configs;
        // only needed when compiling from source. Deduplicated via HashSet.
        let toolchain_cmds: Vec<String> = if !use_precompiled_binaries {
            self.settings
                .build_configs
                .values()
                .filter_map(|config| {
                    config
                        .toolchain
                        .as_ref()
                        .filter(|t| t.as_str() != "stable")
                        .cloned()
                })
                .collect::<HashSet<String>>()
                .into_iter()
                .map(|toolchain| format!("rustup toolchain install {toolchain}"))
                .collect()
        } else {
            vec![]
        };

        if !use_precompiled_binaries {
            // Full build environment: native deps plus the Rust toolchain.
            basic_commands.extend([
                "sudo apt-get -y install build-essential cmake clang lld protobuf-compiler pkg-config nvme-cli",
                "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
                "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
                "source $HOME/.cargo/env",
                "rustup default stable",
            ]);

            for cmd in &toolchain_cmds {
                basic_commands.push(cmd.as_str());
            }
        } else {
            // Other scripts `source $HOME/.cargo/env`; make sure the file
            // exists even though rustup was never installed.
            basic_commands.push("mkdir -p $HOME/.cargo/ && touch $HOME/.cargo/env");
        }

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        // Chain everything with `&&` so the first failure aborts the install.
        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        self.ssh_manager
            .execute(self.instances(), command, CommandContext::default())
            .await?;

        // Monitoring dependencies go only on the dedicated metrics instance.
        if !self.skip_monitoring {
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(
                    vec![metrics_instance],
                    monitor_command,
                    CommandContext::default(),
                )
                .await?;
        }

        display::done();
        Ok(())
    }
333
    /// Start Prometheus and Grafana on the metrics instance and print the
    /// Grafana address. A no-op when no metrics instance is configured.
    ///
    /// `timestamp` is used as the Prometheus snapshot directory name when
    /// snapshots are enabled in the settings.
    pub async fn start_monitoring(
        &self,
        use_internal_ip_address: bool,
        timestamp: &str,
    ) -> TestbedResult<()> {
        if let Some(instance) = &self.metrics_instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(
                instance.clone(),
                self.client_instances.clone(),
                self.node_instances.clone(),
                self.ssh_manager.clone(),
            );
            // Only pass a snapshot directory when snapshots are enabled.
            let snapshot_dir = self
                .settings
                .enable_prometheus_snapshots
                .then_some(timestamp);
            monitor
                .start_prometheus(
                    &self.protocol_commands,
                    use_internal_ip_address,
                    snapshot_dir,
                )
                .await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }
371
    /// Bring every instance to the configured commit: fetch and hard-reset
    /// the repository, then distribute binaries either from the build cache
    /// or by compiling on the instances themselves.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        let commit = &self.settings.repository.commit;
        let repo_name = self.settings.repository_name();
        let build_groups = self.settings.build_groups();

        // Reset to the target commit; `git clean -e target` preserves the
        // build directory so previous artifacts can be reused.
        let git_update_command = [
            &format!("git fetch origin {commit} --force"),
            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
            "git clean -fd -e target",
        ]
        .join(" && ");

        let id = "git update";
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.clone().into());

        display::action(format!("update command: {git_update_command}"));
        // Kick the update off in the background on all instances, then
        // block until the background command terminates everywhere.
        self.ssh_manager
            .execute(self.instances(), git_update_command, context)
            .await?;
        self.ssh_manager
            .wait_for_command(self.instances(), id, CommandStatus::Terminated)
            .await?;

        if self.settings.build_cache_enabled() {
            display::action("Using build cache for binary distribution");
            let build_cache_service = BuildCacheService::new(&self.settings, &self.ssh_manager);
            build_cache_service
                .update_with_build_cache(
                    commit,
                    &build_groups,
                    self.instances_without_metrics(),
                    repo_name.clone(),
                )
                .await?;
        } else {
            self.update_with_local_build(build_groups).await?;
        }

        display::done();
        Ok(())
    }
425
    /// Compile the binaries directly on every non-metrics instance, running
    /// one cargo build invocation per build group (sequentially).
    async fn update_with_local_build(&self, build_groups: BuildGroups) -> TestbedResult<()> {
        let without_metrics = self.instances_without_metrics();
        let repo_name = self.settings.repository_name();

        for (i, (group, binary_names)) in build_groups.iter().enumerate() {
            // Empty slices: no extra env vars / flags beyond the group's own.
            let build_command = build_cargo_command(
                "build",
                group.toolchain.clone(),
                group.features.clone(),
                binary_names,
                &[] as &[&str],
                &[] as &[&str],
            );

            display::action(format!(
                "Running build command {}/{}: \"{build_command}\" in \"{repo_name}\"",
                i + 1,
                build_groups.len()
            ));

            // Build from inside the repository checkout.
            let context = CommandContext::new().with_execute_from_path(repo_name.clone().into());

            self.ssh_manager
                .execute(without_metrics.clone(), build_command, context)
                .await?;
        }

        Ok(())
    }
463
    /// Generate the genesis configuration on all non-metrics instances by
    /// running the protocol's genesis command inside the repo checkout.
    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Configuring instances");

        // The genesis command is derived from the node instances but runs
        // on every non-metrics instance (nodes and dedicated clients).
        let command = self
            .protocol_commands
            .genesis_command(self.node_instances.iter(), parameters);
        display::action(format!("\nGenesis command: {command}\n\n"));
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new().with_execute_from_path(repo_name.into());
        self.ssh_manager
            .execute(self.instances_without_metrics(), command, context)
            .await?;

        display::done();
        Ok(())
    }
483
484 pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
486 display::action("Cleaning up testbed");
487
488 let mut command = vec!["(tmux kill-server || true)".into()];
490 for path in self.protocol_commands.db_directories() {
491 command.push(format!("(rm -rf {} || true)", path.display()));
492 }
493 if cleanup {
494 command.push("(rm -rf ~/*log* || true)".into());
495 }
496 let command = command.join(" ; ");
497
498 let active = self.instances().into_iter().filter(|x| x.is_active());
500 let context = CommandContext::default();
501 self.ssh_manager.execute(active, command, context).await?;
502
503 display::done();
504 Ok(())
505 }
506
    /// Deploy and start all validator nodes (delegates to `boot_nodes`).
    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Deploying validators");

        self.boot_nodes(self.node_instances.clone(), parameters)
            .await?;

        display::done();
        Ok(())
    }
518
    /// Start the load generators on the client instances.
    ///
    /// When execution goes through full nodes, those are started first
    /// (logging to `~/fullnode.log`) and polled over local JSON-RPC until
    /// ready. The clients are then launched in background sessions
    /// (`~/client.log`), their metrics endpoints are awaited, and finally a
    /// background service is spawned that appends client metrics to
    /// `~/metrics.log` every 15 seconds.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        if self.settings.use_fullnode_for_execution {
            display::action("Setting up full nodes");

            let targets = self
                .protocol_commands
                .fullnode_command(self.client_instances.clone(), parameters);

            let repo = self.settings.repository_name();
            let context = CommandContext::new()
                .run_background("fullnode".into())
                .with_log_file("~/fullnode.log".into())
                .with_execute_from_path(repo.into());
            self.ssh_manager
                .execute_per_instance(targets, context)
                .await?;

            display::action("Await fullnode ready...");
            // A fullnode counts as ready once its local JSON-RPC endpoint
            // answers this checkpoint query.
            let commands = self
                .client_instances
                .iter()
                .cloned()
                .map(|i| (i, "curl http://127.0.0.1:9000 -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"iota_getLatestCheckpointSequenceNumber\",\"params\":[],\"id\":1}'".to_owned()));
            self.ssh_manager.wait_for_success(commands).await;

            display::done();
        }

        display::action("Setting up load generators");

        // Launch the clients as background sessions inside the repo.
        let targets = self
            .protocol_commands
            .client_command(self.client_instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Block until the clients expose their metrics endpoints.
        let commands = self.protocol_commands.clients_metrics_command(
            self.client_instances.clone(),
            parameters.use_internal_ip_address,
        );
        self.wait_for_success(commands, &parameters.benchmark_dir)
            .await;

        display::action("\n\nStarting background metrics collection service");
        let metrics_script =
            self.metrics_collection_script_command(parameters.use_internal_ip_address);
        let metrics_context = CommandContext::new().run_background("metrics-collector".into());
        self.ssh_manager
            .execute_per_instance(metrics_script.clone(), metrics_context)
            .await?;

        display::done();
        Ok(())
    }
587
    /// Build, for every client instance, a shell loop that polls that
    /// client's metrics endpoint every 15 seconds and appends the output
    /// (stdout and stderr) to `~/metrics.log`. Intended to run as a
    /// background session (see `run_clients`).
    fn metrics_collection_script_command(
        &self,
        use_internal_ip_address: bool,
    ) -> Vec<(Instance, String)> {
        self.protocol_commands
            .clients_metrics_command(self.client_instances.clone(), use_internal_ip_address)
            .into_iter()
            .map(|(instance, cmd)| {
                (
                    instance,
                    format!(
                        r#"while true; do
    {cmd} >> ~/metrics.log 2>&1
    sleep 15
done"#
                    ),
                )
            })
            .collect::<Vec<_>>()
    }
611
    /// Drive the benchmark for (at least) the configured duration.
    ///
    /// A 5-second ticker displays elapsed time and terminates the loop once
    /// the duration has passed; a second ticker advances the crash/recovery
    /// schedule, killing and rebooting nodes as dictated. Afterwards,
    /// flamegraphs are fetched when enabled in the settings.
    pub async fn run(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action(format!(
            "Running benchmark (at least {}s)",
            parameters.duration.as_secs()
        ));

        let mut metrics_interval = time::interval(Duration::from_secs(5));
        // A tokio interval's first tick completes immediately; consume it so
        // the select loop waits a full period before acting.
        metrics_interval.tick().await;
        let faults_type = parameters.faults.clone();
        let mut faults_schedule =
            CrashRecoverySchedule::new(faults_type, self.node_instances.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await;
        let start = Instant::now();
        loop {
            tokio::select! {
                // Progress display and termination check.
                _ = metrics_interval.tick() => {
                    let elapsed = Instant::now().duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    if elapsed > parameters.duration.as_secs() {
                        break;
                    }
                },

                // Advance the crash/recovery schedule: kill the scheduled
                // nodes, reboot the recovering ones, report any change.
                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        if self.settings.enable_flamegraph {
            let flamegraphs_dir = parameters.benchmark_dir.join("flamegraphs");
            fs::create_dir_all(&flamegraphs_dir).expect("Failed to create flamegraphs directory");

            // CPU flamegraphs from every non-metrics instance.
            self.fetch_flamegraphs(
                self.instances_without_metrics().clone(),
                &flamegraphs_dir,
                "?svg=true",
                "flamegraph",
            )
            .await?;

            // Allocation flamegraphs only when the node binary was built
            // with the `flamegraph-alloc` feature.
            if self
                .settings
                .build_configs
                .get("iota-node")
                .is_some_and(|config| config.features.iter().any(|f| f == "flamegraph-alloc"))
            {
                self.fetch_flamegraphs(
                    self.instances_without_metrics().clone(),
                    &flamegraphs_dir,
                    "?svg=true&mem=true",
                    "flamegraph-alloc",
                )
                .await?;
            }
        }

        display::done();
        Ok(())
    }
689
690 async fn fetch_flamegraphs(
691 &self,
692 nodes: Vec<Instance>,
693 path: &Path,
694 query: &str,
695 file_prefix: &str,
696 ) -> TestbedResult<()> {
697 let flamegraph_commands = self
698 .protocol_commands
699 .nodes_flamegraph_command(nodes, query);
700 let stdio = self
701 .ssh_manager
702 .execute_per_instance(flamegraph_commands, CommandContext::default())
703 .await?;
704 for (i, (stdout, stderr)) in stdio.into_iter().enumerate() {
705 if !stdout.is_empty() {
706 let file = path.join(format!("{file_prefix}-{i}.svg"));
707 fs::write(file, stdout).unwrap();
708 }
709 if !stderr.is_empty() {
710 let file = path.join(format!("{file_prefix}-{i}.log"));
711 fs::write(file, stderr).unwrap();
712 }
713 }
714 Ok(())
715 }
716
717 pub async fn wait_for_success<I, S>(&self, instances: I, _benchmark_dir: &Path)
718 where
719 I: IntoIterator<Item = (Instance, S)> + Clone,
720 S: Into<String> + Send + 'static + Clone,
721 {
722 match self
723 .ssh_manager
724 .execute_per_instance(
725 instances.clone(),
726 CommandContext::default().with_retries(10),
727 )
728 .await
729 {
730 Ok(_) => {}
731 Err(e) => {
732 panic!("Command execution failed on one or more instances: {e}");
734 }
735 }
736 }
737
    /// Download `metrics.log` from every client instance into
    /// `<benchmark_dir>/logs/metrics-<i>.log`. Failures for a single client
    /// only produce a warning, so one faulty client does not abort the
    /// whole collection.
    pub async fn download_metrics_logs(&self, benchmark_dir: &Path) -> TestbedResult<()> {
        let path = benchmark_dir.join("logs");
        fs::create_dir_all(&path).expect("Failed to create logs directory");

        display::action("Downloading metrics logs");
        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            // Deliberately best-effort: the result of this per-client block
            // is discarded (connection errors included).
            let _: TestbedResult<()> = async {
                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;

                match connection.download("metrics.log").await {
                    Ok(metrics_content) => {
                        let metrics_file = path.join(format!("metrics-{i}.log"));
                        fs::write(metrics_file, metrics_content.as_bytes())
                            .expect("Cannot write metrics file");
                    }
                    Err(_) => {
                        display::warn(format!("Metrics file not found for client {i}"));
                    }
                }
                Ok(())
            }
            .await;
        }
        display::done();

        Ok(())
    }
770
    /// Trigger a Prometheus snapshot on the metrics instance and rsync the
    /// resulting snapshot directory into `<benchmark_dir>/snapshot`. A
    /// no-op when no metrics instance is configured.
    ///
    /// `timestamp` selects the Prometheus data directory on the remote
    /// machine (see `start_monitoring`, which uses it as snapshot dir).
    pub async fn download_prometheus_snapshot(
        &self,
        benchmark_dir: &Path,
        timestamp: &str,
    ) -> TestbedResult<()> {
        if let Some(instance) = &self.metrics_instance {
            display::action("Taking prometheus snapshot");
            let command = Prometheus::take_snapshot_command();

            // Minimal shape of Prometheus' snapshot API JSON response; we
            // only need the generated snapshot name.
            #[derive(serde::Deserialize)]
            struct ResponseData {
                name: String,
            }
            #[derive(serde::Deserialize)]
            struct Response {
                #[allow(dead_code)]
                status: String,
                data: ResponseData,
            }

            // Run the snapshot command on the single metrics instance and
            // take the stdout (`.0`) of its only result.
            let response = self
                .ssh_manager
                .execute(
                    std::iter::once(instance.clone()),
                    command.clone(),
                    CommandContext::default(),
                )
                .await?
                .into_iter()
                .next()
                .ok_or_else(|| {
                    TestbedError::SshCommandFailed(
                        instance.clone(),
                        command.clone(),
                        "No response from command".into(),
                    )
                })?
                .0;
            let response: Response = serde_json::from_str(&response).map_err(|e| {
                TestbedError::SshCommandFailed(
                    instance.clone(),
                    command.clone(),
                    format!("Failed to parse response: {e}"),
                )
            })?;
            display::done();

            let snapshot_name = response.data.name;
            display::config("Created prometheus snapshot", &snapshot_name);
            display::newline();

            display::action("Downloading prometheus snapshot");
            let snapshot_dir = benchmark_dir.join("snapshot").display().to_string();
            let rsync_args = vec![
                "-rvze".to_string(),
                // Non-interactive ssh that tolerates host-key churn on
                // freshly provisioned instances.
                format!(
                    "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i {}",
                    self.settings.ssh_private_key_file.display()
                ),
                format!(
                    "ubuntu@{}:/var/lib/prometheus/{}/snapshots/{}/",
                    instance.main_ip, timestamp, snapshot_name
                ),
                snapshot_dir,
            ];

            // rsync blocks on process I/O, so run it off the async runtime.
            let instance = instance.clone();
            tokio::task::spawn_blocking(move || -> TestbedResult<()> {
                match std::process::Command::new("rsync")
                    .args(&rsync_args)
                    .status()
                {
                    Ok(status) if status.success() => Ok(()),
                    Ok(status) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with status: {}", status),
                    )),
                    Err(e) => Err(TestbedError::SshCommandFailed(
                        instance,
                        "rsync ".to_string() + &rsync_args.join(" "),
                        format!("rsync failed with error: {}", e),
                    )),
                }
            })
            .await
            // The join handle only errors if the blocking task panicked.
            .unwrap()?;

            display::done();
            display::status("Downloaded prometheus snapshot");
            display::newline();
        }

        Ok(())
    }
873
    /// Download client logs (sequentially) and node logs (concurrently)
    /// into `<benchmark_dir>/logs`, scan them for errors, and return the
    /// aggregated `LogsAnalyzer`.
    pub async fn download_logs(&self, benchmark_dir: &Path) -> TestbedResult<LogsAnalyzer> {
        let path = benchmark_dir.join("logs");
        fs::create_dir_all(&path).expect("Failed to create logs directory");

        // One parser per successfully downloaded log; aggregated at the end.
        let mut log_parsers = Vec::new();

        display::action("Downloading clients logs");
        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            // Best effort per client: download failures are discarded.
            let _: TestbedResult<()> = async {
                let connection = self.ssh_manager.connect(instance.ssh_address()).await?;

                // Fullnode logs only exist when execution goes through
                // fullnodes (see `run_clients`).
                if self.settings.use_fullnode_for_execution {
                    let fullnode_log_content = connection.download("fullnode.log").await?;
                    let fullnode_log_file = path.join(format!("fullnode-{i}.log"));
                    fs::write(fullnode_log_file, fullnode_log_content.as_bytes())
                        .expect("Cannot write log file");
                }

                let client_log_content = connection.download("client.log").await?;

                let client_log_file = path.join(format!("client-{i}.log"));
                fs::write(client_log_file, client_log_content.as_bytes())
                    .expect("Cannot write log file");

                let mut log_parser = LogsAnalyzer::default();
                log_parser.set_client_errors(&client_log_content);
                log_parsers.push(log_parser);
                Ok(())
            }
            .await;
        }

        display::done();

        display::action("Downloading nodes logs");
        // Node logs are fetched concurrently, one future per node.
        let download_tasks: Vec<_> = self
            .node_instances
            .iter()
            .enumerate()
            .map(|(i, instance)| {
                let ssh_manager = self.ssh_manager.clone();
                let path = path.clone();
                let ssh_address = instance.ssh_address();

                async move {
                    let connection = ssh_manager.connect(ssh_address).await?;
                    let node_log_content = connection.download("node.log").await?;

                    let node_log_file = path.join(format!("node-{i}.log"));
                    fs::write(node_log_file, node_log_content.as_bytes())
                        .expect("Cannot write log file");

                    let mut log_parser = LogsAnalyzer::default();
                    log_parser.set_node_errors(&node_log_content);
                    Ok::<LogsAnalyzer, TestbedError>(log_parser)
                }
            })
            .collect();

        let results = futures::future::join_all(download_tasks).await;
        for (idx, result) in results.into_iter().enumerate() {
            display::status(format!("{}/{}", idx + 1, self.node_instances.len()));
            match result {
                Ok(log_parser) => log_parsers.push(log_parser),
                // Warn but keep going: missing node logs shouldn't abort.
                Err(e) => display::warn(format!("Failed to download node log: {e}")),
            }
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }
952
    /// Main entry point: prepare the testbed, then run every benchmark the
    /// generator produces, collecting measurements and (optionally) logs
    /// and a Prometheus snapshot. Results are written under
    /// `<results_dir>/<commit>/<timestamp>/<parameters>`.
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Start from a clean slate.
        self.cleanup(true).await?;

        // A branch-style commit containing `/` would otherwise create
        // nested result directories.
        let commit = self.settings.repository.commit.replace("/", "_");
        let timestamp = chrono::Local::now().format("%y%m%d_%H%M%S").to_string();
        let benchmark_dir = self.settings.results_dir.join(&commit).join(&timestamp);

        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        self.start_monitoring(generator.use_internal_ip_address, &timestamp)
            .await?;

        let mut i = 1;
        // NOTE(review): `latest_committee_size` is written below but never
        // read in this function — confirm whether it is still needed.
        let mut latest_committee_size = 0;
        while let Some(mut parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // Each parameter set gets its own sub-directory.
            parameters.benchmark_dir = benchmark_dir.join(format!("{parameters:?}"));

            self.cleanup(true).await?;
            fs::create_dir_all(&parameters.benchmark_dir)
                .expect("Failed to create benchmark directory");

            // Per-benchmark log file with start/finish markers.
            let log_file = parameters.benchmark_dir.join("logs.txt");
            crate::logger::init_logger(&log_file).expect("Failed to initialize logger");
            crate::logger::log(
                chrono::Local::now()
                    .format("Started %y-%m-%d:%H-%M-%S")
                    .to_string()
                    .as_str(),
            );

            // Run the benchmark; its result is only propagated after
            // cleanup, optional log download, and logger shutdown below,
            // so those always happen even when the benchmark fails.
            let benchmark_result = async {
                if !self.skip_testbed_configuration {
                    self.configure(&parameters).await?;
                    latest_committee_size = parameters.nodes;
                }

                self.run_nodes(&parameters).await?;

                self.run_clients(&parameters).await?;

                self.run(&parameters).await?;

                // Collect, display, and persist the measurements, and feed
                // them back into the parameter generator.
                let mut aggregator =
                    MeasurementsCollection::new(&self.settings, parameters.clone());
                self.download_metrics_logs(&parameters.benchmark_dir)
                    .await?;

                aggregator.aggregates_metrics_from_files::<P>(
                    self.client_instances.len(),
                    &parameters.benchmark_dir.join("logs"),
                );

                aggregator.display_summary();
                aggregator.save(&parameters.benchmark_dir);
                generator.register_result(aggregator);

                TestbedResult::Ok(())
            }
            .await;

            self.cleanup(false).await?;

            if self.log_processing {
                let error_counter = self.download_logs(&parameters.benchmark_dir).await?;
                error_counter.print_summary();
            }

            crate::logger::log(
                chrono::Local::now()
                    .format("Finished %y-%m-%d:%H-%M-%S")
                    .to_string()
                    .as_str(),
            );
            crate::logger::close_logger();

            // Now propagate any benchmark failure.
            benchmark_result?;

            i += 1;
        }

        // A snapshot failure is reported but does not fail the run.
        if self.settings.enable_prometheus_snapshots {
            if let Err(e) = self
                .download_prometheus_snapshot(&benchmark_dir, &timestamp)
                .await
            {
                display::error(format!("Failed to download prometheus snapshot: {}", e));
            }
        }

        display::header("Benchmark completed");
        Ok(())
    }
1079}