use std::{
    collections::{HashMap, VecDeque},
    fs::{self},
    marker::PhantomData,
    path::PathBuf,
    time::Duration,
};

use tokio::time::{self, Instant};

use crate::{
    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
    client::{Instance, InstanceRole},
    display, ensure,
    error::{TestbedError, TestbedResult},
    faults::CrashRecoverySchedule,
    logs::LogsAnalyzer,
    measurement::{Measurement, MeasurementsCollection},
    monitor::{Monitor, Prometheus},
    protocol::{ProtocolCommands, ProtocolMetrics},
    settings::Settings,
    ssh::{CommandContext, CommandStatus, SshConnectionManager},
};

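/// Drives benchmarks on a testbed of remote instances: it installs dependencies,
/// deploys nodes and load generators, scrapes metrics, injects crash-recovery
/// faults, and collects measurements and logs.
///
/// Typical usage (a minimal sketch; the settings, instance lists, protocol
/// commands, SSH manager, and parameters generator are assumed to be built by
/// the caller):
///
/// ```ignore
/// let mut orchestrator = Orchestrator::new(
///     settings,
///     node_instances,
///     client_instances,
///     metrics_instance,
///     setup_commands,
///     protocol_commands,
///     ssh_manager,
/// )
/// .with_scrape_interval(Duration::from_secs(30))
/// .skip_monitoring(false);
/// orchestrator.run_benchmarks(generator).await?;
/// ```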
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// The instances running the nodes.
    node_instances: Vec<Instance>,
    /// The instances dedicated to running load generators (if any).
    client_instances: Option<Vec<Instance>>,
    /// The instance hosting the monitoring stack (if any).
    metrics_instance: Option<Instance>,
    /// The type of benchmarks to run.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to set up the instances.
    instance_setup_commands: Vec<String>,
    /// The commands to deploy and run the benchmarked protocol.
    protocol_commands: P,
    /// The interval between two metrics scrapes.
    scrape_interval: Duration,
    /// The interval between two crash-recovery events.
    crash_interval: Duration,
    /// Handle to communicate with the instances over SSH.
    ssh_manager: SshConnectionManager,
    /// Whether to skip updating the software on the testbed.
    skip_testbed_update: bool,
    /// Whether to skip (re)configuring the testbed.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze logs after each benchmark run.
    log_processing: bool,
    /// Number of instances running exclusively load generators (0 collocates
    /// the load generators on the node instances).
    dedicated_clients: usize,
    /// Whether to skip the monitoring stack (Prometheus and Grafana).
    skip_monitoring: bool,
}

impl<P, T> Orchestrator<P, T> {
    /// The default interval between two metrics scrapes.
    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
    /// The default interval between two crash-recovery events.
    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);

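    /// Creates a new orchestrator from the testbed settings, the instances, the
    /// instance setup commands, the protocol commands, and the SSH manager.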
    pub fn new(
        settings: Settings,
        node_instances: Vec<Instance>,
        client_instances: Option<Vec<Instance>>,
        metrics_instance: Option<Instance>,
        instance_setup_commands: Vec<String>,
        protocol_commands: P,
        ssh_manager: SshConnectionManager,
    ) -> Self {
        Self {
            settings,
            node_instances,
            client_instances,
            metrics_instance,
            benchmark_type: PhantomData,
            instance_setup_commands,
            protocol_commands,
            ssh_manager,
            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
            skip_testbed_update: false,
            skip_testbed_configuration: false,
            log_processing: false,
            dedicated_clients: 0,
            skip_monitoring: false,
        }
    }

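    // Builder-style setters: each consumes `self` and returns it with the option updated.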
    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
        self.scrape_interval = scrape_interval;
        self
    }

    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
        self.crash_interval = crash_interval;
        self
    }

    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
        self.skip_testbed_update = skip_testbed_update;
        self
    }

    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
        self.skip_testbed_configuration = skip_testbed_configuration;
        self
    }

    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
        self.log_processing = log_processing;
        self
    }

    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
        self.dedicated_clients = dedicated_clients;
        self
    }

    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
        self.skip_monitoring = skip_monitoring;
        self
    }

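    /// Returns all testbed instances: the nodes, the dedicated clients (if any),
    /// and the metrics instance (if any).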
    pub fn instances(&self) -> Vec<Instance> {
        let mut instances = self.node_instances.clone();
        if let Some(client_instances) = &self.client_instances {
            instances.extend(client_instances.clone());
        }
        if let Some(metrics_instance) = &self.metrics_instance {
            instances.push(metrics_instance.clone());
        }
        instances
    }

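    /// Selects the instances to use for a benchmark run, returning the load-generator
    /// instances, the node instances, and (unless monitoring is skipped) the metrics
    /// instance. Fails if the testbed lacks enough active instances for the parameters.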
    pub fn select_instances(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<(Vec<Instance>, Vec<Instance>, Option<Instance>)> {
        let available_nodes: Vec<_> = self
            .node_instances
            .iter()
            .filter(|x| x.is_active())
            .collect();
        ensure!(
            available_nodes.len() >= parameters.nodes,
            TestbedError::InsufficientCapacity(parameters.nodes - available_nodes.len())
        );
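        // Ensure a metrics instance is available (unless monitoring is skipped).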
        let mut available_metrics_node = None;
        if !self.skip_monitoring {
            ensure!(
                self.metrics_instance
                    .as_ref()
                    .is_some_and(|m| m.is_active()),
                TestbedError::MetricsServerMissing()
            );
            available_metrics_node = self.metrics_instance.clone();
        }
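        // Ensure there are enough active dedicated client instances, collecting them
        // as a side effect of the count.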
        let mut available_dedicated_client_nodes = vec![];
        if self.dedicated_clients > 0 {
            ensure!(
                self.client_instances.as_ref().is_some_and(|m| m
                    .iter()
                    .filter(|x| {
                        if x.is_active() {
                            available_dedicated_client_nodes.push(*x);
                            true
                        } else {
                            false
                        }
                    })
                    .count()
                    >= self.dedicated_clients),
                TestbedError::InsufficientDedicatedClientCapacity(
                    self.dedicated_clients - available_dedicated_client_nodes.len()
                )
            );
        }

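        // Group the available node instances by region.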
        let mut node_instances_by_regions = HashMap::new();
        for instance in available_nodes {
            node_instances_by_regions
                .entry(&instance.region)
                .or_insert_with(VecDeque::new)
                .push_back(instance);
        }

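        // Group the available dedicated client instances by region.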
        let mut client_instances_by_regions = HashMap::new();
        if self.dedicated_clients > 0 {
            for instance in available_dedicated_client_nodes {
                client_instances_by_regions
                    .entry(&instance.region)
                    .or_insert_with(VecDeque::new)
                    .push_back(instance);
            }
        }

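        // Pick the dedicated client instances, rotating through the regions.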
        let mut client_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if client_instances.len() == self.dedicated_clients {
                break;
            }
            if let Some(regional_instances) = client_instances_by_regions.get_mut(region) {
                if let Some(instance) = regional_instances.pop_front() {
                    client_instances.push(instance.clone());
                }
            }
        }

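        // Pick the node instances, rotating through the regions.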
        let mut nodes_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if nodes_instances.len() == parameters.nodes {
                break;
            }
            if let Some(regional_instances) = node_instances_by_regions.get_mut(region) {
                if let Some(instance) = regional_instances.pop_front() {
                    nodes_instances.push(instance.clone());
                }
            }
        }

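        // Without dedicated clients, collocate the load generators with the nodes.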
        if client_instances.is_empty() {
            client_instances.clone_from(&nodes_instances);
        }

        Ok((client_instances, nodes_instances, available_metrics_node))
    }
}

impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
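    /// Boots the nodes on the given instances (in the background, logging to
    /// `~/node.log`) and waits until their metrics endpoints respond.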
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());

        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone(), parameters);
        self.ssh_manager.wait_for_success(commands).await;

        Ok(())
    }

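    /// Installs the dependencies required to build and run the protocol on all
    /// instances except the metrics instance, and (unless monitoring is skipped)
    /// installs the monitoring stack's dependencies on the metrics instance.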
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;
        let basic_commands = [
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            "sudo apt-get -y remove needrestart",
            "sudo apt-get -y install build-essential libssl-dev cmake clang lld protobuf-compiler libudev-dev libpq5 libpq-dev ca-certificates",
            "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
            "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
            "source $HOME/.cargo/env",
            "rustup default stable",
            &format!("mkdir -p {working_dir}"),
            &format!("(git clone {url} || true)"),
        ];

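        // Append the Prometheus, cloud-provider-specific, and protocol-specific
        // dependencies, then chain everything into a single shell invocation.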
        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        let command = [
            &basic_commands[..],
            &Prometheus::install_commands(),
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        let context = CommandContext::default();
        let without_metrics = self
            .instances()
            .iter()
            .filter(|i| i.role != InstanceRole::Metrics)
            .cloned()
            .collect::<Vec<_>>();
        self.ssh_manager
            .execute(without_metrics, command, context.clone())
            .await?;
        if !self.skip_monitoring {
            let metrics_instance = self
                .metrics_instance
                .clone()
                .expect("No metrics instance available");
            let monitor_command = Monitor::dependencies().join(" && ");
            self.ssh_manager
                .execute(vec![metrics_instance], monitor_command, context)
                .await?;
        }

        display::done();
        Ok(())
    }

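    /// Configures the monitoring instance (if one was selected): starts Prometheus
    /// scraping the selected clients and nodes, starts Grafana, and prints the
    /// Grafana address.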
    pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        let (clients, nodes, instance) = self.select_instances(parameters)?;
        if let Some(instance) = instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(instance, clients, nodes, self.ssh_manager.clone());
            monitor
                .start_prometheus(&self.protocol_commands, parameters)
                .await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }

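    /// Updates the repository on all instances (except the metrics instance) to the
    /// commit specified in the settings and rebuilds the benchmark binaries in
    /// release mode.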
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        let commit = &self.settings.repository.commit;
        let command = [
            &format!("git fetch origin {commit} --force"),
            &format!("(git reset --hard origin/{commit} || git checkout --force {commit})"),
            "git clean -fd -e target",
            "source \"$HOME/.cargo/env\"",
            "cargo build --release --bin iota --bin iota-node --bin stress",
        ]
        .join(" && ");

        display::action(format!("update command: {command}"));

        let id = "update";
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.into());
        let without_metrics = self
            .instances()
            .iter()
            .filter(|i| i.role != InstanceRole::Metrics)
            .cloned()
            .collect::<Vec<_>>();

        self.ssh_manager
            .execute(without_metrics.clone(), command, context)
            .await?;

        self.ssh_manager
            .wait_for_command(without_metrics, id, CommandStatus::Terminated)
            .await?;

        display::done();
        Ok(())
    }

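    /// Generates the genesis configuration on all selected clients and nodes.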
    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Configuring instances");

        let (clients, nodes, _) = self.select_instances(parameters)?;

        let command = self
            .protocol_commands
            .genesis_command(nodes.iter(), parameters);
        display::action(format!("Genesis command: {command}"));
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new().with_execute_from_path(repo_name.into());
        let all = clients.into_iter().chain(nodes);
        self.ssh_manager.execute(all, command, context).await?;

        display::done();
        Ok(())
    }

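    /// Kills any running tmux sessions and removes the protocol's databases on all
    /// active instances; if `cleanup` is true, also deletes the log files.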
    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
        display::action("Cleaning up testbed");

        let mut command = vec!["(tmux kill-server || true)".into()];
        for path in self.protocol_commands.db_directories() {
            command.push(format!("(rm -rf {} || true)", path.display()));
        }
        if cleanup {
            command.push("(rm -rf ~/*log* || true)".into());
        }
        let command = command.join(" ; ");

        let active = self.instances().into_iter().filter(|x| x.is_active());
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

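    /// Deploys the validators on the node instances selected for this run.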
    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Deploying validators");

        let (_, nodes, _) = self.select_instances(parameters)?;

        self.boot_nodes(nodes, parameters).await?;

        display::done();
        Ok(())
    }

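    /// Starts the load generators on the client instances (in the background, logging
    /// to `~/client.log`) and waits until their metrics endpoints respond.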
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Setting up load generators");

        let (clients, _, _) = self.select_instances(parameters)?;

        let targets = self
            .protocol_commands
            .client_command(clients.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        let commands = self
            .protocol_commands
            .clients_metrics_command(clients, parameters);
        self.ssh_manager.wait_for_success(commands).await;

        display::done();
        Ok(())
    }

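    /// Runs the benchmark for (at least) the configured duration: periodically scrapes
    /// the clients' and nodes' metrics, applies the crash-recovery schedule, and saves
    /// the collected measurements to the results directory.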
    pub async fn run(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<MeasurementsCollection<T>> {
        display::action(format!(
            "Scraping metrics (at least {}s)",
            parameters.duration.as_secs()
        ));

        let (clients, nodes, _) = self.select_instances(parameters)?;

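        // Build the commands to scrape metrics from the clients and the nodes.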
        let mut metrics_commands = self
            .protocol_commands
            .clients_metrics_command(clients, parameters);

        metrics_commands.append(
            &mut self
                .protocol_commands
                .nodes_metrics_command(nodes.clone(), parameters),
        );

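        // Set up the measurements collection, the scrape and crash timers, and the
        // crash-recovery schedule.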
        let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
        let mut metrics_interval = time::interval(self.scrape_interval);
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule = CrashRecoverySchedule::new(faults_type, nodes.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

        let start = Instant::now();
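        // Scrape metrics and apply the crash-recovery schedule until at least the
        // benchmark duration has elapsed.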
        loop {
            tokio::select! {
                now = metrics_interval.tick() => {
                    let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    let stdio = self
                        .ssh_manager
                        .execute_per_instance(metrics_commands.clone(), CommandContext::default())
                        .await?;
                    for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
                        let measurement = Measurement::from_prometheus::<P>(stdout);
                        aggregator.add(i, measurement);
                    }

                    if elapsed > parameters.duration.as_secs() {
                        break;
                    }
                },

                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

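        // Persist the collected measurements to the results directory.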
        let results_directory = &self.settings.results_dir;
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [results_directory, &format!("results-{commit}").into()]
            .iter()
            .collect();
        fs::create_dir_all(&path).expect("Failed to create results directory");
        aggregator.save(path);

        display::done();
        Ok(aggregator)
    }

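    /// Downloads the clients' and nodes' log files, stores them under the logs
    /// directory, and returns an aggregated analysis of the errors they contain.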
    pub async fn download_logs(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<LogsAnalyzer> {
        let (clients, nodes, _) = self.select_instances(parameters)?;

        let commit = &self.settings.repository.commit;
        let path: PathBuf = [
            &self.settings.logs_dir,
            &format!("logs-{commit}").into(),
            &format!("logs-{parameters:?}").into(),
        ]
        .iter()
        .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");

        let mut log_parsers = Vec::new();

        display::action("Downloading clients logs");
        for (i, instance) in clients.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, clients.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            let client_log_file = [path.clone(), format!("client-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        display::action("Downloading nodes logs");
        for (i, instance) in nodes.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, nodes.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = [path.clone(), format!("node-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }

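    /// Runs all benchmarks produced by the parameters generator: prepares the testbed
    /// (cleanup, install, update), then for each parameter set deploys nodes and load
    /// generators, collects measurements, and optionally processes the logs.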
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        self.cleanup(true).await?;

        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

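        // Run one benchmark for every parameter set produced by the generator.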
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            self.cleanup(true).await?;
            self.start_monitoring(&parameters).await?;

            if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                self.configure(&parameters).await?;
                latest_committee_size = parameters.nodes;
            }

            self.run_nodes(&parameters).await?;

            self.run_clients(&parameters).await?;

            let aggregator = self.run(&parameters).await?;
            aggregator.display_summary();
            generator.register_result(aggregator);
            self.cleanup(false).await?;

            if self.log_processing {
                let error_counter = self.download_logs(&parameters).await?;
                error_counter.print_summary();
            }

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
}