use std::{collections::HashMap, fs, io::BufRead, path::Path, time::Duration};

use prettytable::{Table, row};
use prometheus_parse::Scrape;
use serde::{Deserialize, Serialize};

use crate::{
    benchmark::{BenchmarkParameters, BenchmarkType},
    display,
    protocol::ProtocolMetrics,
    settings::Settings,
};

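/// The identifier of a latency bucket (the histogram's upper bound, as reported by Prometheus).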
type BucketId = String;

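/// A snapshot of the benchmark metrics for a single workload at a given point in time.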
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// The workload to which this measurement refers.
    pub workload: String,
    /// Duration since the beginning of the benchmark.
    timestamp: Duration,
    /// Latency buckets, keyed by their upper bound.
    buckets: HashMap<BucketId, usize>,
    /// Sum of the latencies of all finalized transactions.
    sum: Duration,
    /// Total number of finalized transactions.
    count: usize,
    /// Sum of the squared latencies of all finalized transactions.
    squared_sum: Duration,
}

impl Measurement {
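    /// Parse a Prometheus scrape in text exposition format into one `Measurement`
    /// per workload found in the samples.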
    pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> HashMap<String, Self> {
        let br = std::io::BufReader::new(text.as_bytes());
        let parsed = Scrape::parse(br.lines()).expect("Failed to parse Prometheus metrics");

        // Group the samples by their workload label; unlabeled samples are
        // handled separately below.
        let mut samples_by_workload: HashMap<String, Vec<&prometheus_parse::Sample>> =
            HashMap::new();
        for sample in &parsed.samples {
            if let Some(workload) = sample.labels.get("workload") {
                samples_by_workload
                    .entry(workload.to_string())
                    .or_default()
                    .push(sample);
            }
        }

        if samples_by_workload.is_empty() {
            return HashMap::new();
        }

        // The benchmark duration reported without a workload label serves as a
        // fallback timestamp for workloads that do not report their own.
        let global_timestamp = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::BENCHMARK_DURATION && x.labels.get("workload").is_none())
            .and_then(|x| match x.value {
                prometheus_parse::Value::Gauge(value) => Some(Duration::from_secs(value as u64)),
                _ => None,
            })
            .unwrap_or_default();

        samples_by_workload
            .into_iter()
            .map(|(workload, workload_samples)| {
                let buckets: HashMap<_, _> = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_BUCKETS)
                    .and_then(|sample| match &sample.value {
                        prometheus_parse::Value::Histogram(values) => Some(
                            values
                                .iter()
                                .map(|x| (x.less_than.to_string(), x.count as usize))
                                .collect(),
                        ),
                        _ => None,
                    })
                    .unwrap_or_default();

                let sum = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_SUM)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Untyped(value) => {
                            Some(Duration::from_secs_f64(value))
                        }
                        _ => None,
                    })
                    .unwrap_or_default();

                let count = workload_samples
                    .iter()
                    .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Untyped(value) => Some(value as usize),
                        _ => None,
                    })
                    .unwrap_or_default();

                let squared_sum = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Counter(value) => {
                            Some(Duration::from_secs_f64(value))
                        }
                        _ => None,
                    })
                    .unwrap_or_default();

                let timestamp = workload_samples
                    .iter()
                    .find(|x| x.metric == M::BENCHMARK_DURATION)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Gauge(value) => {
                            Some(Duration::from_secs(value as u64))
                        }
                        _ => None,
                    })
                    .unwrap_or(global_timestamp);

                let measurement = Self {
                    workload: workload.clone(),
                    timestamp,
                    buckets,
                    sum,
                    count,
                    squared_sum,
                };

                (workload, measurement)
            })
            .collect()
    }

    /// Compute the transactions per second over the provided duration. The
    /// duration is passed in by the caller so that every workload can be rated
    /// against the same time window.
    pub fn tps(&self, duration: &Duration) -> u64 {
        let tps = self.count.checked_div(duration.as_secs() as usize);
        tps.unwrap_or_default() as u64
    }

    /// Compute the average latency, returning zero if no transactions were recorded.
    pub fn average_latency(&self) -> Duration {
        self.sum.checked_div(self.count as u32).unwrap_or_default()
    }

    /// Compute the standard deviation of the latency using the identity
    /// `Var(X) = E[X^2] - E[X]^2`, clamping a negative variance (a rounding
    /// artifact) to zero.
    pub fn stdev_latency(&self) -> Duration {
        // Compute `squared_sum / count`, the mean of the squared latencies.
        let first_term = if self.count == 0 {
            0.0
        } else {
            self.squared_sum.as_secs_f64() / self.count as f64
        };

        // Compute `avg^2`, the square of the mean latency.
        let squared_avg = self.average_latency().as_secs_f64().powf(2.0);

        // Guard against rounding errors that would make the variance negative.
        let variance = if squared_avg > first_term {
            0.0
        } else {
            first_term - squared_avg
        };

        let stdev = variance.sqrt();
        Duration::from_secs_f64(stdev)
    }

    #[cfg(test)]
    pub fn new_for_test(workload: String) -> Self {
        Self {
            workload,
            timestamp: Duration::from_secs(30),
            buckets: HashMap::new(),
            sum: Duration::from_secs(1265),
            count: 1860,
            squared_sum: Duration::from_secs(952),
        }
    }
}

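/// The identifier of the scraper that collected the measurements.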
type ScraperId = usize;

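/// A collection of all measurements of a benchmark run.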
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// The specs of the machines from which the measurements were collected.
    pub machine_specs: String,
    /// The commit of the codebase under benchmark.
    pub commit: String,
    /// The parameters of the benchmark run.
    pub parameters: BenchmarkParameters<T>,
    /// The measurements of each scraper, indexed by workload.
    pub scrapers: HashMap<ScraperId, HashMap<String, Vec<Measurement>>>,
}

impl<T: BenchmarkType> MeasurementsCollection<T> {
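    /// Create a new (empty) collection of measurements.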
    pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
        Self {
            machine_specs: settings.node_specs.clone(),
            commit: settings.repository.commit.clone(),
            parameters,
            scrapers: HashMap::new(),
        }
    }

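    /// Load a collection of measurements from a JSON file.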
    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
        let data = fs::read(path)?;
        let measurements: Self = serde_json::from_slice(data.as_slice())?;
        Ok(measurements)
    }

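    /// Add the measurements of one scrape to the collection, grouped by workload.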
    pub fn add(&mut self, scraper_id: ScraperId, measurements: HashMap<String, Measurement>) {
        let scraper_workloads = self.scrapers.entry(scraper_id).or_default();
        for (workload, workload_measurement) in measurements {
            scraper_workloads
                .entry(workload)
                .or_default()
                .push(workload_measurement);
        }
    }

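    /// Return the transaction load (tx/s) of the benchmark.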
    pub fn transaction_load(&self) -> usize {
        self.parameters.load
    }

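    /// Return the duration of the benchmark, i.e., the latest timestamp reported
    /// by any scraper.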
    pub fn benchmark_duration(&self) -> Duration {
        self.last_measurements_iter()
            .map(|x| x.timestamp)
            .max()
            .unwrap_or_default()
    }

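    /// Compute the tps of each workload, summed over all scrapers.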
    pub fn workload_tps(&self) -> HashMap<String, u64> {
        let last_measurements: Vec<_> = self.last_measurements_iter().collect();

        // Rate every workload against the same window: the latest timestamp.
        let duration = last_measurements
            .iter()
            .map(|x| x.timestamp)
            .max()
            .unwrap_or_default();

        // Sum the tps reported by all scrapers, per workload.
        last_measurements
            .into_iter()
            .fold(HashMap::new(), |mut acc, measurement| {
                *acc.entry(measurement.workload.clone()).or_insert(0) += measurement.tps(&duration);
                acc
            })
    }

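    /// Compute the aggregate tps over all workloads and scrapers.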
    pub fn aggregate_tps(&self) -> u64 {
        let last_measurements: Vec<_> = self.last_measurements_iter().collect();

        let duration = last_measurements
            .iter()
            .map(|x| x.timestamp)
            .max()
            .unwrap_or_default();

        last_measurements.iter().map(|x| x.tps(&duration)).sum()
    }

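    /// Return, for each workload, the highest average latency reported by any scraper.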
    pub fn workload_average_latency(&self) -> HashMap<String, Duration> {
        self.last_measurements_iter()
            .fold(HashMap::new(), |mut acc, measurement| {
                // Keep the highest average latency reported for each workload.
                let latency = measurement.average_latency();
                acc.entry(measurement.workload.clone())
                    .and_modify(|max_latency| {
                        if latency > *max_latency {
                            *max_latency = latency;
                        }
                    })
                    .or_insert(latency);
                acc
            })
    }

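    /// Compute the average latency over the last measurements of all workloads and scrapers.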
    pub fn aggregate_average_latency(&self) -> Duration {
        let last_measurements: Vec<_> = self.last_measurements_iter().collect();

        last_measurements
            .iter()
            .map(|x| x.average_latency())
            .sum::<Duration>()
            .checked_div(last_measurements.len() as u32)
            .unwrap_or_default()
    }

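    /// Return, for each workload, the highest latency standard deviation reported
    /// by any scraper.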
    pub fn workload_stdev_latency(&self) -> HashMap<String, Duration> {
        self.last_measurements_iter()
            .fold(HashMap::new(), |mut acc, measurement| {
                // Keep the highest standard deviation reported for each workload.
                let stdev = measurement.stdev_latency();
                acc.entry(measurement.workload.clone())
                    .and_modify(|max_stdev| {
                        if stdev > *max_stdev {
                            *max_stdev = stdev;
                        }
                    })
                    .or_insert(stdev);
                acc
            })
    }

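    /// Return the highest latency standard deviation over all workloads and scrapers.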
    pub fn aggregate_stdev_latency(&self) -> Duration {
        self.last_measurements_iter()
            .map(|x| x.stdev_latency())
            .max()
            .unwrap_or_default()
    }

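    /// Serialize the collection to pretty-printed JSON and write it to the given directory.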
    pub fn save<P: AsRef<Path>>(&self, path: P) {
        let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
        let file = path
            .as_ref()
            .join(format!("measurements-{:?}.json", self.parameters));
        fs::write(file, json).expect("Failed to write measurements to file");
    }

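    /// Print a summary of the benchmark results to stdout and append it to the log.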
    pub fn display_summary(&self) {
        let duration = self.benchmark_duration();
        let workload_tps = self.workload_tps();
        let total_tps = self.aggregate_tps();
        let workload_latency = self.workload_average_latency();
        let average_latency = self.aggregate_average_latency();
        let workload_stdev_latency = self.workload_stdev_latency();
        let stdev_latency = self.aggregate_stdev_latency();

        let mut table = Table::new();
        table.set_format(display::default_table_format());

        table.set_titles(row![bH2->"Benchmark Summary"]);
        table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
        table.add_row(row![bH2->""]);
        table.add_row(row![b->"Nodes:", self.parameters.nodes]);
        table.add_row(
            row![b->"Use internal IPs:", format!("{}", self.parameters.use_internal_ip_address)],
        );
        table.add_row(row![b->"Faults:", self.parameters.faults]);
        table.add_row(row![b->"Load:", format!("{} tx/s", self.parameters.load)]);
        table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);
        table.add_row(row![bH2->""]);
        table.add_row(row![b->"TPS:", format!("{total_tps} tx/s")]);
        for (workload, tps) in &workload_tps {
            table.add_row(row![b->format!(" {workload} TPS:"), format!("{tps} tx/s")]);
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
        for (workload, latency) in &workload_latency {
            table.add_row(
                row![b->format!(" {workload} Latency:"), format!("{} ms", latency.as_millis())],
            );
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);
        for (workload, latency) in &workload_stdev_latency {
            table.add_row(
                row![b->format!(" {workload} Latency:"), format!("{} ms", latency.as_millis())],
            );
        }

        display::newline();
        table.printstd();

        let table_string = format!("{}", table);
        crate::logger::log(&table_string);

        display::newline();
    }

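    /// Iterate over the most recent measurement of every scraper and workload.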
    fn last_measurements_iter(&self) -> impl Iterator<Item = &Measurement> {
        self.scrapers
            .values()
            .flat_map(|workload_map| workload_map.values())
            .filter_map(|measurements| measurements.last())
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};
    use crate::{
        benchmark::test::TestBenchmarkType, protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    #[test]
    fn average_latency() {
        let data = Measurement {
            workload: "transfer_object".into(),
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }

446
447 #[test]
448 fn stdev_latency() {
449 let data = Measurement {
450 workload: "transfer_object".into(),
451 timestamp: Duration::from_secs(10),
452 buckets: HashMap::new(),
453 sum: Duration::from_secs(50),
454 count: 100,
455 squared_sum: Duration::from_secs(75),
456 };
457
458 assert_eq!(
460 data.squared_sum.checked_div(data.count as u32),
461 Some(Duration::from_secs_f64(0.75))
462 );
463 assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);
465 let stdev = data.stdev_latency();
467 assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
468 }
469
470 #[test]
471 fn prometheus_parse() {
472 let report = r#"
473 # HELP benchmark_duration Duration of the benchmark
474 # TYPE benchmark_duration gauge
475 benchmark_duration 30
476 # HELP latency_s Total time in seconds to return a response
477 # TYPE latency_s histogram
478 latency_s_bucket{workload=transfer_object,le=0.1} 0
479 latency_s_bucket{workload=transfer_object,le=0.25} 0
480 latency_s_bucket{workload=transfer_object,le=0.5} 506
481 latency_s_bucket{workload=transfer_object,le=0.75} 1282
482 latency_s_bucket{workload=transfer_object,le=1} 1693
483 latency_s_bucket{workload="transfer_object",le="1.25"} 1816
484 latency_s_bucket{workload="transfer_object",le="1.5"} 1860
485 latency_s_bucket{workload="transfer_object",le="1.75"} 1860
486 latency_s_bucket{workload="transfer_object",le="2"} 1860
487 latency_s_bucket{workload=transfer_object,le=2.5} 1860
488 latency_s_bucket{workload=transfer_object,le=5} 1860
489 latency_s_bucket{workload=transfer_object,le=10} 1860
490 latency_s_bucket{workload=transfer_object,le=20} 1860
491 latency_s_bucket{workload=transfer_object,le=30} 1860
492 latency_s_bucket{workload=transfer_object,le=60} 1860
493 latency_s_bucket{workload=transfer_object,le=90} 1860
494 latency_s_bucket{workload=transfer_object,le=+Inf} 1860
495 latency_s_sum{workload=transfer_object} 1265.287933130998
496 latency_s_count{workload=transfer_object} 1860
497 # HELP latency_squared_s Square of total time in seconds to return a response
498 # TYPE latency_squared_s counter
499 latency_squared_s{workload="transfer_object"} 952.8160642745289
500 "#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        assert_eq!(aggregator.scrapers.len(), 1);
        let scraper_data = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(scraper_data.len(), 1);

        let data_points = scraper_data.get("transfer_object").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }

    #[test]
    fn prometheus_parse_multi_workloads() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
            latency_s_bucket{workload=shared_counter,le=0.1} 0
            latency_s_bucket{workload=shared_counter,le=0.25} 1
            latency_s_bucket{workload=shared_counter,le=0.5} 600
            latency_s_bucket{workload=shared_counter,le=0.75} 1200
            latency_s_bucket{workload=shared_counter,le=1} 1600
            latency_s_bucket{workload="shared_counter",le="1.25"} 1800
            latency_s_bucket{workload="shared_counter",le="1.5"} 1870
            latency_s_bucket{workload="shared_counter",le="1.75"} 1870
            latency_s_bucket{workload="shared_counter",le="2"} 1870
            latency_s_bucket{workload=shared_counter,le=2.5} 1870
            latency_s_bucket{workload=shared_counter,le=5} 1870
            latency_s_bucket{workload=shared_counter,le=10} 1870
            latency_s_bucket{workload=shared_counter,le=20} 1870
            latency_s_bucket{workload=shared_counter,le=30} 1870
            latency_s_bucket{workload=shared_counter,le=60} 1870
            latency_s_bucket{workload=shared_counter,le=90} 1870
            latency_s_bucket{workload=shared_counter,le=+Inf} 1870
            latency_s_sum{workload=shared_counter} 865.287933130998
            latency_s_count{workload=shared_counter} 1870
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="shared_counter"} 455.8160642745289
        "#;

        let measurements = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;

        aggregator.add(scraper_id, measurements);

        assert_eq!(aggregator.scrapers.len(), 1);
        let scraper_data = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(scraper_data.len(), 2);

        let data_points = scraper_data.get("transfer_object").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);

        let data_points = scraper_data.get("shared_counter").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 1),
                ("0.5".into(), 600),
                ("0.75".into(), 1200),
                ("1".into(), 1600),
                ("1.25".into(), 1800),
                ("1.5".into(), 1870),
                ("1.75".into(), 1870),
                ("2".into(), 1870),
                ("2.5".into(), 1870),
                ("5".into(), 1870),
                ("10".into(), 1870),
                ("20".into(), 1870),
                ("30".into(), 1870),
                ("60".into(), 1870),
                ("90".into(), 1870),
                ("inf".into(), 1870)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 865);
        assert_eq!(data.count, 1870);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 455);
    }
}