1use std::{
6 collections::HashSet,
7 fs::{self},
8 marker::PhantomData,
9 path::{Path, PathBuf},
10 time::Duration,
11};
12
13use chrono;
14use tokio::time::{self, Instant};
15
16use crate::{
17 benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
18 build_cache::BuildCacheService,
19 client::Instance,
20 display,
21 error::TestbedResult,
22 faults::CrashRecoverySchedule,
23 logs::LogsAnalyzer,
24 measurement::{Measurement, MeasurementsCollection},
25 monitor::{Monitor, Prometheus},
26 net_latency::NetworkLatencyCommandBuilder,
27 protocol::{ProtocolCommands, ProtocolMetrics},
28 settings::{BuildGroups, Settings, build_cargo_command},
29 ssh::{CommandContext, CommandStatus, SshConnectionManager},
30};
31
/// Drives an end-to-end benchmark testbed: installs dependencies, updates and
/// configures remote instances over SSH, boots nodes and clients, scrapes
/// metrics, injects faults, and collects measurements.
pub struct Orchestrator<P, T> {
    // Testbed settings (repository, build configs, results directory, ...).
    settings: Settings,
    // Instances that run the protocol nodes (deployed as "validators").
    node_instances: Vec<Instance>,
    // Instances that run the benchmark load generators (and optionally fullnodes).
    client_instances: Vec<Instance>,
    // Optional instance hosting the monitoring stack (Prometheus/Grafana).
    metrics_instance: Option<Instance>,
    // Zero-sized marker tying this orchestrator to one benchmark type `T`.
    benchmark_type: PhantomData<T>,
    // Cloud-provider-specific setup commands run during installation.
    instance_setup_commands: Vec<String>,
    // Protocol-specific command builder (genesis, node, client commands, ...).
    protocol_commands: P,
    // Interval between two metrics scrapes during a run.
    scrape_interval: Duration,
    // Interval between two crash/recovery fault-injection rounds.
    crash_interval: Duration,
    // Manager for SSH connections to all instances.
    ssh_manager: SshConnectionManager,
    // Skip installing dependencies and updating the repository on instances.
    skip_testbed_update: bool,
    // Skip re-running genesis configuration between benchmarks.
    skip_testbed_configuration: bool,
    // Download and analyze node/client logs after each benchmark run.
    log_processing: bool,
    // When non-zero, client instances are included in the non-metrics
    // instance set (see `instances_without_metrics`).
    dedicated_clients: usize,
    // Skip deploying the monitoring stack on the metrics instance.
    skip_monitoring: bool,
}
69
70impl<P, T> Orchestrator<P, T> {
71 const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
73 const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);
75
76 pub fn new(
78 settings: Settings,
79 node_instances: Vec<Instance>,
80 client_instances: Vec<Instance>,
81 metrics_instance: Option<Instance>,
82 instance_setup_commands: Vec<String>,
83 protocol_commands: P,
84 ssh_manager: SshConnectionManager,
85 ) -> Self {
86 Self {
87 settings,
88 node_instances,
89 client_instances,
90 metrics_instance,
91 benchmark_type: PhantomData,
92 instance_setup_commands,
93 protocol_commands,
94 ssh_manager,
95 scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
96 crash_interval: Self::DEFAULT_CRASH_INTERVAL,
97 skip_testbed_update: false,
98 skip_testbed_configuration: false,
99 log_processing: false,
100 dedicated_clients: 0,
101 skip_monitoring: false,
102 }
103 }
104
105 pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
107 self.scrape_interval = scrape_interval;
108 self
109 }
110
111 pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
113 self.crash_interval = crash_interval;
114 self
115 }
116
117 pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
119 self.skip_testbed_update = skip_testbed_update;
120 self
121 }
122
123 pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
125 self.skip_testbed_configuration = skip_testbed_configuration;
126 self
127 }
128
129 pub fn with_log_processing(mut self, log_processing: bool) -> Self {
131 self.log_processing = log_processing;
132 self
133 }
134
135 pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
137 self.dedicated_clients = dedicated_clients;
138 self
139 }
140
141 pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
143 self.skip_monitoring = skip_monitoring;
144 self
145 }
146
147 pub fn instances_without_metrics(&self) -> Vec<Instance> {
148 let mut instances = self.node_instances.clone();
149
150 if self.dedicated_clients > 0 {
151 instances.extend(self.client_instances.clone());
152 }
153 instances
154 }
155
156 pub fn instances(&self) -> Vec<Instance> {
158 let mut instances = self.instances_without_metrics();
159 if let Some(metrics_instance) = &self.metrics_instance {
160 instances.push(metrics_instance.clone());
161 }
162 instances
163 }
164}
165
166impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
    /// Boot protocol nodes on the given instances with the provided benchmark
    /// parameters: optionally install an artificial network-latency matrix,
    /// start each node in the background, then block until the nodes' metrics
    /// endpoints respond (i.e. the nodes are up).
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Apply artificial latency before booting, but only when using
        // internal IP addresses and a latency topology is configured.
        // NOTE(review): latency injection is gated on internal addressing —
        // confirm this coupling is intentional.
        if parameters.use_internal_ip_address {
            if let Some(latency_topology) = parameters.latency_topology.clone() {
                let latency_commands = NetworkLatencyCommandBuilder::new(&instances)
                    .with_perturbation_spec(parameters.perturbation_spec.clone())
                    .with_topology_layout(latency_topology)
                    .with_max_latency(parameters.maximum_latency)
                    .build_network_latency_matrix();
                self.ssh_manager
                    .execute_per_instance(latency_commands, CommandContext::default())
                    .await?;
            }
        }

        // Start the nodes in the background, logging to ~/node.log, executed
        // from the repository directory.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let node_context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, node_context)
            .await?;

        // Wait until every node's metrics endpoint answers successfully.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone(), parameters);
        self.ssh_manager.wait_for_success(commands).await;

        Ok(())
    }
208
    /// Install all software dependencies on every instance: system packages,
    /// raised file-descriptor limits, the repository clone, and — unless
    /// precompiled binaries come from the build cache — the Rust toolchain(s).
    /// Also installs monitoring dependencies on the metrics instance.
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;

        // With the build cache enabled, instances receive precompiled
        // binaries and do not need a local Rust toolchain.
        let use_precompiled_binaries = self.settings.build_cache_enabled();

        let working_dir_cmd = format!("mkdir -p {working_dir}");
        // `|| true` keeps the chain going when the repo is already cloned.
        let git_clone_cmd = format!("(git clone --depth=1 {url} || true)");

        let mut basic_commands = vec![
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // needrestart prompts interactively and would block unattended runs.
            "sudo apt-get -y remove needrestart",
            "sudo apt-get -y install curl git ca-certificates",
            // Raise open-file limits for all users and root (persisted + sysctl).
            "echo '* soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo '* hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root soft nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'root hard nofile 1048576' | sudo tee -a /etc/security/limits.conf",
            "echo 'fs.file-max = 2097152' | sudo tee -a /etc/sysctl.conf",
            "sudo sysctl -p",
            "ulimit -n 1048576 || true",
            working_dir_cmd.as_str(),
            git_clone_cmd.as_str(),
        ];

        // Deduplicated set of non-stable toolchains required by the build
        // configs; empty when precompiled binaries are used.
        let toolchain_cmds: Vec<String> = if !use_precompiled_binaries {
            self.settings
                .build_configs
                .values()
                .filter_map(|config| {
                    config
                        .toolchain
                        .as_ref()
                        .filter(|t| t.as_str() != "stable")
                        .cloned()
                })
                .collect::<HashSet<String>>()
                .into_iter()
                .map(|toolchain| format!("rustup toolchain install {toolchain}"))
                .collect()
        } else {
            vec![]
        };

        if !use_precompiled_binaries {
            // Build tooling plus rustup, sourced into the current shell.
            basic_commands.extend([
                "sudo apt-get -y install build-essential cmake clang lld protobuf-compiler pkg-config nvme-cli",
                "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
                "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
                "source $HOME/.cargo/env",
                "rustup default stable",
            ]);

            for cmd in &toolchain_cmds {
                basic_commands.push(cmd.as_str());
            }
        } else {
            // Ensure `$HOME/.cargo/env` exists even without a rustup install —
            // presumably so later `source` steps don't fail; TODO confirm.
            basic_commands.push("mkdir -p $HOME/.cargo/ && touch $HOME/.cargo/env");
        }

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        // Chain everything with `&&` so a failure aborts the remainder.
        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        self.ssh_manager
            .execute(self.instances(), command, CommandContext::default())
            .await?;

        // Monitoring dependencies go only on the dedicated metrics instance.
        if !self.skip_monitoring {
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(
                    vec![metrics_instance],
                    monitor_command,
                    CommandContext::default(),
                )
                .await?;
        }

        display::done();
        Ok(())
    }
326
327 pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
329 if let Some(instance) = &self.metrics_instance {
330 display::action("Configuring monitoring instance");
331
332 let monitor = Monitor::new(
333 instance.clone(),
334 self.client_instances.clone(),
335 self.node_instances.clone(),
336 self.ssh_manager.clone(),
337 );
338 monitor
339 .start_prometheus(&self.protocol_commands, parameters)
340 .await?;
341 monitor.start_grafana().await?;
342
343 display::done();
344 display::config("Grafana address", monitor.grafana_address());
345 display::newline();
346 }
347
348 Ok(())
349 }
350
    /// Update the repository on every instance to the configured commit, then
    /// distribute binaries either from the build cache or by building locally
    /// on each instance.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        let commit = &self.settings.repository.commit;
        let repo_name = self.settings.repository_name();
        let build_groups = self.settings.build_groups();

        // Fetch and hard-reset to the target commit; fall back to a forced
        // checkout when `commit` is a SHA rather than a branch. Keep the
        // `target` directory so incremental builds stay warm.
        let git_update_command = [
            &format!("git fetch origin {commit} --force"),
            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
            "git clean -fd -e target",
        ]
        .join(" && ");

        // Run the update in the background and wait for it to terminate on
        // all instances.
        let id = "git update";
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.clone().into());

        display::action(format!("update command: {git_update_command}"));
        self.ssh_manager
            .execute(self.instances(), git_update_command, context)
            .await?;
        self.ssh_manager
            .wait_for_command(self.instances(), id, CommandStatus::Terminated)
            .await?;

        if self.settings.build_cache_enabled() {
            // Distribute precompiled binaries (metrics instance excluded).
            display::action("Using build cache for binary distribution");
            let build_cache_service = BuildCacheService::new(&self.settings, &self.ssh_manager);
            build_cache_service
                .update_with_build_cache(
                    commit,
                    &build_groups,
                    self.instances_without_metrics(),
                    repo_name.clone(),
                )
                .await?;
        } else {
            // No cache: compile each build group directly on the instances.
            self.update_with_local_build(build_groups).await?;
        }

        display::done();
        Ok(())
    }
404
405 async fn update_with_local_build(&self, build_groups: BuildGroups) -> TestbedResult<()> {
411 let without_metrics = self.instances_without_metrics();
412 let repo_name = self.settings.repository_name();
413
414 for (i, (group, binary_names)) in build_groups.iter().enumerate() {
416 let build_command = build_cargo_command(
418 "build",
419 group.toolchain.clone(),
420 group.features.clone(),
421 binary_names,
422 &[] as &[&str],
423 &[] as &[&str],
424 );
425
426 display::action(format!(
428 "Running build command {}/{}: \"{build_command}\" in \"{repo_name}\"",
429 i + 1,
430 build_groups.len()
431 ));
432
433 let context = CommandContext::new().with_execute_from_path(repo_name.clone().into());
434
435 self.ssh_manager
436 .execute(without_metrics.clone(), build_command, context)
437 .await?;
438 }
439
440 Ok(())
441 }
442
443 pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
445 display::action("Configuring instances");
446
447 let command = self
450 .protocol_commands
451 .genesis_command(self.node_instances.iter(), parameters);
452 display::action(format!("Genesis command: {command}"));
453 let repo_name = self.settings.repository_name();
454 let context = CommandContext::new().with_execute_from_path(repo_name.into());
455 self.ssh_manager
456 .execute(self.instances_without_metrics(), command, context)
457 .await?;
458
459 display::done();
460 Ok(())
461 }
462
463 pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
465 display::action("Cleaning up testbed");
466
467 let mut command = vec!["(tmux kill-server || true)".into()];
469 for path in self.protocol_commands.db_directories() {
470 command.push(format!("(rm -rf {} || true)", path.display()));
471 }
472 if cleanup {
473 command.push("(rm -rf ~/*log* || true)".into());
474 }
475 let command = command.join(" ; ");
476
477 let active = self.instances().into_iter().filter(|x| x.is_active());
479 let context = CommandContext::default();
480 self.ssh_manager.execute(active, command, context).await?;
481
482 display::done();
483 Ok(())
484 }
485
486 pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
488 display::action("Deploying validators");
489
490 self.boot_nodes(self.node_instances.clone(), parameters)
492 .await?;
493
494 display::done();
495 Ok(())
496 }
497
    /// Deploy the benchmark load generators on the client instances. When the
    /// settings route execution through full nodes, boot those first and wait
    /// until their JSON-RPC endpoint responds.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        if self.settings.use_fullnode_for_execution {
            display::action("Setting up full nodes");

            // Start one fullnode per client instance, in the background,
            // logging to ~/fullnode.log.
            let targets = self
                .protocol_commands
                .fullnode_command(self.client_instances.clone(), parameters);

            let repo = self.settings.repository_name();
            let context = CommandContext::new()
                .run_background("fullnode".into())
                .with_log_file("~/fullnode.log".into())
                .with_execute_from_path(repo.into());
            self.ssh_manager
                .execute_per_instance(targets, context)
                .await?;

            display::action("Await fullnode ready...");
            // Poll each fullnode's local JSON-RPC endpoint until it answers.
            let commands = self
                .client_instances
                .iter()
                .cloned()
                .map(|i| (i, "curl http://127.0.0.1:9000 -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"iota_getLatestCheckpointSequenceNumber\",\"params\":[],\"id\":1}'".to_owned()));
            self.ssh_manager.wait_for_success(commands).await;

            display::done();
        }

        display::action("Setting up load generators");

        // Start the load generators in the background, logging to
        // ~/client.log, executed from the repository directory.
        let targets = self
            .protocol_commands
            .client_command(self.client_instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until every client's metrics endpoint responds, i.e. the load
        // generators are up.
        let commands = self
            .protocol_commands
            .clients_metrics_command(self.client_instances.clone(), parameters);
        self.ssh_manager.wait_for_success(commands).await;

        display::done();
        Ok(())
    }
555
556 pub async fn run(
558 &self,
559 benchmark_dir: &Path,
560 parameters: &BenchmarkParameters<T>,
561 ) -> TestbedResult<MeasurementsCollection<T>> {
562 display::action(format!(
563 "Scraping metrics (at least {}s)",
564 parameters.duration.as_secs()
565 ));
566
567 let metrics_commands = self
569 .protocol_commands
570 .clients_metrics_command(self.client_instances.clone(), parameters);
571
572 let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
573 let mut metrics_interval = time::interval(self.scrape_interval);
574 metrics_interval.tick().await; let faults_type = parameters.faults.clone();
577 let mut faults_schedule =
578 CrashRecoverySchedule::new(faults_type, self.node_instances.clone());
579 let mut faults_interval = time::interval(self.crash_interval);
580 faults_interval.tick().await; let start = Instant::now();
583 loop {
584 tokio::select! {
585 now = metrics_interval.tick() => {
587 let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
588 display::status(format!("{elapsed}s"));
589
590 let stdio = self
591 .ssh_manager
592 .execute_per_instance(metrics_commands.clone(), CommandContext::default())
593 .await?;
594 for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
595 display::action(format!("Processing metrics from client {}\n", i));
596 let measurement = Measurement::from_prometheus::<P>(stdout);
597 aggregator.add(i, measurement);
598 }
599
600 if elapsed > parameters.duration .as_secs() {
601 break;
602 }
603 },
604
605 _ = faults_interval.tick() => {
607 let action = faults_schedule.update();
608 if !action.kill.is_empty() {
609 self.ssh_manager.kill(action.kill.clone(), "node").await?;
610 }
611 if !action.boot.is_empty() {
612 self.boot_nodes(action.boot.clone(), parameters).await?;
613 }
614 if !action.kill.is_empty() || !action.boot.is_empty() {
615 display::newline();
616 display::config("Testbed update", action);
617 }
618 }
619 }
620 }
621
622 aggregator.save(benchmark_dir);
623
624 if self.settings.enable_flamegraph {
625 let flamegraphs_dir = benchmark_dir.join("flamegraphs");
626 fs::create_dir_all(&flamegraphs_dir).expect("Failed to create flamegraphs directory");
627
628 self.fetch_flamegraphs(
629 parameters,
630 self.instances_without_metrics().clone(),
631 &flamegraphs_dir,
632 "?svg=true",
633 "flamegraph",
634 )
635 .await?;
636
637 if self
638 .settings
639 .build_configs
640 .get("iota-node")
641 .is_some_and(|config| config.features.iter().any(|f| f == "flamegraph-alloc"))
642 {
643 self.fetch_flamegraphs(
644 parameters,
645 self.instances_without_metrics().clone(),
646 &flamegraphs_dir,
647 "?svg=true&mem=true",
648 "flamegraph-alloc",
649 )
650 .await?;
651 }
652 }
653
654 display::done();
655 Ok(aggregator)
656 }
657
658 async fn fetch_flamegraphs(
659 &self,
660 parameters: &BenchmarkParameters<T>,
661 nodes: Vec<Instance>,
662 path: &Path,
663 query: &str,
664 file_prefix: &str,
665 ) -> TestbedResult<()> {
666 let flamegraph_commands = self
667 .protocol_commands
668 .nodes_flamegraph_command(nodes, parameters, query);
669 let stdio = self
670 .ssh_manager
671 .execute_per_instance(flamegraph_commands, CommandContext::default())
672 .await?;
673 for (i, (stdout, stderr)) in stdio.into_iter().enumerate() {
674 if !stdout.is_empty() {
675 let file = path.join(format!("{file_prefix}-{i}.svg"));
676 fs::write(file, stdout).unwrap();
677 }
678 if !stderr.is_empty() {
679 let file = path.join(format!("{file_prefix}-{i}.log"));
680 fs::write(file, stderr).unwrap();
681 }
682 }
683 Ok(())
684 }
685
    /// Download client and node logs from every instance into
    /// `benchmark_dir/logs`, scan each one for errors, and return the
    /// aggregated analysis.
    pub async fn download_logs(&self, benchmark_dir: &Path) -> TestbedResult<LogsAnalyzer> {
        let path = benchmark_dir.join("logs");
        fs::create_dir_all(&path).expect("Failed to create logs directory");

        // One analyzer per downloaded log; aggregated at the end.
        let mut log_parsers = Vec::new();

        display::action("Downloading clients logs");
        for (i, instance) in self.client_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.client_instances.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            // Persist a local copy before scanning it for errors.
            let client_log_file = path.join(format!("client-{i}.log"));
            fs::write(client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        display::action("Downloading nodes logs");
        for (i, instance) in self.node_instances.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, self.node_instances.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = path.join(format!("node-{i}.log"));
            fs::write(node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }
732
    /// Top-level benchmark driver: prepare the testbed (cleanup, install,
    /// update), then run every benchmark produced by the generator. Each
    /// benchmark gets its own timestamped results directory and log file; a
    /// failure in one benchmark aborts the whole sequence (after the logger
    /// is closed).
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        self.cleanup(true).await?;

        // Sanitize the commit for use as a directory name.
        let commit: PathBuf = self.settings.repository.commit.replace("/", "_").into();
        let timestamp = chrono::Local::now().format("%y%m%d_%H%M%S");

        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Track the committee size so genesis is only re-run when the number
        // of nodes changes between benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // results_dir / commit / "{timestamp}-{parameters:?}"
            let benchmark_dir: PathBuf = [
                &self.settings.results_dir,
                &commit,
                &format!("{timestamp}-{parameters:?}").into(),
            ]
            .iter()
            .collect();

            self.cleanup(true).await?;
            fs::create_dir_all(&benchmark_dir).expect("Failed to create benchmark directory");

            let log_file = benchmark_dir.join("logs.txt");
            crate::logger::init_logger(&log_file).expect("Failed to initialize logger");

            // Run the benchmark inside an async block so the logger is always
            // closed before any error propagates.
            let benchmark_result = async {
                self.start_monitoring(&parameters).await?;

                if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                    self.configure(&parameters).await?;
                    latest_committee_size = parameters.nodes;
                }

                self.run_nodes(&parameters).await?;

                self.run_clients(&parameters).await?;

                let aggregator = self.run(&benchmark_dir, &parameters).await?;
                aggregator.display_summary();
                generator.register_result(aggregator);
                // Keep logs (cleanup(false)) for optional processing below.
                self.cleanup(false).await?;

                if self.log_processing {
                    let error_counter = self.download_logs(&benchmark_dir).await?;
                    error_counter.print_summary();
                }

                TestbedResult::Ok(())
            }
            .await;

            crate::logger::close_logger();

            benchmark_result?;

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
828}