use std::{
    collections::{HashMap, VecDeque},
    fs,
    marker::PhantomData,
    path::PathBuf,
    time::Duration,
};

use tokio::time::{self, Instant};

use crate::{
    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
    client::Instance,
    display, ensure,
    error::{TestbedError, TestbedResult},
    faults::CrashRecoverySchedule,
    logs::LogsAnalyzer,
    measurement::{Measurement, MeasurementsCollection},
    monitor::Monitor,
    protocol::{ProtocolCommands, ProtocolMetrics},
    settings::Settings,
    ssh::{CommandContext, CommandStatus, SshConnectionManager},
};

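/// An orchestrator to deploy a testbed of remote instances, run benchmarks on it,
/// and collect the resulting measurements.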
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// The instances of the testbed.
    instances: Vec<Instance>,
    /// The type of the benchmark.
    benchmark_type: PhantomData<T>,
    /// Cloud-provider-specific commands to run when setting up each instance.
    instance_setup_commands: Vec<String>,
    /// The commands to deploy and run the benchmarked protocol.
    protocol_commands: P,
    /// The interval between two measurement scrapes.
    scrape_interval: Duration,
    /// The interval between two crash/recovery events.
    crash_interval: Duration,
    /// Handle to the ssh connections to the instances.
    ssh_manager: SshConnectionManager,
    /// Whether to skip testbed updates before running benchmarks.
    skip_testbed_update: bool,
    /// Whether to skip testbed configuration before running benchmarks.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the clients' and nodes' log files.
    log_processing: bool,
    /// The number of instances running only load generators (not nodes). If this
    /// value is zero, the load generators are collocated with the nodes.
    dedicated_clients: usize,
    /// Whether to skip deploying the monitoring stack on a dedicated instance.
    skip_monitoring: bool,
}

impl<P, T> Orchestrator<P, T> {
    /// The default interval between two measurement scrapes.
    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
    /// The default interval between two crash/recovery events.
    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);

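    /// Create a new orchestrator with the default scrape and crash intervals.
    /// A minimal sketch of how a caller might assemble and tune one; the
    /// `settings`, `instances`, `setup_commands`, `protocol_commands`, and
    /// `ssh_manager` values are illustrative placeholders assumed to come from
    /// the crate's own loaders:
    ///
    /// ```ignore
    /// let orchestrator = Orchestrator::new(
    ///     settings,
    ///     instances,
    ///     setup_commands,
    ///     protocol_commands,
    ///     ssh_manager,
    /// )
    /// .with_scrape_interval(Duration::from_secs(30))
    /// .with_dedicated_clients(1)
    /// .skip_monitoring(true);
    /// ```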
    pub fn new(
        settings: Settings,
        instances: Vec<Instance>,
        instance_setup_commands: Vec<String>,
        protocol_commands: P,
        ssh_manager: SshConnectionManager,
    ) -> Self {
        Self {
            settings,
            instances,
            benchmark_type: PhantomData,
            instance_setup_commands,
            protocol_commands,
            ssh_manager,
            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
            skip_testbed_update: false,
            skip_testbed_configuration: false,
            log_processing: false,
            dedicated_clients: 0,
            skip_monitoring: false,
        }
    }

    /// Set the interval between two measurement scrapes.
    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
        self.scrape_interval = scrape_interval;
        self
    }

    /// Set the interval between two crash/recovery events.
    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
        self.crash_interval = crash_interval;
        self
    }

    /// Whether to skip testbed updates before running benchmarks.
    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
        self.skip_testbed_update = skip_testbed_update;
        self
    }

    /// Whether to skip testbed configuration before running benchmarks.
    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
        self.skip_testbed_configuration = skip_testbed_configuration;
        self
    }

    /// Whether to download and analyze the clients' and nodes' log files.
    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
        self.log_processing = log_processing;
        self
    }

    /// Set the number of instances running only load generators.
    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
        self.dedicated_clients = dedicated_clients;
        self
    }

    /// Whether to skip deploying the monitoring stack.
    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
        self.skip_monitoring = skip_monitoring;
        self
    }

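    /// Select which instances run the nodes, the load generators, and the
    /// monitoring stack. Returns the client instances, the node instances, and
    /// (unless monitoring is skipped) the monitoring instance. If there are no
    /// dedicated clients, the load generators are collocated with the nodes.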
    pub fn select_instances(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<(Vec<Instance>, Vec<Instance>, Option<Instance>)> {
        // Ensure there are enough active instances.
        let available_instances: Vec<_> = self.instances.iter().filter(|x| x.is_active()).collect();
        let minimum_instances = if self.skip_monitoring {
            parameters.nodes + self.dedicated_clients
        } else {
            parameters.nodes + self.dedicated_clients + 1
        };
        ensure!(
            available_instances.len() >= minimum_instances,
            TestbedError::InsufficientCapacity(minimum_instances - available_instances.len())
        );

        // Sort the available instances by region.
        let mut instances_by_regions = HashMap::new();
        for instance in available_instances {
            instances_by_regions
                .entry(&instance.region)
                .or_insert_with(VecDeque::new)
                .push_back(instance);
        }

        // Select the instance hosting the monitoring stack, taking it from the
        // first region that has available instances.
        let mut monitoring_instance = None;
        if !self.skip_monitoring {
            for region in &self.settings.regions {
                if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                    if let Some(instance) = regional_instances.pop_front() {
                        monitoring_instance = Some(instance.clone());
                    }
                    break;
                }
            }
        }

        // Select the instances hosting dedicated clients, rotating through the regions.
        let mut client_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if client_instances.len() == self.dedicated_clients {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                if let Some(instance) = regional_instances.pop_front() {
                    client_instances.push(instance.clone());
                }
            }
        }

        // Select the instances running the nodes, rotating through the regions.
        let mut nodes_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if nodes_instances.len() == parameters.nodes {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                if let Some(instance) = regional_instances.pop_front() {
                    nodes_instances.push(instance.clone());
                }
            }
        }

        // If there are no dedicated clients, collocate the load generators with the nodes.
        if client_instances.is_empty() {
            client_instances.clone_from(&nodes_instances);
        }

        Ok((client_instances, nodes_instances, monitoring_instance))
    }
}

impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
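    /// Boot the nodes on the given instances and wait until their metrics
    /// endpoints respond (the signal that the nodes are ready).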
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Run one node per instance, in the background, logging to `~/node.log`.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all nodes expose their metrics endpoint.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone());
        self.ssh_manager.wait_for_success(commands).await;

        Ok(())
    }

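    /// Install on all active instances the dependencies needed to compile and
    /// run the benchmarks: basic system packages, rust, the monitoring stack's
    /// dependencies, cloud-provider-specific tools, and the protocol's own
    /// dependencies.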
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;
        let basic_commands = [
            // Update the package index and upgrade the system packages.
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Remove needrestart to avoid interactive restart prompts during installs.
            "sudo apt-get -y remove needrestart",
            // Install the compiler toolchain and ssl development headers.
            "sudo apt-get -y install build-essential libssl-dev",
            // Install rust (non-interactively) and make it the default toolchain.
            "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
            "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
            "source $HOME/.cargo/env",
            "rustup default stable",
            // Create the working directory and clone the repository.
            &format!("mkdir -p {working_dir}"),
            &format!("(git clone {url} || true)"),
        ];

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        let command = [
            &basic_commands[..],
            &Monitor::dependencies()[..],
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

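    /// Start the monitoring stack (prometheus and grafana) on the dedicated
    /// monitoring instance, if any.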
    pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        let (clients, nodes, instance) = self.select_instances(parameters)?;
        if let Some(instance) = instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(instance, clients, nodes, self.ssh_manager.clone());
            monitor.start_prometheus(&self.protocol_commands).await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }

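    /// Update all active instances to the commit specified in the settings and
    /// rebuild the binaries in release mode. The update runs in the background
    /// on every instance; this function waits until it terminates everywhere.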
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        // Check out the configured commit (creating the branch if needed) and build.
        let commit = &self.settings.repository.commit;
        let command = [
            "git fetch -f",
            &format!("(git checkout -b {commit} {commit} || git checkout -f {commit})"),
            "(git pull -f || true)",
            "source $HOME/.cargo/env",
            "cargo build --release",
        ]
        .join(" && ");

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();

        let id = "update";
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.into());
        self.ssh_manager
            .execute(active.clone(), command, context)
            .await?;

        // Wait until the update terminates on all instances.
        self.ssh_manager
            .wait_for_command(active, id, CommandStatus::Terminated)
            .await?;

        display::done();
        Ok(())
    }

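    /// Configure the instances for the benchmark, that is, run the protocol's
    /// genesis command on all clients and nodes.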
    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Configuring instances");

        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Run the genesis command on every client and node.
        let command = self.protocol_commands.genesis_command(nodes.iter());
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new().with_execute_from_path(repo_name.into());
        let all = clients.into_iter().chain(nodes);
        self.ssh_manager.execute(all, command, context).await?;

        display::done();
        Ok(())
    }

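    /// Cleanup all instances: kill every running process and delete the
    /// databases. If `cleanup` is true, also delete the log files.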
    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
        display::action("Cleaning up testbed");

        // Kill all tmux sessions and delete the databases (and optionally the logs).
        let mut command = vec!["(tmux kill-server || true)".into()];
        for path in self.protocol_commands.db_directories() {
            command.push(format!("(rm -rf {} || true)", path.display()));
        }
        if cleanup {
            command.push("(rm -rf ~/*log* || true)".into());
        }
        let command = command.join(" ; ");

        // Execute the cleanup on all active instances.
        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

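    /// Deploy the nodes on the instances selected for the benchmark.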
    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Deploying validators");

        let (_, nodes, _) = self.select_instances(parameters)?;

        self.boot_nodes(nodes, parameters).await?;

        display::done();
        Ok(())
    }

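    /// Deploy the load generators on the instances selected for the benchmark
    /// and wait until they are ready.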
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Setting up load generators");

        let (clients, _, _) = self.select_instances(parameters)?;

        // Run one load generator per client instance, in the background,
        // logging to `~/client.log`.
        let targets = self
            .protocol_commands
            .client_command(clients.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all load generators expose their metrics endpoint.
        let commands = self.protocol_commands.clients_metrics_command(clients);
        self.ssh_manager.wait_for_success(commands).await;

        display::done();
        Ok(())
    }

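    /// Run the benchmark: periodically scrape the clients' and nodes' metrics
    /// for at least the configured duration while applying the crash-recovery
    /// schedule, then save the collected measurements to the results directory.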
    pub async fn run(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<MeasurementsCollection<T>> {
        display::action(format!(
            "Scraping metrics (at least {}s)",
            parameters.duration.as_secs()
        ));

        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Gather the commands to scrape the clients' and nodes' metrics.
        let mut metrics_commands = self.protocol_commands.clients_metrics_command(clients);
        metrics_commands.append(&mut self.protocol_commands.nodes_metrics_command(nodes.clone()));

        let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
        let mut metrics_interval = time::interval(self.scrape_interval);
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule = CrashRecoverySchedule::new(faults_type, nodes.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

        let start = Instant::now();
        loop {
            tokio::select! {
                // Scrape the metrics of all clients and nodes.
                now = metrics_interval.tick() => {
                    let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    let stdio = self
                        .ssh_manager
                        .execute_per_instance(metrics_commands.clone(), CommandContext::default())
                        .await?;
                    for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
                        let measurement = Measurement::from_prometheus::<P>(stdout);
                        aggregator.add(i, measurement);
                    }

                    // Stop once the benchmark ran for at least the configured duration.
                    if elapsed > parameters.duration.as_secs() {
                        break;
                    }
                },

                // Kill and recover nodes according to the crash-recovery schedule.
                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        // Save the collected measurements.
        let results_directory = &self.settings.results_dir;
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [results_directory, &format!("results-{commit}").into()]
            .iter()
            .collect();
        fs::create_dir_all(&path).expect("Failed to create results directory");
        aggregator.save(path);

        display::done();
        Ok(aggregator)
    }

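    /// Download the clients' and nodes' log files, write them to the logs
    /// directory, and scan them for errors.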
    pub async fn download_logs(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<LogsAnalyzer> {
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Create the log directory.
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [
            &self.settings.logs_dir,
            &format!("logs-{commit}").into(),
            &format!("logs-{parameters:?}").into(),
        ]
        .iter()
        .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");

        let mut log_parsers = Vec::new();

        // Download the clients' log files.
        display::action("Downloading client logs");
        for (i, instance) in clients.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, clients.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            let client_log_file = [path.clone(), format!("client-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser);
        }
        display::done();

        // Download the nodes' log files.
        display::action("Downloading node logs");
        for (i, instance) in nodes.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, nodes.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = [path.clone(), format!("node-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser);
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }

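    /// Run all the benchmarks produced by the given generator: prepare the
    /// testbed once (cleanup, install, update), then for every set of
    /// parameters deploy the nodes and clients, collect measurements, and feed
    /// the results back into the generator. A minimal sketch of a driver,
    /// assuming the orchestrator and `generator` were built elsewhere:
    ///
    /// ```ignore
    /// let mut orchestrator = Orchestrator::new(/* ... */);
    /// orchestrator.run_benchmarks(generator).await?;
    /// ```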
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Cleanup the testbed (in case the previous run was not completed).
        self.cleanup(true).await?;

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Run all benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // Cleanup the testbed and (if enabled) start the monitoring stack.
            self.cleanup(true).await?;
            self.start_monitoring(&parameters).await?;

            // Configure the testbed, but only when the committee size changes.
            if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                self.configure(&parameters).await?;
                latest_committee_size = parameters.nodes;
            }

            // Deploy the validators.
            self.run_nodes(&parameters).await?;

            // Deploy the load generators.
            self.run_clients(&parameters).await?;

            // Wait for the benchmark to terminate, then save and display the results.
            let aggregator = self.run(&parameters).await?;
            aggregator.display_summary();
            generator.register_result(aggregator);
            self.cleanup(false).await?;

            // Download the log files and print a summary of the errors they contain.
            if self.log_processing {
                let error_counter = self.download_logs(&parameters).await?;
                error_counter.print_summary();
            }

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
}