iota_aws_orchestrator/
measurement.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    fs,
8    io::BufRead,
9    path::{Path, PathBuf},
10    time::Duration,
11};
12
13use prettytable::{Table, row};
14use prometheus_parse::Scrape;
15use serde::{Deserialize, Serialize};
16
17use crate::{
18    benchmark::{BenchmarkParameters, BenchmarkType},
19    display,
20    protocol::ProtocolMetrics,
21    settings::Settings,
22};
23
/// The identifier of prometheus latency buckets.
type BucketId = String;

/// A snapshot measurement at a given time.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// Duration since the beginning of the benchmark.
    timestamp: Duration,
    /// Latency buckets: upper bound of the bucket -> cumulative count.
    buckets: HashMap<BucketId, usize>,
    /// Sum of the latencies of all finalized transactions.
    sum: Duration,
    /// Total number of finalized transactions.
    count: usize,
    /// Sum of the squares of the latencies of all finalized transactions
    /// (used to derive the standard deviation).
    squared_sum: Duration,
}
41
42impl Measurement {
43    // Make a new measurement from the text exposed by prometheus.
44    pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> Self {
45        let br = std::io::BufReader::new(text.as_bytes());
46        let parsed = Scrape::parse(br.lines()).unwrap();
47
48        let buckets: HashMap<_, _> = parsed
49            .samples
50            .iter()
51            .find(|x| x.metric == M::LATENCY_BUCKETS)
52            .map(|x| match &x.value {
53                prometheus_parse::Value::Histogram(values) => values
54                    .iter()
55                    .map(|x| {
56                        let bucket_id = x.less_than.to_string();
57                        let count = x.count as usize;
58                        (bucket_id, count)
59                    })
60                    .collect(),
61                _ => panic!("Unexpected scraped value"),
62            })
63            .unwrap_or_default();
64
65        let sum = parsed
66            .samples
67            .iter()
68            .find(|x| x.metric == M::LATENCY_SUM)
69            .map(|x| match x.value {
70                prometheus_parse::Value::Untyped(value) => Duration::from_secs_f64(value),
71                _ => panic!("Unexpected scraped value"),
72            })
73            .unwrap_or_default();
74
75        let count = parsed
76            .samples
77            .iter()
78            .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
79            .map(|x| match x.value {
80                prometheus_parse::Value::Untyped(value) => value as usize,
81                _ => panic!("Unexpected scraped value"),
82            })
83            .unwrap_or_default();
84
85        let squared_sum = parsed
86            .samples
87            .iter()
88            .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
89            .map(|x| match x.value {
90                prometheus_parse::Value::Counter(value) => Duration::from_secs_f64(value),
91                _ => panic!("Unexpected scraped value"),
92            })
93            .unwrap_or_default();
94
95        let timestamp = parsed
96            .samples
97            .iter()
98            .find(|x| x.metric == M::BENCHMARK_DURATION)
99            .map(|x| match x.value {
100                prometheus_parse::Value::Gauge(value) => Duration::from_secs(value as u64),
101                _ => panic!("Unexpected scraped value"),
102            })
103            .unwrap_or_default();
104
105        Self {
106            timestamp,
107            buckets,
108            sum,
109            count,
110            squared_sum,
111        }
112    }
113
114    /// Compute the tps.
115    /// NOTE: Do not use `self.timestamp` as benchmark duration because some
116    /// clients may be unable to submit transactions passed the first few
117    /// seconds of the benchmark. This may happen as a result of a bad
118    /// control system within the nodes.
119    pub fn tps(&self, duration: &Duration) -> u64 {
120        let tps = self.count.checked_div(duration.as_secs() as usize);
121        tps.unwrap_or_default() as u64
122    }
123
124    /// Compute the average latency.
125    pub fn average_latency(&self) -> Duration {
126        self.sum.checked_div(self.count as u32).unwrap_or_default()
127    }
128
129    /// Compute the standard deviation from the sum of squared latencies:
130    /// `stdev = sqrt( squared_sum / count - avg^2 )`
131    pub fn stdev_latency(&self) -> Duration {
132        // Compute `squared_sum / count`.
133        let first_term = if self.count == 0 {
134            0.0
135        } else {
136            self.squared_sum.as_secs_f64() / self.count as f64
137        };
138
139        // Compute `avg^2`.
140        let squared_avg = self.average_latency().as_secs_f64().powf(2.0);
141
142        // Compute `squared_sum / count - avg^2`.
143        let variance = if squared_avg > first_term {
144            0.0
145        } else {
146            first_term - squared_avg
147        };
148
149        // Compute `sqrt( squared_sum / count - avg^2 )`.
150        let stdev = variance.sqrt();
151        Duration::from_secs_f64(stdev)
152    }
153
154    #[cfg(test)]
155    pub fn new_for_test() -> Self {
156        Self {
157            timestamp: Duration::from_secs(30),
158            buckets: HashMap::new(),
159            sum: Duration::from_secs(1265),
160            count: 1860,
161            squared_sum: Duration::from_secs(952),
162        }
163    }
164}
165
/// The identifier of the scrapers collecting the prometheus metrics.
type ScraperId = usize;

/// A collection of [`Measurement`] data points, indexed by the scraper that
/// collected them, together with the context of the benchmark run.
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// The machine / instance type.
    pub machine_specs: String,
    /// The commit of the codebase.
    pub commit: String,
    /// The benchmark parameters of the current run.
    pub parameters: BenchmarkParameters<T>,
    /// The data collected by each scraper.
    pub scrapers: HashMap<ScraperId, Vec<Measurement>>,
}
180
181impl<T: BenchmarkType> MeasurementsCollection<T> {
182    /// Create a new (empty) collection of measurements.
183    pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
184        Self {
185            machine_specs: settings.specs.clone(),
186            commit: settings.repository.commit.clone(),
187            parameters,
188            scrapers: HashMap::new(),
189        }
190    }
191
192    /// Load a collection of measurement from a json file.
193    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
194        let data = fs::read(path)?;
195        let measurements: Self = serde_json::from_slice(data.as_slice())?;
196        Ok(measurements)
197    }
198
199    /// Add a new measurement to the collection.
200    pub fn add(&mut self, scraper_id: ScraperId, measurement: Measurement) {
201        self.scrapers
202            .entry(scraper_id)
203            .or_default()
204            .push(measurement);
205    }
206
207    /// Return the transaction (input) load of the benchmark.
208    pub fn transaction_load(&self) -> usize {
209        self.parameters.load
210    }
211
212    /// Aggregate the benchmark duration of multiple data points by taking the
213    /// max.
214    pub fn benchmark_duration(&self) -> Duration {
215        self.scrapers
216            .values()
217            .filter_map(|x| x.last())
218            .map(|x| x.timestamp)
219            .max()
220            .unwrap_or_default()
221    }
222
223    /// Aggregate the tps of multiple data points by taking the sum.
224    pub fn aggregate_tps(&self) -> u64 {
225        let duration = self
226            .scrapers
227            .values()
228            .filter_map(|x| x.last())
229            .map(|x| x.timestamp)
230            .max()
231            .unwrap_or_default();
232        self.scrapers
233            .values()
234            .filter_map(|x| x.last())
235            .map(|x| x.tps(&duration))
236            .sum()
237    }
238
239    /// Aggregate the average latency of multiple data points by taking the
240    /// average.
241    pub fn aggregate_average_latency(&self) -> Duration {
242        let last_data_points: Vec<_> = self.scrapers.values().filter_map(|x| x.last()).collect();
243        last_data_points
244            .iter()
245            .map(|x| x.average_latency())
246            .sum::<Duration>()
247            .checked_div(last_data_points.len() as u32)
248            .unwrap_or_default()
249    }
250
251    /// Aggregate the stdev latency of multiple data points by taking the max.
252    pub fn aggregate_stdev_latency(&self) -> Duration {
253        self.scrapers
254            .values()
255            .filter_map(|x| x.last())
256            .map(|x| x.stdev_latency())
257            .max()
258            .unwrap_or_default()
259    }
260
261    /// Save the collection of measurements as a json file.
262    pub fn save<P: AsRef<Path>>(&self, path: P) {
263        let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
264        let mut file = PathBuf::from(path.as_ref());
265        file.push(format!("measurements-{:?}.json", self.parameters));
266        fs::write(file, json).unwrap();
267    }
268
269    /// Display a summary of the measurements.
270    pub fn display_summary(&self) {
271        let duration = self.benchmark_duration();
272        let total_tps = self.aggregate_tps();
273        let average_latency = self.aggregate_average_latency();
274        let stdev_latency = self.aggregate_stdev_latency();
275
276        let mut table = Table::new();
277        table.set_format(display::default_table_format());
278
279        table.set_titles(row![bH2->"Benchmark Summary"]);
280        table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
281        table.add_row(row![bH2->""]);
282        table.add_row(row![b->"Nodes:", self.parameters.nodes]);
283        table.add_row(
284            row![b->"Use internal IPs:", format!("{}", self.parameters.use_internal_ip_address)],
285        );
286        table.add_row(row![b->"Faults:", self.parameters.faults]);
287        table.add_row(row![b->"Load:", format!("{} tx/s", self.parameters.load)]);
288        table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);
289        table.add_row(row![bH2->""]);
290        table.add_row(row![b->"TPS:", format!("{total_tps} tx/s")]);
291        table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
292        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);
293
294        display::newline();
295        table.printstd();
296        display::newline();
297    }
298}
299
#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};
    use crate::{
        benchmark::test::TestBenchmarkType, protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    /// Average latency is `sum / count`: 2 s / 100 tx = 20 ms.
    #[test]
    fn average_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }

    /// Stdev follows `sqrt(squared_sum / count - avg^2)`:
    /// sqrt(0.75 - 0.25) = sqrt(0.5) ~= 0.7 s.
    #[test]
    fn stdev_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // squared_sum / count
        assert_eq!(
            data.squared_sum.checked_div(data.count as u32),
            Some(Duration::from_secs_f64(0.75))
        );
        // avg^2
        assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);
        // sqrt( squared_sum / count - avg^2 )
        // Rounded to one decimal to avoid brittle float comparison.
        let stdev = data.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }

    /// Parse a realistic prometheus scrape and check every field of the
    /// resulting measurement. The fixture deliberately mixes quoted and
    /// unquoted label values to exercise both parser paths.
    #[test]
    fn prometheus_parse() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
        "#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        // Exactly one scraper holding exactly one data point.
        assert_eq!(aggregator.scrapers.len(), 1);
        let data_points = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(data_points.len(), 1);

        // All histogram buckets are captured; "+Inf" is normalized to "inf".
        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }
}
421}