use std::{
    collections::HashMap,
    fs,
    io::BufRead,
    path::{Path, PathBuf},
    time::Duration,
};

use prettytable::{Table, row};
use prometheus_parse::Scrape;
use serde::{Deserialize, Serialize};

use crate::{
    benchmark::{BenchmarkParameters, BenchmarkType},
    display,
    protocol::ProtocolMetrics,
    settings::Settings,
};

/// The identifier of a latency bucket.
type BucketId = String;

/// A snapshot of the metrics scraped from a node's Prometheus endpoint.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// Duration since the beginning of the benchmark.
    timestamp: Duration,
    /// Latency buckets, mapping each bucket's upper bound to the number of
    /// transactions with latency at most that bound.
    buckets: HashMap<BucketId, usize>,
    /// Sum of the latencies of all finalized transactions.
    sum: Duration,
    /// Total number of finalized transactions.
    count: usize,
    /// Sum of the squared latencies of all finalized transactions.
    squared_sum: Duration,
}

impl Measurement {
    /// Make a new measurement from the text exposed by a node's Prometheus endpoint.
    pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> Self {
        let br = std::io::BufReader::new(text.as_bytes());
        let parsed = Scrape::parse(br.lines()).unwrap();

        let buckets: HashMap<_, _> = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::LATENCY_BUCKETS)
            .map(|x| match &x.value {
                prometheus_parse::Value::Histogram(values) => values
                    .iter()
                    .map(|x| {
                        let bucket_id = x.less_than.to_string();
                        let count = x.count as usize;
                        (bucket_id, count)
                    })
                    .collect(),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

        let sum = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::LATENCY_SUM)
            .map(|x| match x.value {
                prometheus_parse::Value::Untyped(value) => Duration::from_secs_f64(value),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

        let count = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
            .map(|x| match x.value {
                prometheus_parse::Value::Untyped(value) => value as usize,
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

        let squared_sum = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
            .map(|x| match x.value {
                prometheus_parse::Value::Counter(value) => Duration::from_secs_f64(value),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

        let timestamp = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::BENCHMARK_DURATION)
            .map(|x| match x.value {
                prometheus_parse::Value::Gauge(value) => Duration::from_secs(value as u64),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

        Self {
            timestamp,
            buckets,
            sum,
            count,
            squared_sum,
        }
    }

    /// Compute the transactions per second (tps) over the given duration. The
    /// duration is provided by the caller (rather than taken from
    /// `self.timestamp`) so that callers can aggregate over the duration of
    /// the whole benchmark.
    pub fn tps(&self, duration: &Duration) -> u64 {
        let tps = self.count.checked_div(duration.as_secs() as usize);
        tps.unwrap_or_default() as u64
    }

    /// Compute the average latency.
    pub fn average_latency(&self) -> Duration {
        self.sum.checked_div(self.count as u32).unwrap_or_default()
    }

    /// Compute the standard deviation of the latency from the sum of squared
    /// latencies: `stdev = sqrt(squared_sum / count - avg^2)`.
    pub fn stdev_latency(&self) -> Duration {
        // Compute `squared_sum / count`.
        let first_term = if self.count == 0 {
            0.0
        } else {
            self.squared_sum.as_secs_f64() / self.count as f64
        };

        // Compute `avg^2`.
        let squared_avg = self.average_latency().as_secs_f64().powf(2.0);

        // Compute the variance, clamping negative values (possible only
        // through floating-point rounding) to zero.
        let variance = if squared_avg > first_term {
            0.0
        } else {
            first_term - squared_avg
        };

        let stdev = variance.sqrt();
        Duration::from_secs_f64(stdev)
    }

    /// Return a dummy measurement for testing; its values match the Prometheus
    /// report used in the `prometheus_parse` test below.
    #[cfg(test)]
    pub fn new_for_test() -> Self {
        Self {
            timestamp: Duration::from_secs(30),
            buckets: HashMap::new(),
            sum: Duration::from_secs(1265),
            count: 1860,
            squared_sum: Duration::from_secs(952),
        }
    }
}

/// The identifier of a metrics scraper.
type ScraperId = usize;

/// A collection of measurements, indexed by the scraper that collected them.
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// The specs of the machines running the benchmark.
    pub machine_specs: String,
    /// The commit of the codebase under benchmark.
    pub commit: String,
    /// The benchmark parameters of the current run.
    pub parameters: BenchmarkParameters<T>,
    /// The data collected by each scraper.
    pub scrapers: HashMap<ScraperId, Vec<Measurement>>,
}

impl<T: BenchmarkType> MeasurementsCollection<T> {
    /// Create a new (empty) collection of measurements.
    pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
        Self {
            machine_specs: settings.specs.clone(),
            commit: settings.repository.commit.clone(),
            parameters,
            scrapers: HashMap::new(),
        }
    }

    /// Load a collection of measurements from a JSON file.
    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
        let data = fs::read(path)?;
        let measurements: Self = serde_json::from_slice(data.as_slice())?;
        Ok(measurements)
    }

    /// Add a new measurement to the collection.
    pub fn add(&mut self, scraper_id: ScraperId, measurement: Measurement) {
        self.scrapers
            .entry(scraper_id)
            .or_default()
            .push(measurement);
    }

    /// Return the transaction load of the benchmark.
    pub fn transaction_load(&self) -> usize {
        self.parameters.load
    }

    /// Return the duration of the benchmark, i.e., the latest timestamp
    /// reported by any scraper.
    pub fn benchmark_duration(&self) -> Duration {
        self.scrapers
            .values()
            .filter_map(|x| x.last())
            .map(|x| x.timestamp)
            .max()
            .unwrap_or_default()
    }

    /// Aggregate the tps of all scrapers over the benchmark duration.
    pub fn aggregate_tps(&self) -> u64 {
        let duration = self.benchmark_duration();
        self.scrapers
            .values()
            .filter_map(|x| x.last())
            .map(|x| x.tps(&duration))
            .sum()
    }

    /// Aggregate the average latency of all scrapers.
    pub fn aggregate_average_latency(&self) -> Duration {
        let last_data_points: Vec<_> = self.scrapers.values().filter_map(|x| x.last()).collect();
        last_data_points
            .iter()
            .map(|x| x.average_latency())
            .sum::<Duration>()
            .checked_div(last_data_points.len() as u32)
            .unwrap_or_default()
    }

    /// Aggregate the latency standard deviation of all scrapers by taking the max.
    pub fn aggregate_stdev_latency(&self) -> Duration {
        self.scrapers
            .values()
            .filter_map(|x| x.last())
            .map(|x| x.stdev_latency())
            .max()
            .unwrap_or_default()
    }

    /// Save the collection of measurements as a JSON file.
    pub fn save<P: AsRef<Path>>(&self, path: P) {
        let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
        let mut file = PathBuf::from(path.as_ref());
        file.push(format!("measurements-{:?}.json", self.parameters));
        fs::write(file, json).unwrap();
    }

    /// Print a summary of the benchmark results to stdout.
    pub fn display_summary(&self) {
        let duration = self.benchmark_duration();
        let total_tps = self.aggregate_tps();
        let average_latency = self.aggregate_average_latency();
        let stdev_latency = self.aggregate_stdev_latency();

        let mut table = Table::new();
        table.set_format(display::default_table_format());

        table.set_titles(row![bH2->"Benchmark Summary"]);
        table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
        table.add_row(row![bH2->""]);
        table.add_row(row![b->"Nodes:", self.parameters.nodes]);
        table.add_row(row![b->"Faults:", self.parameters.faults]);
        table.add_row(row![b->"Load:", format!("{} tx/s", self.parameters.load)]);
        table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);
        table.add_row(row![bH2->""]);
        table.add_row(row![b->"TPS:", format!("{total_tps} tx/s")]);
        table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);

        display::newline();
        table.printstd();
        display::newline();
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};
    use crate::{
        benchmark::test::TestBenchmarkType, protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    #[test]
    fn average_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }
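
    // A minimal companion test sketching how `tps` behaves: it divides the
    // transaction count by the duration provided by the caller (not by
    // `self.timestamp`), and a zero-second duration safely yields 0 instead of
    // panicking. The values below are illustrative, not taken from a real scrape.
    #[test]
    fn tps() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(data.tps(&Duration::from_secs(10)), 10);
        assert_eq!(data.tps(&Duration::from_secs(0)), 0);
    }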

    #[test]
    fn stdev_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // Ensure `squared_sum / count` is computed correctly.
        assert_eq!(
            data.squared_sum.checked_div(data.count as u32),
            Some(Duration::from_secs_f64(0.75))
        );

        // Ensure `avg^2` is computed correctly.
        assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);

        // stdev = sqrt(0.75 - 0.25) ~= 0.707.
        let stdev = data.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }
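
    // A small sketch of `aggregate_stdev_latency`: across scrapers, the
    // aggregate is the maximum of the per-scraper standard deviations. Both
    // measurements below are hypothetical; the second has all latencies equal
    // (sum^2 / count^2 == squared_sum / count), so its stdev is zero.
    #[test]
    fn aggregate_stdev_latency() {
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );

        // stdev = sqrt(0.75 - 0.25) ~= 0.707s.
        let high = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };
        // Identical latencies: stdev = 0.
        let low = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(100),
            count: 100,
            squared_sum: Duration::from_secs(100),
        };
        let expected = high.stdev_latency();

        aggregator.add(1, high);
        aggregator.add(2, low);
        assert_eq!(aggregator.aggregate_stdev_latency(), expected);
    }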

    #[test]
    fn prometheus_parse() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
        "#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        assert_eq!(aggregator.scrapers.len(), 1);
        let data_points = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }
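
    // A hedged sketch of the end-to-end aggregation, reusing the dummy
    // measurement from `Measurement::new_for_test` (1860 transactions over
    // 30 seconds, i.e. 62 tx/s per scraper). With two scrapers reporting the
    // same data, the aggregate tps is the sum of the per-scraper tps while
    // the aggregate average latency is unchanged.
    #[test]
    fn aggregate() {
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        aggregator.add(1, Measurement::new_for_test());
        aggregator.add(2, Measurement::new_for_test());

        assert_eq!(aggregator.benchmark_duration(), Duration::from_secs(30));
        assert_eq!(aggregator.aggregate_tps(), 124);
        assert_eq!(
            aggregator.aggregate_average_latency(),
            Duration::from_secs(1265).checked_div(1860).unwrap()
        );
    }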
}