iota_aws_orchestrator/
measurement.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    fs,
8    io::BufRead,
9    path::{Path, PathBuf},
10    time::Duration,
11};
12
13use prettytable::{Table, row};
14use prometheus_parse::Scrape;
15use serde::{Deserialize, Serialize};
16
17use crate::{
18    benchmark::{BenchmarkParameters, BenchmarkType},
19    display,
20    protocol::ProtocolMetrics,
21    settings::Settings,
22};
23
/// The identifier of prometheus latency buckets (the bucket's upper bound,
/// i.e. its `le` label, rendered as a string such as "0.5" or "inf").
type BucketId = String;

/// A snapshot measurement at a given time.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// Duration since the beginning of the benchmark.
    timestamp: Duration,
    /// Latency buckets: finalized-transaction count per latency bucket,
    /// keyed by the bucket's upper bound (`le` label).
    buckets: HashMap<BucketId, usize>,
    /// Sum of the latencies of all finalized transactions.
    sum: Duration,
    /// Total number of finalized transactions.
    count: usize,
    /// Sum of the squared latencies of all finalized transactions
    /// (in seconds^2, stored as a `Duration`); used to compute the
    /// latency standard deviation.
    squared_sum: Duration,
}
41
42impl Measurement {
43    // Make a new measurement from the text exposed by prometheus.
44    pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> Self {
45        let br = std::io::BufReader::new(text.as_bytes());
46        let parsed = Scrape::parse(br.lines()).unwrap();
47
48        let buckets: HashMap<_, _> = parsed
49            .samples
50            .iter()
51            .find(|x| x.metric == M::LATENCY_BUCKETS)
52            .map(|x| match &x.value {
53                prometheus_parse::Value::Histogram(values) => values
54                    .iter()
55                    .map(|x| {
56                        let bucket_id = x.less_than.to_string();
57                        let count = x.count as usize;
58                        (bucket_id, count)
59                    })
60                    .collect(),
61                _ => panic!("Unexpected scraped value"),
62            })
63            .unwrap_or_default();
64
65        let sum = parsed
66            .samples
67            .iter()
68            .find(|x| x.metric == M::LATENCY_SUM)
69            .map(|x| match x.value {
70                prometheus_parse::Value::Untyped(value) => Duration::from_secs_f64(value),
71                _ => panic!("Unexpected scraped value"),
72            })
73            .unwrap_or_default();
74
75        let count = parsed
76            .samples
77            .iter()
78            .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
79            .map(|x| match x.value {
80                prometheus_parse::Value::Untyped(value) => value as usize,
81                _ => panic!("Unexpected scraped value"),
82            })
83            .unwrap_or_default();
84
85        let squared_sum = parsed
86            .samples
87            .iter()
88            .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
89            .map(|x| match x.value {
90                prometheus_parse::Value::Counter(value) => Duration::from_secs_f64(value),
91                _ => panic!("Unexpected scraped value"),
92            })
93            .unwrap_or_default();
94
95        let timestamp = parsed
96            .samples
97            .iter()
98            .find(|x| x.metric == M::BENCHMARK_DURATION)
99            .map(|x| match x.value {
100                prometheus_parse::Value::Gauge(value) => Duration::from_secs(value as u64),
101                _ => panic!("Unexpected scraped value"),
102            })
103            .unwrap_or_default();
104
105        Self {
106            timestamp,
107            buckets,
108            sum,
109            count,
110            squared_sum,
111        }
112    }
113
114    /// Compute the tps.
115    /// NOTE: Do not use `self.timestamp` as benchmark duration because some
116    /// clients may be unable to submit transactions passed the first few
117    /// seconds of the benchmark. This may happen as a result of a bad
118    /// control system within the nodes.
119    pub fn tps(&self, duration: &Duration) -> u64 {
120        let tps = self.count.checked_div(duration.as_secs() as usize);
121        tps.unwrap_or_default() as u64
122    }
123
124    /// Compute the average latency.
125    pub fn average_latency(&self) -> Duration {
126        self.sum.checked_div(self.count as u32).unwrap_or_default()
127    }
128
129    /// Compute the standard deviation from the sum of squared latencies:
130    /// `stdev = sqrt( squared_sum / count - avg^2 )`
131    pub fn stdev_latency(&self) -> Duration {
132        // Compute `squared_sum / count`.
133        let first_term = if self.count == 0 {
134            0.0
135        } else {
136            self.squared_sum.as_secs_f64() / self.count as f64
137        };
138
139        // Compute `avg^2`.
140        let squared_avg = self.average_latency().as_secs_f64().powf(2.0);
141
142        // Compute `squared_sum / count - avg^2`.
143        let variance = if squared_avg > first_term {
144            0.0
145        } else {
146            first_term - squared_avg
147        };
148
149        // Compute `sqrt( squared_sum / count - avg^2 )`.
150        let stdev = variance.sqrt();
151        Duration::from_secs_f64(stdev)
152    }
153
154    #[cfg(test)]
155    pub fn new_for_test() -> Self {
156        Self {
157            timestamp: Duration::from_secs(30),
158            buckets: HashMap::new(),
159            sum: Duration::from_secs(1265),
160            count: 1860,
161            squared_sum: Duration::from_secs(952),
162        }
163    }
164}
165
/// The identifier of the scrapers collecting the prometheus metrics.
type ScraperId = usize;

/// All measurements collected during a single benchmark run, together with
/// the context (machine specs, commit, parameters) needed to interpret them.
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// The machine / instance type.
    pub machine_specs: String,
    /// The commit of the codebase.
    pub commit: String,
    /// The benchmark parameters of the current run.
    pub parameters: BenchmarkParameters<T>,
    /// The data collected by each scraper.
    pub scrapers: HashMap<ScraperId, Vec<Measurement>>,
}
180
181impl<T: BenchmarkType> MeasurementsCollection<T> {
182    /// Create a new (empty) collection of measurements.
183    pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
184        Self {
185            machine_specs: settings.specs.clone(),
186            commit: settings.repository.commit.clone(),
187            parameters,
188            scrapers: HashMap::new(),
189        }
190    }
191
192    /// Load a collection of measurement from a json file.
193    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
194        let data = fs::read(path)?;
195        let measurements: Self = serde_json::from_slice(data.as_slice())?;
196        Ok(measurements)
197    }
198
199    /// Add a new measurement to the collection.
200    pub fn add(&mut self, scraper_id: ScraperId, measurement: Measurement) {
201        self.scrapers
202            .entry(scraper_id)
203            .or_default()
204            .push(measurement);
205    }
206
207    /// Return the transaction (input) load of the benchmark.
208    pub fn transaction_load(&self) -> usize {
209        self.parameters.load
210    }
211
212    /// Aggregate the benchmark duration of multiple data points by taking the
213    /// max.
214    pub fn benchmark_duration(&self) -> Duration {
215        self.scrapers
216            .values()
217            .filter_map(|x| x.last())
218            .map(|x| x.timestamp)
219            .max()
220            .unwrap_or_default()
221    }
222
223    /// Aggregate the tps of multiple data points by taking the sum.
224    pub fn aggregate_tps(&self) -> u64 {
225        let duration = self
226            .scrapers
227            .values()
228            .filter_map(|x| x.last())
229            .map(|x| x.timestamp)
230            .max()
231            .unwrap_or_default();
232        self.scrapers
233            .values()
234            .filter_map(|x| x.last())
235            .map(|x| x.tps(&duration))
236            .sum()
237    }
238
239    /// Aggregate the average latency of multiple data points by taking the
240    /// average.
241    pub fn aggregate_average_latency(&self) -> Duration {
242        let last_data_points: Vec<_> = self.scrapers.values().filter_map(|x| x.last()).collect();
243        last_data_points
244            .iter()
245            .map(|x| x.average_latency())
246            .sum::<Duration>()
247            .checked_div(last_data_points.len() as u32)
248            .unwrap_or_default()
249    }
250
251    /// Aggregate the stdev latency of multiple data points by taking the max.
252    pub fn aggregate_stdev_latency(&self) -> Duration {
253        self.scrapers
254            .values()
255            .filter_map(|x| x.last())
256            .map(|x| x.stdev_latency())
257            .max()
258            .unwrap_or_default()
259    }
260
261    /// Save the collection of measurements as a json file.
262    pub fn save<P: AsRef<Path>>(&self, path: P) {
263        let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
264        let mut file = PathBuf::from(path.as_ref());
265        file.push(format!("measurements-{:?}.json", self.parameters));
266        fs::write(file, json).unwrap();
267    }
268
269    /// Display a summary of the measurements.
270    pub fn display_summary(&self) {
271        let duration = self.benchmark_duration();
272        let total_tps = self.aggregate_tps();
273        let average_latency = self.aggregate_average_latency();
274        let stdev_latency = self.aggregate_stdev_latency();
275
276        let mut table = Table::new();
277        table.set_format(display::default_table_format());
278
279        table.set_titles(row![bH2->"Benchmark Summary"]);
280        table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
281        table.add_row(row![bH2->""]);
282        table.add_row(row![b->"Nodes:", self.parameters.nodes]);
283        table.add_row(row![b->"Faults:", self.parameters.faults]);
284        table.add_row(row![b->"Load:", format!("{} tx/s", self.parameters.load)]);
285        table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);
286        table.add_row(row![bH2->""]);
287        table.add_row(row![b->"TPS:", format!("{total_tps} tx/s")]);
288        table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
289        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);
290
291        display::newline();
292        table.printstd();
293        display::newline();
294    }
295}
296
#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};
    use crate::{
        benchmark::test::TestBenchmarkType, protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    #[test]
    fn average_latency() {
        // 2s of cumulative latency across 100 transactions -> 20ms average.
        let measurement = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(measurement.average_latency(), Duration::from_millis(20));
    }

    #[test]
    fn stdev_latency() {
        let measurement = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // squared_sum / count
        let first_term = measurement.squared_sum.checked_div(measurement.count as u32);
        assert_eq!(first_term, Some(Duration::from_secs_f64(0.75)));
        // avg^2
        let squared_avg = measurement.average_latency().as_secs_f64().powf(2.0);
        assert_eq!(squared_avg, 0.25);
        // sqrt( squared_sum / count - avg^2 ) = sqrt(0.5) ~= 0.7
        let stdev = measurement.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }

    #[test]
    fn prometheus_parse() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
        "#;

        // Parse the report and register it under a single scraper id.
        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        // Exactly one scraper holding exactly one data point.
        assert_eq!(aggregator.scrapers.len(), 1);
        let data_points = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(data_points.len(), 1);

        // The single data point matches the report's metrics.
        let data = &data_points[0];
        let expected_buckets: HashMap<String, usize> = HashMap::from([
            ("0.1".into(), 0),
            ("0.25".into(), 0),
            ("0.5".into(), 506),
            ("0.75".into(), 1282),
            ("1".into(), 1693),
            ("1.25".into(), 1816),
            ("1.5".into(), 1860),
            ("1.75".into(), 1860),
            ("2".into(), 1860),
            ("2.5".into(), 1860),
            ("5".into(), 1860),
            ("10".into(), 1860),
            ("20".into(), 1860),
            ("30".into(), 1860),
            ("60".into(), 1860),
            ("90".into(), 1860),
            ("inf".into(), 1860),
        ]);
        assert_eq!(data.buckets, expected_buckets);
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }
}