1use std::{
6 collections::HashMap,
7 fs,
8 io::BufRead,
9 path::{Path, PathBuf},
10 time::Duration,
11};
12
13use prettytable::{Table, row};
14use prometheus_parse::Scrape;
15use serde::{Deserialize, Serialize};
16
17use crate::{
18 benchmark::{BenchmarkParameters, BenchmarkType},
19 display,
20 protocol::ProtocolMetrics,
21 settings::Settings,
22};
23
/// Identifier of a latency-histogram bucket: the stringified `le`
/// (less-than-or-equal) upper bound scraped from Prometheus, e.g. "0.5" or "inf".
type BucketId = String;
26
27#[derive(Serialize, Deserialize, Default, Clone)]
29pub struct Measurement {
30 timestamp: Duration,
32 buckets: HashMap<BucketId, usize>,
34 sum: Duration,
36 count: usize,
38 squared_sum: Duration,
40}
41
42impl Measurement {
43 pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> Self {
45 let br = std::io::BufReader::new(text.as_bytes());
46 let parsed = Scrape::parse(br.lines()).unwrap();
47
48 let buckets: HashMap<_, _> = parsed
49 .samples
50 .iter()
51 .find(|x| x.metric == M::LATENCY_BUCKETS)
52 .map(|x| match &x.value {
53 prometheus_parse::Value::Histogram(values) => values
54 .iter()
55 .map(|x| {
56 let bucket_id = x.less_than.to_string();
57 let count = x.count as usize;
58 (bucket_id, count)
59 })
60 .collect(),
61 _ => panic!("Unexpected scraped value"),
62 })
63 .unwrap_or_default();
64
65 let sum = parsed
66 .samples
67 .iter()
68 .find(|x| x.metric == M::LATENCY_SUM)
69 .map(|x| match x.value {
70 prometheus_parse::Value::Untyped(value) => Duration::from_secs_f64(value),
71 _ => panic!("Unexpected scraped value"),
72 })
73 .unwrap_or_default();
74
75 let count = parsed
76 .samples
77 .iter()
78 .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
79 .map(|x| match x.value {
80 prometheus_parse::Value::Untyped(value) => value as usize,
81 _ => panic!("Unexpected scraped value"),
82 })
83 .unwrap_or_default();
84
85 let squared_sum = parsed
86 .samples
87 .iter()
88 .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
89 .map(|x| match x.value {
90 prometheus_parse::Value::Counter(value) => Duration::from_secs_f64(value),
91 _ => panic!("Unexpected scraped value"),
92 })
93 .unwrap_or_default();
94
95 let timestamp = parsed
96 .samples
97 .iter()
98 .find(|x| x.metric == M::BENCHMARK_DURATION)
99 .map(|x| match x.value {
100 prometheus_parse::Value::Gauge(value) => Duration::from_secs(value as u64),
101 _ => panic!("Unexpected scraped value"),
102 })
103 .unwrap_or_default();
104
105 Self {
106 timestamp,
107 buckets,
108 sum,
109 count,
110 squared_sum,
111 }
112 }
113
114 pub fn tps(&self, duration: &Duration) -> u64 {
120 let tps = self.count.checked_div(duration.as_secs() as usize);
121 tps.unwrap_or_default() as u64
122 }
123
124 pub fn average_latency(&self) -> Duration {
126 self.sum.checked_div(self.count as u32).unwrap_or_default()
127 }
128
129 pub fn stdev_latency(&self) -> Duration {
132 let first_term = if self.count == 0 {
134 0.0
135 } else {
136 self.squared_sum.as_secs_f64() / self.count as f64
137 };
138
139 let squared_avg = self.average_latency().as_secs_f64().powf(2.0);
141
142 let variance = if squared_avg > first_term {
144 0.0
145 } else {
146 first_term - squared_avg
147 };
148
149 let stdev = variance.sqrt();
151 Duration::from_secs_f64(stdev)
152 }
153
154 #[cfg(test)]
155 pub fn new_for_test() -> Self {
156 Self {
157 timestamp: Duration::from_secs(30),
158 buckets: HashMap::new(),
159 sum: Duration::from_secs(1265),
160 count: 1860,
161 squared_sum: Duration::from_secs(952),
162 }
163 }
164}
165
/// Identifier of a metrics scraper (one scraper per monitored node).
type ScraperId = usize;
168
/// All measurements collected during a benchmark run, grouped by scraper.
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// Specs of the machines the benchmark ran on (copied from the settings).
    pub machine_specs: String,
    /// Git commit of the benchmarked codebase (copied from the settings).
    pub commit: String,
    /// Parameters of this benchmark run.
    pub parameters: BenchmarkParameters<T>,
    /// Chronological measurements, keyed by the scraper that collected them.
    pub scrapers: HashMap<ScraperId, Vec<Measurement>>,
}
180
181impl<T: BenchmarkType> MeasurementsCollection<T> {
182 pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
184 Self {
185 machine_specs: settings.specs.clone(),
186 commit: settings.repository.commit.clone(),
187 parameters,
188 scrapers: HashMap::new(),
189 }
190 }
191
192 pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
194 let data = fs::read(path)?;
195 let measurements: Self = serde_json::from_slice(data.as_slice())?;
196 Ok(measurements)
197 }
198
199 pub fn add(&mut self, scraper_id: ScraperId, measurement: Measurement) {
201 self.scrapers
202 .entry(scraper_id)
203 .or_default()
204 .push(measurement);
205 }
206
207 pub fn transaction_load(&self) -> usize {
209 self.parameters.load
210 }
211
212 pub fn benchmark_duration(&self) -> Duration {
215 self.scrapers
216 .values()
217 .filter_map(|x| x.last())
218 .map(|x| x.timestamp)
219 .max()
220 .unwrap_or_default()
221 }
222
223 pub fn aggregate_tps(&self) -> u64 {
225 let duration = self
226 .scrapers
227 .values()
228 .filter_map(|x| x.last())
229 .map(|x| x.timestamp)
230 .max()
231 .unwrap_or_default();
232 self.scrapers
233 .values()
234 .filter_map(|x| x.last())
235 .map(|x| x.tps(&duration))
236 .sum()
237 }
238
239 pub fn aggregate_average_latency(&self) -> Duration {
242 let last_data_points: Vec<_> = self.scrapers.values().filter_map(|x| x.last()).collect();
243 last_data_points
244 .iter()
245 .map(|x| x.average_latency())
246 .sum::<Duration>()
247 .checked_div(last_data_points.len() as u32)
248 .unwrap_or_default()
249 }
250
251 pub fn aggregate_stdev_latency(&self) -> Duration {
253 self.scrapers
254 .values()
255 .filter_map(|x| x.last())
256 .map(|x| x.stdev_latency())
257 .max()
258 .unwrap_or_default()
259 }
260
261 pub fn save<P: AsRef<Path>>(&self, path: P) {
263 let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
264 let mut file = PathBuf::from(path.as_ref());
265 file.push(format!("measurements-{:?}.json", self.parameters));
266 fs::write(file, json).unwrap();
267 }
268
269 pub fn display_summary(&self) {
271 let duration = self.benchmark_duration();
272 let total_tps = self.aggregate_tps();
273 let average_latency = self.aggregate_average_latency();
274 let stdev_latency = self.aggregate_stdev_latency();
275
276 let mut table = Table::new();
277 table.set_format(display::default_table_format());
278
279 table.set_titles(row![bH2->"Benchmark Summary"]);
280 table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
281 table.add_row(row![bH2->""]);
282 table.add_row(row![b->"Nodes:", self.parameters.nodes]);
283 table.add_row(
284 row![b->"Use internal IPs:", format!("{}", self.parameters.use_internal_ip_address)],
285 );
286 table.add_row(row![b->"Faults:", self.parameters.faults]);
287 table.add_row(row![b->"Load:", format!("{} tx/s", self.parameters.load)]);
288 table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);
289 table.add_row(row![bH2->""]);
290 table.add_row(row![b->"TPS:", format!("{total_tps} tx/s")]);
291 table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
292 table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);
293
294 display::newline();
295 table.printstd();
296 display::newline();
297 }
298}
299
#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};
    use crate::{
        benchmark::test::TestBenchmarkType, protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    #[test]
    fn average_latency() {
        // 2 s of cumulative latency over 100 transactions -> 20 ms average.
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }

    #[test]
    fn stdev_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // E[X^2] = squared_sum / count = 75 / 100 = 0.75.
        assert_eq!(
            data.squared_sum.checked_div(data.count as u32),
            Some(Duration::from_secs_f64(0.75))
        );
        // (E[X])^2 = (50 / 100)^2 = 0.25.
        assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);
        // stdev = sqrt(0.75 - 0.25) = sqrt(0.5) ~= 0.707 -> rounds to 7 tenths.
        let stdev = data.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }

    #[test]
    fn prometheus_parse() {
        // Fixture scrape deliberately mixing quoted and unquoted label values,
        // both of which the parser must accept.
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
        "#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        // A single scraper with a single data point was registered.
        assert_eq!(aggregator.scrapers.len(), 1);
        let data_points = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(data_points.len(), 1);

        // Each bucket keeps its cumulative count; note "+Inf" parses as "inf".
        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        // Scalar metrics round-trip (whole-second comparisons only).
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }
}