1use std::{collections::HashMap, fs, io::BufRead, path::Path, time::Duration};
6
7use prettytable::{Table, row};
8use prometheus_parse::Scrape;
9use serde::{Deserialize, Serialize};
10
11use crate::{
12 IotaBenchmarkType,
13 benchmark::{BenchmarkParameters, BenchmarkType, RunInterval},
14 display,
15 protocol::ProtocolMetrics,
16 settings::Settings,
17};
18
/// Upper-bound (`le`) label of a Prometheus latency histogram bucket,
/// e.g. "0.5" or "inf".
type BucketId = String;
21
/// Latency/throughput data points extracted from one Prometheus report for a
/// single workload.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// Name of the workload these samples belong to.
    pub workload: String,
    /// Benchmark duration reported at scrape time (whole seconds).
    timestamp: Duration,
    /// Latency histogram: bucket upper bound -> cumulative sample count
    /// (cumulative, as exposed by Prometheus histograms).
    buckets: HashMap<BucketId, usize>,
    /// Sum of all observed latencies.
    sum: Duration,
    /// Total number of observed transactions.
    count: usize,
    /// Sum of squared latencies (in seconds squared), used to derive the
    /// standard deviation.
    squared_sum: Duration,
}
38
39impl Measurement {
    /// Parses a Prometheus text-format report and builds one [`Measurement`]
    /// per workload found in the samples.
    ///
    /// Only samples carrying a `workload` label are grouped. The
    /// workload-agnostic benchmark duration gauge serves as a fallback
    /// timestamp. Missing or unexpectedly typed metrics default to zero.
    pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> HashMap<String, Self> {
        let br = std::io::BufReader::new(text.as_bytes());
        // NOTE(review): panics on malformed input; acceptable only if every
        // report is produced by our own nodes — confirm callers.
        let parsed = Scrape::parse(br.lines()).expect("Failed to parse Prometheus metrics");

        // Group all labelled samples by workload name.
        let mut samples_by_workload: HashMap<String, Vec<&prometheus_parse::Sample>> =
            HashMap::new();
        for sample in &parsed.samples {
            if let Some(workload) = sample.labels.get("workload") {
                samples_by_workload
                    .entry(workload.to_string())
                    .or_default()
                    .push(sample);
            }
        }

        if samples_by_workload.is_empty() {
            return HashMap::new();
        }

        // Fallback timestamp: the benchmark duration gauge that carries no
        // workload label.
        let global_timestamp = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::BENCHMARK_DURATION && x.labels.get("workload").is_none())
            .and_then(|x| match x.value {
                prometheus_parse::Value::Gauge(value) => Some(Duration::from_secs(value as u64)),
                _ => None,
            })
            .unwrap_or_default();

        samples_by_workload
            .into_iter()
            .map(|(workload, workload_samples)| {
                // Cumulative histogram buckets keyed by upper bound (`le`).
                let buckets: HashMap<_, _> = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_BUCKETS)
                    .and_then(|sample| match &sample.value {
                        prometheus_parse::Value::Histogram(values) => Some(
                            values
                                .iter()
                                .map(|x| (x.less_than.to_string(), x.count as usize))
                                .collect(),
                        ),
                        _ => None,
                    })
                    .unwrap_or_default();

                // Sum of all observed latencies (fractional seconds).
                let sum = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_SUM)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Untyped(value) => {
                            Some(Duration::from_secs_f64(value))
                        }
                        _ => None,
                    })
                    .unwrap_or_default();

                // Total number of observed transactions.
                let count = workload_samples
                    .iter()
                    .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Untyped(value) => Some(value as usize),
                        _ => None,
                    })
                    .unwrap_or_default();

                // Sum of squared latencies, needed for the stdev computation.
                let squared_sum = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Counter(value) => {
                            Some(Duration::from_secs_f64(value))
                        }
                        _ => None,
                    })
                    .unwrap_or_default();

                // Prefer a workload-specific duration gauge; otherwise fall
                // back to the global one.
                let timestamp = workload_samples
                    .iter()
                    .find(|x| x.metric == M::BENCHMARK_DURATION)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Gauge(value) => {
                            Some(Duration::from_secs(value as u64))
                        }
                        _ => None,
                    })
                    .unwrap_or(global_timestamp);

                let measurement = Self {
                    workload: workload.clone(),
                    timestamp,
                    buckets,
                    sum,
                    count,
                    squared_sum,
                };

                (workload, measurement)
            })
            .collect()
    }
147
148 pub fn tps(&self, duration: &Duration) -> u64 {
154 let tps = self.count.checked_div(duration.as_secs() as usize);
155 tps.unwrap_or_default() as u64
156 }
157
158 pub fn average_latency(&self) -> Duration {
160 self.sum.checked_div(self.count as u32).unwrap_or_default()
161 }
162
163 pub fn stdev_latency(&self) -> Duration {
166 let first_term = if self.count == 0 {
168 0.0
169 } else {
170 self.squared_sum.as_secs_f64() / self.count as f64
171 };
172
173 let squared_avg = self.average_latency().as_secs_f64().powf(2.0);
175
176 let variance = if squared_avg > first_term {
178 0.0
179 } else {
180 first_term - squared_avg
181 };
182
183 let stdev = variance.sqrt();
185 Duration::from_secs_f64(stdev)
186 }
187
    /// Fixed-value fixture (no histogram buckets) used by unit tests.
    #[cfg(test)]
    pub fn new_for_test(workload: String) -> Self {
        Self {
            workload,
            timestamp: Duration::from_secs(30),
            buckets: HashMap::new(),
            sum: Duration::from_secs(1265),
            count: 1860,
            squared_sum: Duration::from_secs(952),
        }
    }
199}
200
/// Identifier of the scraper (client) that produced a set of measurements.
type ScraperId = usize;
203
/// All measurements gathered during one benchmark run, grouped by scraper
/// and workload, together with the run's configuration.
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// Description of the machine specs the benchmark ran on.
    pub machine_specs: String,
    /// Commit of the codebase under benchmark.
    pub commit: String,
    /// Parameters the benchmark ran with.
    pub parameters: BenchmarkParameters<T>,
    /// scraper id -> workload name -> measurements in insertion order
    /// (latest last).
    pub scrapers: HashMap<ScraperId, HashMap<String, Vec<Measurement>>>,
}
215
216impl<T: BenchmarkType> MeasurementsCollection<T> {
    /// Creates an empty measurements collection for a run with the given
    /// settings and benchmark parameters.
    pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
        Self {
            machine_specs: settings.node_specs.clone(),
            commit: settings.repository.commit.clone(),
            parameters,
            scrapers: HashMap::new(),
        }
    }
226
227 pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
229 let data = fs::read(path)?;
230 let measurements: Self = serde_json::from_slice(data.as_slice())?;
231 Ok(measurements)
232 }
233
234 pub fn add(&mut self, scraper_id: ScraperId, measurements: HashMap<String, Measurement>) {
236 let scraper_workloads = self.scrapers.entry(scraper_id).or_default();
237 for (workload, workload_measurement) in measurements {
238 scraper_workloads
239 .entry(workload)
240 .or_default()
241 .push(workload_measurement);
242 }
243 }
244
    /// Target transaction load (tx/s) this benchmark was configured with.
    pub fn transaction_load(&self) -> usize {
        self.parameters.load
    }
249
250 pub fn benchmark_duration(&self) -> Duration {
253 self.last_measurements_iter()
254 .map(|x| x.timestamp)
255 .max()
256 .unwrap_or_default()
257 }
258
259 pub fn workload_tps(&self) -> HashMap<String, u64> {
260 let last_measurements: Vec<_> = self.last_measurements_iter().collect();
262
263 last_measurements
264 .into_iter()
265 .fold(HashMap::new(), |mut acc, measurement| {
267 *acc.entry(measurement.workload.clone()).or_insert(0) +=
268 measurement.tps(&measurement.timestamp);
269 acc
270 })
271 }
272
273 pub fn aggregate_tps(&self) -> u64 {
277 let last_measurements: Vec<_> = self.last_measurements_iter().collect();
279
280 last_measurements.iter().map(|x| x.tps(&x.timestamp)).sum()
282 }
283
284 pub fn workload_average_latency(&self) -> HashMap<String, Duration> {
285 let mut workload_data: HashMap<String, (Duration, usize)> = HashMap::new();
287
288 for measurement in self.last_measurements_iter() {
289 workload_data
290 .entry(measurement.workload.clone())
291 .and_modify(|(sum, count)| {
292 *sum += measurement.sum;
293 *count += measurement.count;
294 })
295 .or_insert((measurement.sum, measurement.count));
296 }
297
298 workload_data
300 .into_iter()
301 .map(|(workload, (sum, count))| {
302 let avg = if count == 0 {
303 Duration::default()
304 } else {
305 sum.checked_div(count as u32).unwrap_or_default()
306 };
307 (workload, avg)
308 })
309 .collect()
310 }
311
312 pub fn aggregate_average_latency(&self) -> Duration {
316 let last_measurements: Vec<_> = self.last_measurements_iter().collect();
317
318 let total_sum: Duration = last_measurements.iter().map(|x| x.sum).sum();
319 let total_count: usize = last_measurements.iter().map(|x| x.count).sum();
320
321 if total_count == 0 {
322 return Duration::default();
323 }
324
325 total_sum
326 .checked_div(total_count as u32)
327 .unwrap_or_default()
328 }
329
330 pub fn workload_stdev_latency(&self) -> HashMap<String, Duration> {
331 let mut workload_data: HashMap<String, (Duration, Duration, usize)> = HashMap::new();
333
334 for measurement in self.last_measurements_iter() {
335 workload_data
336 .entry(measurement.workload.clone())
337 .and_modify(|(sum, squared_sum, count)| {
338 *sum += measurement.sum;
339 *squared_sum += measurement.squared_sum;
340 *count += measurement.count;
341 })
342 .or_insert((measurement.sum, measurement.squared_sum, measurement.count));
343 }
344
345 workload_data
347 .into_iter()
348 .map(|(workload, (sum, squared_sum, count))| {
349 let stdev = if count == 0 {
350 Duration::default()
351 } else {
352 let first_term = squared_sum.as_secs_f64() / count as f64;
353 let avg = sum.as_secs_f64() / count as f64;
354 let variance = if avg.powf(2.0) > first_term {
355 0.0
356 } else {
357 first_term - avg.powf(2.0)
358 };
359 Duration::from_secs_f64(variance.sqrt())
360 };
361 (workload, stdev)
362 })
363 .collect()
364 }
365
366 pub fn aggregate_stdev_latency(&self) -> Duration {
370 let last_measurements: Vec<_> = self.last_measurements_iter().collect();
371
372 let total_sum: Duration = last_measurements.iter().map(|x| x.sum).sum();
373 let total_squared_sum: Duration = last_measurements.iter().map(|x| x.squared_sum).sum();
374 let total_count: usize = last_measurements.iter().map(|x| x.count).sum();
375
376 if total_count == 0 {
377 return Duration::default();
378 }
379
380 let first_term = total_squared_sum.as_secs_f64() / total_count as f64;
381 let avg = total_sum.as_secs_f64() / total_count as f64;
382 let variance = if avg.powf(2.0) > first_term {
383 0.0
384 } else {
385 first_term - avg.powf(2.0)
386 };
387
388 Duration::from_secs_f64(variance.sqrt())
389 }
390
391 pub fn workload_p50_latency(&self) -> HashMap<String, Duration> {
392 let mut workload_data: HashMap<String, (HashMap<BucketId, usize>, usize)> = HashMap::new();
394
395 for measurement in self.last_measurements_iter() {
396 workload_data
397 .entry(measurement.workload.clone())
398 .and_modify(|(buckets, count)| {
399 for (bucket_id, bucket_count) in &measurement.buckets {
401 *buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
402 }
403 *count += measurement.count;
404 })
405 .or_insert((measurement.buckets.clone(), measurement.count));
406 }
407
408 workload_data
410 .into_iter()
411 .map(|(workload, (buckets, count))| {
412 let p50 = p50_latency(&buckets, count);
413 (workload, p50)
414 })
415 .collect()
416 }
417
418 pub fn aggregate_p50_latency(&self) -> Duration {
421 let last_measurements: Vec<_> = self.last_measurements_iter().collect();
422
423 let mut combined_buckets: HashMap<BucketId, usize> = HashMap::new();
425 let mut total_count = 0;
426
427 for measurement in &last_measurements {
428 for (bucket_id, bucket_count) in &measurement.buckets {
429 *combined_buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
430 }
431 total_count += measurement.count;
432 }
433
434 p50_latency(&combined_buckets, total_count)
436 }
437
438 pub fn workload_p99_latency(&self) -> HashMap<String, Duration> {
439 let mut workload_data: HashMap<String, (HashMap<BucketId, usize>, usize)> = HashMap::new();
441
442 for measurement in self.last_measurements_iter() {
443 workload_data
444 .entry(measurement.workload.clone())
445 .and_modify(|(buckets, count)| {
446 for (bucket_id, bucket_count) in &measurement.buckets {
448 *buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
449 }
450 *count += measurement.count;
451 })
452 .or_insert((measurement.buckets.clone(), measurement.count));
453 }
454
455 workload_data
457 .into_iter()
458 .map(|(workload, (buckets, count))| {
459 let p99 = p99_latency(&buckets, count);
460 (workload, p99)
461 })
462 .collect()
463 }
464
465 pub fn aggregate_p99_latency(&self) -> Duration {
468 let last_measurements: Vec<_> = self.last_measurements_iter().collect();
469
470 let mut combined_buckets: HashMap<BucketId, usize> = HashMap::new();
472 let mut total_count = 0;
473
474 for measurement in &last_measurements {
475 for (bucket_id, bucket_count) in &measurement.buckets {
476 *combined_buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
477 }
478 total_count += measurement.count;
479 }
480
481 p99_latency(&combined_buckets, total_count)
482 }
483
484 pub fn save<P: AsRef<Path>>(&self, path: P) {
486 let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
487 let file = path
488 .as_ref()
489 .join(format!("measurements-{:?}.json", self.parameters));
490 fs::write(file, json).unwrap();
491 }
492
    /// Reads `metrics-<i>.log` for each of the `num_clients` clients from
    /// `log_dir`, splits every file into individual Prometheus reports,
    /// parses them, and adds the resulting measurements to this collection.
    ///
    /// For time-bounded runs, measurements whose timestamp exceeds the
    /// configured duration are discarded. Missing files are skipped and
    /// unreadable files only produce a warning.
    pub fn aggregates_metrics_from_files<M: ProtocolMetrics>(
        &mut self,
        num_clients: usize,
        log_dir: &Path,
    ) {
        display::action("Processing metrics files");

        // Only time-bounded runs impose a cutoff on measurement timestamps.
        let time_limit_secs: Option<u64> = match self.parameters.run_interval {
            RunInterval::Time(d) => Some(d.as_secs()),
            RunInterval::Count(_) => None,
        };

        for i in 0..num_clients {
            let metrics_file = log_dir.join(format!("metrics-{i}.log"));

            if !metrics_file.exists() {
                continue;
            }

            match fs::read_to_string(&metrics_file) {
                Ok(content) => {
                    display::action(format!("Processing: {}\n", metrics_file.display()));

                    // Each chunk is one complete Prometheus scrape report.
                    let chunks = self.split_into_chunks(&content);
                    for chunk in &chunks {
                        let mut measurements: HashMap<String, Measurement> =
                            Measurement::from_prometheus::<M>(chunk);

                        // Drop samples collected after the configured run
                        // time.
                        if let Some(limit) = time_limit_secs {
                            measurements.retain(|_, m| m.timestamp.as_secs() <= limit);
                        }

                        self.add(i, measurements);
                    }

                    display::action(format!("Processed metrics for client {i}\n"));
                }
                Err(e) => display::warn(format!("Failed to read metrics file {i}: {e}")),
            }
        }

        display::done();
    }
541
542 fn split_into_chunks(&self, text: &str) -> Vec<String> {
545 let mut chunks = Vec::new();
546 let mut current_chunk = String::new();
547 let mut found_first_help = false;
548
549 for line in text.lines() {
550 let trimmed = line.trim();
551
552 if trimmed.starts_with("# HELP benchmark_duration") {
554 if found_first_help && !current_chunk.is_empty() {
555 chunks.push(current_chunk);
557 current_chunk = String::new();
558 }
559 found_first_help = true;
560 }
561
562 if found_first_help {
563 current_chunk.push_str(line);
564 current_chunk.push('\n');
565 }
566 }
567
568 if !current_chunk.is_empty() {
570 chunks.push(current_chunk);
571 }
572
573 chunks
574 }
575
    /// Prints (and logs) a human-readable summary table of the collected
    /// benchmark results: throughput, efficiency and latency statistics,
    /// both aggregated and per workload.
    pub fn display_summary(&self) {
        let duration = self.benchmark_duration();
        let workload_tps = self.workload_tps();
        let total_tps = self.aggregate_tps();
        let workload_latency = self.workload_average_latency();
        let average_latency = self.aggregate_average_latency();
        let workload_stdev_latency = self.workload_stdev_latency();
        let stdev_latency = self.aggregate_stdev_latency();
        let workload_p50_latency = self.workload_p50_latency();
        let p50_latency = self.aggregate_p50_latency();
        let workload_p99_latency = self.workload_p99_latency();
        let p99_latency = self.aggregate_p99_latency();

        // Efficiency: achieved throughput as a percentage of the target
        // load.
        let target = self.parameters.load as f64;
        let achieved = total_tps as f64;
        let efficiency = if target > 0.0 {
            100.0 * achieved / target
        } else {
            0.0
        };

        let mut table = Table::new();
        table.set_format(display::default_table_format());

        table.set_titles(row![bH2->"Benchmark Summary"]);

        table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Nodes:", self.parameters.nodes]);
        table.add_row(
            row![b->"Use internal IPs:", format!("{}", self.parameters.use_internal_ip_address)],
        );
        table.add_row(row![b->"Faults:", self.parameters.faults]);

        table.add_row(row![b->"Load (target):", format!("{} tx/s", self.parameters.load)]);
        table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);

        table.add_row(row![b->"Achieved TPS:", format!("{total_tps} tx/s")]);
        table.add_row(row![b->"Efficiency:", format!("{:.1}%", efficiency)]);
        table.add_row(row![bH2->""]);

        // Extra configuration rows shown only for the abstract-account
        // benchmark.
        if self.parameters.benchmark_type.to_string()
            == IotaBenchmarkType::AbstractAccountBench.to_string()
        {
            table.add_row(row![bH2->"AA config"]);
            table.add_row(row![b->"Authenticator:", self.parameters.aa_authenticator.to_string()]);
            table.add_row(row![b->"Stress workers:", self.parameters.stress_num_workers]);
            table.add_row(
                row![b->"Stress in-flight ratio:", self.parameters.stress_in_flight_ratio],
            );

            table.add_row(row![b->"AA split amount:", self.parameters.aa_split_amount]);

            table.add_row(
                row![b->"Stress client threads:", self.parameters.stress_num_client_threads],
            );
            table.add_row(
                row![b->"Stress server threads:", self.parameters.stress_num_server_threads],
            );
            table.add_row(row![bH2->""]);
        }

        // NOTE(review): developer-specific SSH key path and a stray '<'
        // before `ubuntu` below — consider removing or parameterizing this
        // debug hint.
        println!("Grafana UI:");
        println!(
            "ssh -i /Users/pk/.ssh/aws_orchestrator -L 3000:127.0.0.1:3000 <ubuntu@<metrics_public_ip>"
        );

        table.add_row(row![bH2->"Per-workload throughput"]);
        for (workload, tps) in &workload_tps {
            table.add_row(row![b->format!("  {workload} TPS:"), format!("{tps} tx/s")]);
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![bH2->"Latency"]);
        table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);

        table.add_row(row![bH2->""]);
        table.add_row(row![bH2->"Per-workload average latency"]);
        for (workload, latency) in &workload_latency {
            table.add_row(
                row![b->format!("  {workload} avg:"), format!("{} ms", latency.as_millis())],
            );
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Latency (p50):", format!("{} ms", p50_latency.as_millis())]);
        for (workload, latency) in &workload_p50_latency {
            table.add_row(
                row![b->format!("  {workload} p50 Latency:"), format!("{} ms", latency.as_millis())],
            );
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Latency (p99):", format!("{} ms", p99_latency.as_millis())]);
        for (workload, latency) in &workload_p99_latency {
            table.add_row(
                row![b->format!("  {workload} p99 Latency:"), format!("{} ms", latency.as_millis())],
            );
        }
        table.add_row(row![bH2->""]);

        // NOTE(review): the aggregate stdev row is repeated here — it is
        // already displayed under the "Latency" section above.
        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);
        for (workload, latency) in &workload_stdev_latency {
            table.add_row(
                row![b->format!("  {workload} stdev:"), format!("{} ms", latency.as_millis())],
            );
        }

        display::newline();
        table.printstd();

        // Also persist the rendered table to the run log.
        let table_string = format!("{}", table);
        crate::logger::log(&table_string);

        display::newline();
    }
700
701 fn last_measurements_iter(&self) -> impl Iterator<Item = &Measurement> {
704 self.scrapers
705 .values()
706 .flat_map(|workload_map| workload_map.values())
707 .filter_map(|measurements| measurements.last())
708 }
709}
710
/// Median (0.5 quantile) latency estimated from cumulative histogram
/// buckets.
fn p50_latency(buckets: &HashMap<BucketId, usize>, count: usize) -> Duration {
    histogram_quantile(buckets, count, 0.5)
}
715
/// Tail (0.99 quantile) latency estimated from cumulative histogram buckets.
fn p99_latency(buckets: &HashMap<BucketId, usize>, count: usize) -> Duration {
    histogram_quantile(buckets, count, 0.99)
}
720
/// Estimates the given quantile (within `0.0..=1.0`) from cumulative
/// Prometheus histogram buckets (bucket upper bound -> cumulative count),
/// linearly interpolating inside the bucket containing the target rank.
///
/// Keys are bucket upper-bound strings (`BucketId`); `"inf"` maps to
/// `f64::INFINITY`. Returns `Duration::default()` when there is no data, the
/// quantile is out of range, or no bucket bound can be parsed.
///
/// Fixes: when the target rank falls inside the open-ended `inf` bucket, the
/// highest finite bound is returned instead of feeding a non-finite value to
/// `Duration::from_secs_f64` (which panics). The unreachable `rank < 0.0`
/// branch was removed (`count > 0` and `quantile` in `[0, 1]` imply
/// `rank >= 0`).
fn histogram_quantile(buckets: &HashMap<String, usize>, count: usize, quantile: f64) -> Duration {
    if count == 0 || !(0.0..=1.0).contains(&quantile) {
        return Duration::default();
    }

    // Parse bounds; silently drop buckets whose bound is neither a number
    // nor "inf".
    let mut buckets: Vec<(f64, usize)> = buckets
        .iter()
        .filter_map(|(bucket, count)| {
            let bound = if bucket == "inf" {
                f64::INFINITY
            } else {
                bucket.parse::<f64>().ok()?
            };
            Some((bound, *count))
        })
        .collect();

    if buckets.is_empty() {
        return Duration::default();
    }

    buckets.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));

    // Target rank within the cumulative distribution.
    let rank = quantile * count as f64;

    let mut prev_count = 0.0;
    let mut prev_bound = 0.0;

    for (bound, count) in buckets {
        let count_f64 = count as f64;

        if count_f64 >= rank {
            // The open-ended bucket has no finite upper bound: report the
            // highest finite bound seen so far (as Prometheus does).
            if bound.is_infinite() {
                return Duration::from_secs_f64(prev_bound);
            }

            // First populated bucket or an empty bucket range: no
            // interpolation possible, report the bucket's upper bound.
            if prev_count == 0.0 || count_f64 == prev_count {
                return Duration::from_secs_f64(bound);
            }

            // Linear interpolation between the previous and current bounds.
            let fraction = (rank - prev_count) / (count_f64 - prev_count);
            let interpolated = prev_bound + (bound - prev_bound) * fraction;

            return Duration::from_secs_f64(interpolated);
        }

        prev_count = count_f64;
        prev_bound = bound;
    }

    // Rank beyond the last bucket (inconsistent data): report the largest
    // bound seen.
    Duration::from_secs_f64(prev_bound)
}
802
803#[cfg(test)]
804mod test {
805 use std::{collections::HashMap, time::Duration};
806
807 use super::{BenchmarkParameters, Measurement, MeasurementsCollection};
808 use crate::{
809 benchmark::{RunInterval, test::TestBenchmarkType},
810 protocol::test_protocol_metrics::TestProtocolMetrics,
811 settings::Settings,
812 };
813
    /// 2 s of cumulative latency over 100 transactions -> 20 ms average.
    #[test]
    fn average_latency() {
        let data = Measurement {
            workload: "transfer_object".into(),
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }
827
    /// stdev = sqrt(E[x²] - E[x]²) = sqrt(0.75 - 0.25) ≈ 0.707 s.
    #[test]
    fn stdev_latency() {
        let data = Measurement {
            workload: "transfer_object".into(),
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // E[x²] = 75 / 100 = 0.75.
        assert_eq!(
            data.squared_sum.checked_div(data.count as u32),
            Some(Duration::from_secs_f64(0.75))
        );
        // (E[x])² = (50 / 100)² = 0.25.
        assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);
        let stdev = data.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }
850
    /// The median rank (930 of 1860) falls inside the (0.5, 0.75] bucket and
    /// is linearly interpolated to ~636.6 ms.
    #[test]
    fn p50_latency() {
        let data = Measurement {
            workload: "transfer_object".into(),
            timestamp: Duration::from_secs(30),
            buckets: [
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("inf".into(), 1860),
            ]
            .iter()
            .cloned()
            .collect(),
            sum: Duration::from_secs(1265),
            count: 1860,
            squared_sum: Duration::from_secs(952),
        };

        let p50 = super::p50_latency(&data.buckets, data.count);
        assert!(p50.as_millis() >= 636 && p50.as_millis() <= 637);
    }
887
    /// The aggregate average must be weighted by transaction count:
    /// (2 s + 10 s) / (100 + 200) = 40 ms, not the 35 ms "average of
    /// averages".
    #[test]
    fn aggregate_average_latency_weighted() {
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );

        // Scraper 1: 2 s over 100 tx (20 ms average).
        let measurement1 = HashMap::from([(
            "test".to_string(),
            Measurement {
                workload: "test".into(),
                timestamp: Duration::from_secs(10),
                buckets: HashMap::new(),
                sum: Duration::from_secs(2),
                count: 100,
                squared_sum: Duration::from_secs(0),
            },
        )]);

        // Scraper 2: 10 s over 200 tx (50 ms average).
        let measurement2 = HashMap::from([(
            "test".to_string(),
            Measurement {
                workload: "test".into(),
                timestamp: Duration::from_secs(10),
                buckets: HashMap::new(),
                sum: Duration::from_secs(10),
                count: 200,
                squared_sum: Duration::from_secs(0),
            },
        )]);

        aggregator.add(1, measurement1);
        aggregator.add(2, measurement2);

        let avg = aggregator.aggregate_average_latency();
        assert_eq!(avg.as_millis(), 40);
    }
930
    /// Parses a single-workload Prometheus report and checks every extracted
    /// field. The report mixes quoted and unquoted label values on purpose;
    /// the `le="+Inf"` bucket must be exposed under the "inf" key.
    #[test]
    fn prometheus_parse() {
        let report = r#"
# HELP benchmark_duration Duration of the benchmark
# TYPE benchmark_duration gauge
benchmark_duration 30
# HELP latency_s Total time in seconds to return a response
# TYPE latency_s histogram
latency_s_bucket{workload=transfer_object,le=0.1} 0
latency_s_bucket{workload=transfer_object,le=0.25} 0
latency_s_bucket{workload=transfer_object,le=0.5} 506
latency_s_bucket{workload=transfer_object,le=0.75} 1282
latency_s_bucket{workload=transfer_object,le=1} 1693
latency_s_bucket{workload="transfer_object",le="1.25"} 1816
latency_s_bucket{workload="transfer_object",le="1.5"} 1860
latency_s_bucket{workload="transfer_object",le="1.75"} 1860
latency_s_bucket{workload="transfer_object",le="2"} 1860
latency_s_bucket{workload=transfer_object,le=2.5} 1860
latency_s_bucket{workload=transfer_object,le=5} 1860
latency_s_bucket{workload=transfer_object,le=10} 1860
latency_s_bucket{workload=transfer_object,le=20} 1860
latency_s_bucket{workload=transfer_object,le=30} 1860
latency_s_bucket{workload=transfer_object,le=60} 1860
latency_s_bucket{workload=transfer_object,le=90} 1860
latency_s_bucket{workload=transfer_object,le=+Inf} 1860
latency_s_sum{workload=transfer_object} 1265.287933130998
latency_s_count{workload=transfer_object} 1860
# HELP latency_squared_s Square of total time in seconds to return a response
# TYPE latency_squared_s counter
latency_squared_s{workload="transfer_object"} 952.8160642745289
"#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        // One scraper, one workload, one data point.
        assert_eq!(aggregator.scrapers.len(), 1);
        let scraper_data = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(scraper_data.len(), 1);
        let data_points = scraper_data.get("transfer_object").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }
1010
    /// Parses a report containing two workloads and verifies each one is
    /// extracted independently with its own buckets, sums and counts.
    #[test]
    fn prometheus_parse_multi_workloads() {
        let report = r#"
# HELP benchmark_duration Duration of the benchmark
# TYPE benchmark_duration gauge
benchmark_duration 30
# HELP latency_s Total time in seconds to return a response
# TYPE latency_s histogram
latency_s_bucket{workload=transfer_object,le=0.1} 0
latency_s_bucket{workload=transfer_object,le=0.25} 0
latency_s_bucket{workload=transfer_object,le=0.5} 506
latency_s_bucket{workload=transfer_object,le=0.75} 1282
latency_s_bucket{workload=transfer_object,le=1} 1693
latency_s_bucket{workload="transfer_object",le="1.25"} 1816
latency_s_bucket{workload="transfer_object",le="1.5"} 1860
latency_s_bucket{workload="transfer_object",le="1.75"} 1860
latency_s_bucket{workload="transfer_object",le="2"} 1860
latency_s_bucket{workload=transfer_object,le=2.5} 1860
latency_s_bucket{workload=transfer_object,le=5} 1860
latency_s_bucket{workload=transfer_object,le=10} 1860
latency_s_bucket{workload=transfer_object,le=20} 1860
latency_s_bucket{workload=transfer_object,le=30} 1860
latency_s_bucket{workload=transfer_object,le=60} 1860
latency_s_bucket{workload=transfer_object,le=90} 1860
latency_s_bucket{workload=transfer_object,le=+Inf} 1860
latency_s_sum{workload=transfer_object} 1265.287933130998
latency_s_count{workload=transfer_object} 1860
# HELP latency_squared_s Square of total time in seconds to return a response
# TYPE latency_squared_s counter
latency_squared_s{workload="transfer_object"} 952.8160642745289
latency_s_bucket{workload=shared_counter,le=0.1} 0
latency_s_bucket{workload=shared_counter,le=0.25} 1
latency_s_bucket{workload=shared_counter,le=0.5} 600
latency_s_bucket{workload=shared_counter,le=0.75} 1200
latency_s_bucket{workload=shared_counter,le=1} 1600
latency_s_bucket{workload="shared_counter",le="1.25"} 1800
latency_s_bucket{workload="shared_counter",le="1.5"} 1870
latency_s_bucket{workload="shared_counter",le="1.75"} 1870
latency_s_bucket{workload="shared_counter",le="2"} 1870
latency_s_bucket{workload=shared_counter,le=2.5} 1870
latency_s_bucket{workload=shared_counter,le=5} 1870
latency_s_bucket{workload=shared_counter,le=10} 1870
latency_s_bucket{workload=shared_counter,le=20} 1870
latency_s_bucket{workload=shared_counter,le=30} 1870
latency_s_bucket{workload=shared_counter,le=60} 1870
latency_s_bucket{workload=shared_counter,le=90} 1870
latency_s_bucket{workload=shared_counter,le=+Inf} 1870
latency_s_sum{workload=shared_counter} 865.287933130998
latency_s_count{workload=shared_counter} 1870
# HELP latency_squared_s Square of total time in seconds to return a response
# TYPE latency_squared_s counter
latency_squared_s{workload="shared_counter"} 455.8160642745289
"#;

        let measurements = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;

        aggregator.add(scraper_id, measurements);

        // One scraper carrying two distinct workloads.
        assert_eq!(aggregator.scrapers.len(), 1);
        let scraper_data = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(scraper_data.len(), 2);
        let data_points = scraper_data.get("transfer_object").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);

        let data_points = scraper_data.get("shared_counter").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 1),
                ("0.5".into(), 600),
                ("0.75".into(), 1200),
                ("1".into(), 1600),
                ("1.25".into(), 1800),
                ("1.5".into(), 1870),
                ("1.75".into(), 1870),
                ("2".into(), 1870),
                ("2.5".into(), 1870),
                ("5".into(), 1870),
                ("10".into(), 1870),
                ("20".into(), 1870),
                ("30".into(), 1870),
                ("60".into(), 1870),
                ("90".into(), 1870),
                ("inf".into(), 1870)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 865);
        assert_eq!(data.count, 1870);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 455);
    }
1147
    /// Manual debugging helper: point `metrics_dir` at a directory with real
    /// `metrics-<i>.log` files and run with `--ignored` to inspect the
    /// aggregated summary.
    #[test]
    #[ignore]
    fn debug_real_metrics_aggregation() {
        use std::{path::PathBuf, time::Duration};

        // Replace with a real directory before running.
        let metrics_dir = PathBuf::from("PATH/TO/YOUR/METRICS/DIR");

        println!("\n\n========== METRICS AGGREGATION DEBUG ==========\n");
        println!("Reading metrics from: {}\n", metrics_dir.display());

        let settings = Settings::new_for_test();
        let num_clients = 10;
        let benchmark_parameters = BenchmarkParameters {
            run_interval: RunInterval::Time(Duration::from_secs(180)),
            load: 1000,
            nodes: num_clients,
            ..Default::default()
        };

        let mut aggregator =
            MeasurementsCollection::<TestBenchmarkType>::new(&settings, benchmark_parameters);

        aggregator.aggregates_metrics_from_files::<TestProtocolMetrics>(num_clients, &metrics_dir);

        println!("========== DISPLAY SUMMARY ==========\n");
        aggregator.display_summary();
    }
1179
    /// Manual debugging helper: loads the first `measurements-*.json` file
    /// found in `benchmark_dir` and re-displays its summary. Run with
    /// `--ignored`.
    #[test]
    #[ignore]
    fn debug_metrics_from_saved_measurements() {
        use std::{fs, path::PathBuf};

        use crate::IotaBenchmarkType;

        // Replace with a real directory before running.
        let benchmark_dir = PathBuf::from("PATH/TO/YOUR/SAVED/MEASUREMENTS/DIR");

        // Pick the first loadable measurements file in the directory.
        let mut aggregator: Option<MeasurementsCollection<IotaBenchmarkType>> = None;
        if let Ok(entries) = fs::read_dir(&benchmark_dir) {
            for entry in entries.filter_map(|e| e.ok()) {
                let path = entry.path();
                if let Some(filename) = path.file_name() {
                    let filename_str = filename.to_string_lossy();

                    if filename_str.starts_with("measurements-") {
                        match MeasurementsCollection::<IotaBenchmarkType>::load(&path) {
                            Ok(loaded) => {
                                println!("Loaded parameters from: {}\n", filename_str);
                                aggregator = Some(loaded);
                                break;
                            }
                            Err(e) => {
                                println!("Failed to load {}: {}\n", filename_str, e);
                            }
                        }
                    }
                }
            }
        }

        let aggregator = match aggregator {
            Some(agg) => agg,
            None => {
                panic!("No measurement-*.json file found or failed to load");
            }
        };

        println!("========== DISPLAY SUMMARY ==========\n");
        aggregator.display_summary();
    }
1224}