iota_aws_orchestrator/
measurement.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, fs, io::BufRead, path::Path, time::Duration};
6
7use prettytable::{Table, row};
8use prometheus_parse::Scrape;
9use serde::{Deserialize, Serialize};
10
11use crate::{
12    IotaBenchmarkType,
13    benchmark::{BenchmarkParameters, BenchmarkType, RunInterval},
14    display,
15    protocol::ProtocolMetrics,
16    settings::Settings,
17};
18
/// The identifier of prometheus latency buckets: the bucket's upper bound
/// rendered as a string (e.g. "0.5", or "inf" for the +Inf bucket).
type BucketId = String;

/// A snapshot measurement at a given time.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// The type of the workload, e.g. "transfer_object", "shared_counter".
    pub workload: String,
    /// Duration since the beginning of the benchmark.
    timestamp: Duration,
    /// Latency buckets: cumulative histogram mapping each bucket's upper
    /// bound to the number of observations at or below it.
    buckets: HashMap<BucketId, usize>,
    /// Sum of the latencies of all finalized transactions.
    sum: Duration,
    /// Total number of finalized transactions.
    count: usize,
    /// Sum of the squares of the latencies of all finalized transactions
    /// (used to derive the standard deviation).
    squared_sum: Duration,
}
38
39impl Measurement {
    /// Parse measurements from Prometheus metrics text format.
    ///
    /// Returns one [`Measurement`] per distinct `workload` label found in
    /// `text`, keyed by the workload name. If no sample carries a `workload`
    /// label, an empty map is returned. A metric that is missing (or has an
    /// unexpected Prometheus value type) for a workload falls back to the
    /// type's default (zero).
    ///
    /// # Panics
    ///
    /// Panics if `text` is not valid Prometheus exposition format.
    pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> HashMap<String, Self> {
        let br = std::io::BufReader::new(text.as_bytes());
        let parsed = Scrape::parse(br.lines()).expect("Failed to parse Prometheus metrics");

        // Pre-group samples by workload to avoid repeated iteration
        let mut samples_by_workload: HashMap<String, Vec<&prometheus_parse::Sample>> =
            HashMap::new();
        for sample in &parsed.samples {
            if let Some(workload) = sample.labels.get("workload") {
                samples_by_workload
                    .entry(workload.to_string())
                    .or_default()
                    .push(sample);
            }
        }

        if samples_by_workload.is_empty() {
            // No workload labels found; return empty measurements
            return HashMap::new();
        }

        // Also get the global timestamp (without workload label) as fallback
        // for workloads whose samples carry no benchmark-duration gauge.
        let global_timestamp = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::BENCHMARK_DURATION && x.labels.get("workload").is_none())
            .and_then(|x| match x.value {
                prometheus_parse::Value::Gauge(value) => Some(Duration::from_secs(value as u64)),
                _ => None,
            })
            .unwrap_or_default();

        // Extract the measurement for each workload.
        samples_by_workload
            .into_iter()
            .map(|(workload, workload_samples)| {
                // Cumulative histogram: bucket upper bound -> observation count.
                let buckets: HashMap<_, _> = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_BUCKETS)
                    .and_then(|sample| match &sample.value {
                        prometheus_parse::Value::Histogram(values) => Some(
                            values
                                .iter()
                                .map(|x| (x.less_than.to_string(), x.count as usize))
                                .collect(),
                        ),
                        _ => None,
                    })
                    .unwrap_or_default();

                // NOTE(review): the latency sum is matched as `Untyped` while
                // the squared sum below is matched as `Counter` — presumably
                // mirroring how each metric is exported; confirm against the
                // `ProtocolMetrics` implementation before changing either arm.
                let sum = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_SUM)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Untyped(value) => {
                            Some(Duration::from_secs_f64(value))
                        }
                        _ => None,
                    })
                    .unwrap_or_default();

                let count = workload_samples
                    .iter()
                    .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Untyped(value) => Some(value as usize),
                        _ => None,
                    })
                    .unwrap_or_default();

                let squared_sum = workload_samples
                    .iter()
                    .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Counter(value) => {
                            Some(Duration::from_secs_f64(value))
                        }
                        _ => None,
                    })
                    .unwrap_or_default();

                // Try to get workload-specific timestamp, fall back to global timestamp
                let timestamp = workload_samples
                    .iter()
                    .find(|x| x.metric == M::BENCHMARK_DURATION)
                    .and_then(|sample| match sample.value {
                        prometheus_parse::Value::Gauge(value) => {
                            Some(Duration::from_secs(value as u64))
                        }
                        _ => None,
                    })
                    .unwrap_or(global_timestamp);

                let measurement = Self {
                    workload: workload.clone(),
                    timestamp,
                    buckets,
                    sum,
                    count,
                    squared_sum,
                };

                (workload, measurement)
            })
            .collect()
    }
147
148    /// Compute the tps.
149    /// NOTE: Do not use `self.timestamp` as benchmark duration because some
150    /// clients may be unable to submit transactions passed the first few
151    /// seconds of the benchmark. This may happen as a result of a bad
152    /// control system within the nodes.
153    pub fn tps(&self, duration: &Duration) -> u64 {
154        let tps = self.count.checked_div(duration.as_secs() as usize);
155        tps.unwrap_or_default() as u64
156    }
157
158    /// Compute the average latency.
159    pub fn average_latency(&self) -> Duration {
160        self.sum.checked_div(self.count as u32).unwrap_or_default()
161    }
162
163    /// Compute the standard deviation from the sum of squared latencies:
164    /// `stdev = sqrt( squared_sum / count - avg^2 )`
165    pub fn stdev_latency(&self) -> Duration {
166        // Compute `squared_sum / count`.
167        let first_term = if self.count == 0 {
168            0.0
169        } else {
170            self.squared_sum.as_secs_f64() / self.count as f64
171        };
172
173        // Compute `avg^2`.
174        let squared_avg = self.average_latency().as_secs_f64().powf(2.0);
175
176        // Compute `squared_sum / count - avg^2`.
177        let variance = if squared_avg > first_term {
178            0.0
179        } else {
180            first_term - squared_avg
181        };
182
183        // Compute `sqrt( squared_sum / count - avg^2 )`.
184        let stdev = variance.sqrt();
185        Duration::from_secs_f64(stdev)
186    }
187
188    #[cfg(test)]
189    pub fn new_for_test(workload: String) -> Self {
190        Self {
191            workload,
192            timestamp: Duration::from_secs(30),
193            buckets: HashMap::new(),
194            sum: Duration::from_secs(1265),
195            count: 1860,
196            squared_sum: Duration::from_secs(952),
197        }
198    }
199}
200
/// The identifier of the scrapers collecting the prometheus metrics.
type ScraperId = usize;

/// All measurements collected during one benchmark run, together with the
/// run's parameters and environment description.
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// The machine / instance type.
    pub machine_specs: String,
    /// The commit of the codebase.
    pub commit: String,
    /// The benchmark parameters of the current run.
    pub parameters: BenchmarkParameters<T>,
    /// The data collected by each scraper, organized by workload. Each inner
    /// vector is appended to in insertion order; the last element is treated
    /// as the most recent snapshot when aggregating.
    pub scrapers: HashMap<ScraperId, HashMap<String, Vec<Measurement>>>,
}
215
216impl<T: BenchmarkType> MeasurementsCollection<T> {
217    /// Create a new (empty) collection of measurements.
218    pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
219        Self {
220            machine_specs: settings.node_specs.clone(),
221            commit: settings.repository.commit.clone(),
222            parameters,
223            scrapers: HashMap::new(),
224        }
225    }
226
227    /// Load a collection of measurement from a json file.
228    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
229        let data = fs::read(path)?;
230        let measurements: Self = serde_json::from_slice(data.as_slice())?;
231        Ok(measurements)
232    }
233
234    /// Add a new measurement to the collection.
235    pub fn add(&mut self, scraper_id: ScraperId, measurements: HashMap<String, Measurement>) {
236        let scraper_workloads = self.scrapers.entry(scraper_id).or_default();
237        for (workload, workload_measurement) in measurements {
238            scraper_workloads
239                .entry(workload)
240                .or_default()
241                .push(workload_measurement);
242        }
243    }
244
245    /// Return the transaction (input) load of the benchmark.
246    pub fn transaction_load(&self) -> usize {
247        self.parameters.load
248    }
249
250    /// Aggregate the benchmark duration of multiple data points by taking the
251    /// max.
252    pub fn benchmark_duration(&self) -> Duration {
253        self.last_measurements_iter()
254            .map(|x| x.timestamp)
255            .max()
256            .unwrap_or_default()
257    }
258
259    pub fn workload_tps(&self) -> HashMap<String, u64> {
260        // Collect all last measurements
261        let last_measurements: Vec<_> = self.last_measurements_iter().collect();
262
263        last_measurements
264            .into_iter()
265            // Sum TPS for each workload across all scrapers
266            .fold(HashMap::new(), |mut acc, measurement| {
267                *acc.entry(measurement.workload.clone()).or_insert(0) +=
268                    measurement.tps(&measurement.timestamp);
269                acc
270            })
271    }
272
273    /// Aggregate the tps of multiple data points by taking the sum.
274    /// Calculates TPS for each workload separately, then sums across all
275    /// workloads.
276    pub fn aggregate_tps(&self) -> u64 {
277        // Collect all last measurements
278        let last_measurements: Vec<_> = self.last_measurements_iter().collect();
279
280        // Calculate and sum TPS for each measurement
281        last_measurements.iter().map(|x| x.tps(&x.timestamp)).sum()
282    }
283
284    pub fn workload_average_latency(&self) -> HashMap<String, Duration> {
285        // Collect sum and count for each workload across all scrapers
286        let mut workload_data: HashMap<String, (Duration, usize)> = HashMap::new();
287
288        for measurement in self.last_measurements_iter() {
289            workload_data
290                .entry(measurement.workload.clone())
291                .and_modify(|(sum, count)| {
292                    *sum += measurement.sum;
293                    *count += measurement.count;
294                })
295                .or_insert((measurement.sum, measurement.count));
296        }
297
298        // Calculate average for each workload
299        workload_data
300            .into_iter()
301            .map(|(workload, (sum, count))| {
302                let avg = if count == 0 {
303                    Duration::default()
304                } else {
305                    sum.checked_div(count as u32).unwrap_or_default()
306                };
307                (workload, avg)
308            })
309            .collect()
310    }
311
312    /// Aggregate the average latency of multiple data points by taking the
313    /// weighted average based on transaction counts.
314    /// This computes: (sum of all latency_sum) / (sum of all counts)
315    pub fn aggregate_average_latency(&self) -> Duration {
316        let last_measurements: Vec<_> = self.last_measurements_iter().collect();
317
318        let total_sum: Duration = last_measurements.iter().map(|x| x.sum).sum();
319        let total_count: usize = last_measurements.iter().map(|x| x.count).sum();
320
321        if total_count == 0 {
322            return Duration::default();
323        }
324
325        total_sum
326            .checked_div(total_count as u32)
327            .unwrap_or_default()
328    }
329
330    pub fn workload_stdev_latency(&self) -> HashMap<String, Duration> {
331        // Collect sum, squared_sum, and count for each workload across all scrapers
332        let mut workload_data: HashMap<String, (Duration, Duration, usize)> = HashMap::new();
333
334        for measurement in self.last_measurements_iter() {
335            workload_data
336                .entry(measurement.workload.clone())
337                .and_modify(|(sum, squared_sum, count)| {
338                    *sum += measurement.sum;
339                    *squared_sum += measurement.squared_sum;
340                    *count += measurement.count;
341                })
342                .or_insert((measurement.sum, measurement.squared_sum, measurement.count));
343        }
344
345        // Calculate stdev for each workload from aggregated data
346        workload_data
347            .into_iter()
348            .map(|(workload, (sum, squared_sum, count))| {
349                let stdev = if count == 0 {
350                    Duration::default()
351                } else {
352                    let first_term = squared_sum.as_secs_f64() / count as f64;
353                    let avg = sum.as_secs_f64() / count as f64;
354                    let variance = if avg.powf(2.0) > first_term {
355                        0.0
356                    } else {
357                        first_term - avg.powf(2.0)
358                    };
359                    Duration::from_secs_f64(variance.sqrt())
360                };
361                (workload, stdev)
362            })
363            .collect()
364    }
365
366    /// Aggregate the stdev latency by combining all squared sums, sums, and
367    /// counts. Uses the pooled variance formula: sqrt((Σsquared_sum /
368    /// Σcount) - (Σsum / Σcount)^2)
369    pub fn aggregate_stdev_latency(&self) -> Duration {
370        let last_measurements: Vec<_> = self.last_measurements_iter().collect();
371
372        let total_sum: Duration = last_measurements.iter().map(|x| x.sum).sum();
373        let total_squared_sum: Duration = last_measurements.iter().map(|x| x.squared_sum).sum();
374        let total_count: usize = last_measurements.iter().map(|x| x.count).sum();
375
376        if total_count == 0 {
377            return Duration::default();
378        }
379
380        let first_term = total_squared_sum.as_secs_f64() / total_count as f64;
381        let avg = total_sum.as_secs_f64() / total_count as f64;
382        let variance = if avg.powf(2.0) > first_term {
383            0.0
384        } else {
385            first_term - avg.powf(2.0)
386        };
387
388        Duration::from_secs_f64(variance.sqrt())
389    }
390
391    pub fn workload_p50_latency(&self) -> HashMap<String, Duration> {
392        // Aggregate buckets and counts for each workload across all scrapers
393        let mut workload_data: HashMap<String, (HashMap<BucketId, usize>, usize)> = HashMap::new();
394
395        for measurement in self.last_measurements_iter() {
396            workload_data
397                .entry(measurement.workload.clone())
398                .and_modify(|(buckets, count)| {
399                    // Sum bucket counts
400                    for (bucket_id, bucket_count) in &measurement.buckets {
401                        *buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
402                    }
403                    *count += measurement.count;
404                })
405                .or_insert((measurement.buckets.clone(), measurement.count));
406        }
407
408        // Calculate P50 for each workload from aggregated buckets
409        workload_data
410            .into_iter()
411            .map(|(workload, (buckets, count))| {
412                let p50 = p50_latency(&buckets, count);
413                (workload, p50)
414            })
415            .collect()
416    }
417
418    /// Aggregate the P50 latency by combining all histogram buckets and
419    /// calculating P50 from the combined histogram.
420    pub fn aggregate_p50_latency(&self) -> Duration {
421        let last_measurements: Vec<_> = self.last_measurements_iter().collect();
422
423        // Aggregate all buckets across all workloads and scrapers
424        let mut combined_buckets: HashMap<BucketId, usize> = HashMap::new();
425        let mut total_count = 0;
426
427        for measurement in &last_measurements {
428            for (bucket_id, bucket_count) in &measurement.buckets {
429                *combined_buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
430            }
431            total_count += measurement.count;
432        }
433
434        // Calculate P50 from combined histogram
435        p50_latency(&combined_buckets, total_count)
436    }
437
438    pub fn workload_p99_latency(&self) -> HashMap<String, Duration> {
439        // Aggregate buckets and counts for each workload across all scrapers
440        let mut workload_data: HashMap<String, (HashMap<BucketId, usize>, usize)> = HashMap::new();
441
442        for measurement in self.last_measurements_iter() {
443            workload_data
444                .entry(measurement.workload.clone())
445                .and_modify(|(buckets, count)| {
446                    // Sum bucket counts
447                    for (bucket_id, bucket_count) in &measurement.buckets {
448                        *buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
449                    }
450                    *count += measurement.count;
451                })
452                .or_insert((measurement.buckets.clone(), measurement.count));
453        }
454
455        // Calculate P99 for each workload from aggregated buckets
456        workload_data
457            .into_iter()
458            .map(|(workload, (buckets, count))| {
459                let p99 = p99_latency(&buckets, count);
460                (workload, p99)
461            })
462            .collect()
463    }
464
465    /// Aggregate the P99 latency by combining all histogram buckets and
466    /// calculating P99 from the combined histogram.
467    pub fn aggregate_p99_latency(&self) -> Duration {
468        let last_measurements: Vec<_> = self.last_measurements_iter().collect();
469
470        // Aggregate all buckets across all workloads and scrapers
471        let mut combined_buckets: HashMap<BucketId, usize> = HashMap::new();
472        let mut total_count = 0;
473
474        for measurement in &last_measurements {
475            for (bucket_id, bucket_count) in &measurement.buckets {
476                *combined_buckets.entry(bucket_id.clone()).or_insert(0) += bucket_count;
477            }
478            total_count += measurement.count;
479        }
480
481        p99_latency(&combined_buckets, total_count)
482    }
483
484    /// Save the collection of measurements as a json file.
485    pub fn save<P: AsRef<Path>>(&self, path: P) {
486        let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
487        let file = path
488            .as_ref()
489            .join(format!("measurements-{:?}.json", self.parameters));
490        fs::write(file, json).unwrap();
491    }
492
    /// Parse the metrics files written by each client (`metrics-{i}.log` in
    /// `log_dir`) and fold every scrape they contain into this collection.
    ///
    /// Each file may contain several consecutive Prometheus scrapes; they
    /// are split into chunks (see `split_into_chunks`) and parsed
    /// individually. Missing files are skipped silently; unreadable files
    /// only emit a warning.
    pub fn aggregates_metrics_from_files<M: ProtocolMetrics>(
        &mut self,
        num_clients: usize,
        log_dir: &Path,
    ) {
        display::action("Processing metrics files");

        // IMPORTANT:
        // - Time-mode: keep only samples within [0 ..= duration_secs]
        // - Count-mode: do NOT cut by time (the run ends by tx-count, not wall clock)
        let time_limit_secs: Option<u64> = match self.parameters.run_interval {
            RunInterval::Time(d) => Some(d.as_secs()),
            RunInterval::Count(_) => None,
        };

        for i in 0..num_clients {
            let metrics_file = log_dir.join(format!("metrics-{i}.log"));

            // A client may legitimately not have produced a metrics file.
            if !metrics_file.exists() {
                continue;
            }

            match fs::read_to_string(&metrics_file) {
                Ok(content) => {
                    display::action(format!("Processing: {}\n", metrics_file.display()));

                    let chunks = self.split_into_chunks(&content);
                    for chunk in &chunks {
                        let mut measurements: HashMap<String, Measurement> =
                            Measurement::from_prometheus::<M>(chunk);

                        if let Some(limit) = time_limit_secs {
                            // Retain only measurements within the benchmark duration (seconds since
                            // start).
                            measurements.retain(|_, m| m.timestamp.as_secs() <= limit);
                        }

                        // The client index doubles as the scraper id.
                        self.add(i, measurements);
                    }

                    display::action(format!("Processed metrics for client {i}\n"));
                }
                Err(e) => display::warn(format!("Failed to read metrics file {i}: {e}")),
            }
        }

        display::done();
    }
541
542    /// Split metrics content into chunks separated by "# HELP
543    /// benchmark_duration" lines
544    fn split_into_chunks(&self, text: &str) -> Vec<String> {
545        let mut chunks = Vec::new();
546        let mut current_chunk = String::new();
547        let mut found_first_help = false;
548
549        for line in text.lines() {
550            let trimmed = line.trim();
551
552            // Skip everything until we find the first "# HELP benchmark_duration"
553            if trimmed.starts_with("# HELP benchmark_duration") {
554                if found_first_help && !current_chunk.is_empty() {
555                    // We've found another chunk boundary, save the previous one
556                    chunks.push(current_chunk);
557                    current_chunk = String::new();
558                }
559                found_first_help = true;
560            }
561
562            if found_first_help {
563                current_chunk.push_str(line);
564                current_chunk.push('\n');
565            }
566        }
567
568        // Add the last chunk
569        if !current_chunk.is_empty() {
570            chunks.push(current_chunk);
571        }
572
573        chunks
574    }
575
    /// Display a summary of the measurements as a table on stdout, and also
    /// append the same table to the log file.
    pub fn display_summary(&self) {
        // Aggregate everything up front so table construction below is pure
        // formatting.
        let duration = self.benchmark_duration();
        let workload_tps = self.workload_tps();
        let total_tps = self.aggregate_tps();
        let workload_latency = self.workload_average_latency();
        let average_latency = self.aggregate_average_latency();
        let workload_stdev_latency = self.workload_stdev_latency();
        let stdev_latency = self.aggregate_stdev_latency();
        let workload_p50_latency = self.workload_p50_latency();
        let p50_latency = self.aggregate_p50_latency();
        let workload_p99_latency = self.workload_p99_latency();
        let p99_latency = self.aggregate_p99_latency();

        // Efficiency = achieved TPS as a percentage of the requested load.
        let target = self.parameters.load as f64;
        let achieved = total_tps as f64;
        let efficiency = if target > 0.0 {
            100.0 * achieved / target
        } else {
            0.0
        };

        let mut table = Table::new();
        table.set_format(display::default_table_format());

        table.set_titles(row![bH2->"Benchmark Summary"]);

        table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Nodes:", self.parameters.nodes]);
        table.add_row(
            row![b->"Use internal IPs:", format!("{}", self.parameters.use_internal_ip_address)],
        );
        table.add_row(row![b->"Faults:", self.parameters.faults]);

        // Workload config
        table.add_row(row![b->"Load (target):", format!("{} tx/s", self.parameters.load)]);
        table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);

        // Efficiency / saturation signal
        table.add_row(row![b->"Achieved TPS:", format!("{total_tps} tx/s")]);
        table.add_row(row![b->"Efficiency:", format!("{:.1}%", efficiency)]);
        table.add_row(row![bH2->""]);

        // AA-specific block: only shown for the abstract-account benchmark.

        if self.parameters.benchmark_type.to_string()
            == IotaBenchmarkType::AbstractAccountBench.to_string()
        {
            table.add_row(row![bH2->"AA config"]);
            table.add_row(row![b->"Authenticator:", self.parameters.aa_authenticator.to_string()]);
            table.add_row(row![b->"Stress workers:", self.parameters.stress_num_workers]);
            table.add_row(
                row![b->"Stress in-flight ratio:", self.parameters.stress_in_flight_ratio],
            );

            table.add_row(row![b->"AA split amount:", self.parameters.aa_split_amount]);

            table.add_row(
                row![b->"Stress client threads:", self.parameters.stress_num_client_threads],
            );
            table.add_row(
                row![b->"Stress server threads:", self.parameters.stress_num_server_threads],
            );
            table.add_row(row![bH2->""]);
        }

        // NOTE(review): this prints a developer-specific SSH key path
        // (/Users/pk/...) and a placeholder host; consider making the key
        // path and metrics host configurable instead of hard-coded.
        println!("Grafana UI:");
        println!(
            "ssh -i /Users/pk/.ssh/aws_orchestrator -L 3000:127.0.0.1:3000 <ubuntu@<metrics_public_ip>"
        );

        table.add_row(row![bH2->"Per-workload throughput"]);
        for (workload, tps) in &workload_tps {
            table.add_row(row![b->format!("  {workload} TPS:"), format!("{tps} tx/s")]);
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![bH2->"Latency"]);
        table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);

        table.add_row(row![bH2->""]);
        table.add_row(row![bH2->"Per-workload average latency"]);
        for (workload, latency) in &workload_latency {
            table.add_row(
                row![b->format!("  {workload} avg:"), format!("{} ms", latency.as_millis())],
            );
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Latency (p50):", format!("{} ms", p50_latency.as_millis())]);
        for (workload, latency) in &workload_p50_latency {
            table.add_row(
                row![b->format!("  {workload} p50 Latency:"), format!("{} ms", latency.as_millis())],
            );
        }
        table.add_row(row![bH2->""]);

        table.add_row(row![b->"Latency (p99):", format!("{} ms", p99_latency.as_millis())]);
        for (workload, latency) in &workload_p99_latency {
            table.add_row(
                row![b->format!("  {workload} p99 Latency:"), format!("{} ms", latency.as_millis())],
            );
        }
        table.add_row(row![bH2->""]);

        // NOTE(review): the aggregate stdev row is also emitted in the
        // "Latency" section above — duplicated output; confirm whether both
        // rows are intended.
        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);
        for (workload, latency) in &workload_stdev_latency {
            table.add_row(
                row![b->format!("  {workload} stdev:"), format!("{} ms", latency.as_millis())],
            );
        }

        display::newline();
        table.printstd();

        // Also log the table to file
        let table_string = format!("{}", table);
        crate::logger::log(&table_string);

        display::newline();
    }
700
701    // Get an iterator over the last measurements of all workloads across all
702    // scrapers
703    fn last_measurements_iter(&self) -> impl Iterator<Item = &Measurement> {
704        self.scrapers
705            .values()
706            .flat_map(|workload_map| workload_map.values())
707            .filter_map(|measurements| measurements.last())
708    }
709}
710
/// Compute the P50 (median) latency from histogram buckets.
fn p50_latency(buckets: &HashMap<BucketId, usize>, count: usize) -> Duration {
    histogram_quantile(buckets, count, 0.5)
}

/// Compute the P99 latency from histogram buckets.
fn p99_latency(buckets: &HashMap<BucketId, usize>, count: usize) -> Duration {
    histogram_quantile(buckets, count, 0.99)
}

/// Calculate a quantile from histogram buckets using linear interpolation,
/// matching Prometheus's histogram_quantile behavior.
///
/// `buckets` maps each bucket's upper bound (rendered as a string, with
/// `"inf"` for the +Inf bucket) to its cumulative observation count, and
/// `count` is the total number of observations.
///
/// Returns zero when the histogram is empty, no boundary parses, or
/// `quantile` is outside `[0, 1]`. Like Prometheus, a quantile that falls
/// into the +Inf bucket is clamped to the highest finite bucket boundary.
fn histogram_quantile(buckets: &HashMap<BucketId, usize>, count: usize, quantile: f64) -> Duration {
    if count == 0 || !(0.0..=1.0).contains(&quantile) {
        return Duration::default();
    }

    // Parse and sort buckets by boundary (+Inf sorts last).
    let mut buckets: Vec<(f64, usize)> = buckets
        .iter()
        .filter_map(|(bucket, count)| {
            let bound = if bucket == "inf" {
                f64::INFINITY
            } else {
                bucket.parse::<f64>().ok()?
            };
            Some((bound, *count))
        })
        .collect();

    if buckets.is_empty() {
        return Duration::default();
    }

    buckets.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));

    // The rank we're looking for (0.5 for P50 means the middle observation).
    let rank = quantile * count as f64;

    // Find the two buckets between which the quantile falls.
    //
    // Example: P50 with 1000 total observations and buckets
    // [(0.5s, 400), (1.0s, 800), (2.0s, 1000)] (cumulative counts).
    // rank = 0.5 * 1000 = 500 (we want the 500th observation).
    // Bucket 0.5s holds only 400 observations, so skip it; bucket 1.0s holds
    // 800 >= 500, so interpolate inside it:
    //   fraction = (500 - 400) / (800 - 400) = 0.25
    //   interpolated = 0.5 + (1.0 - 0.5) * 0.25 = 0.625s
    let mut prev_count = 0.0;
    let mut prev_bound = 0.0;

    for (bound, count) in buckets {
        // BUGFIX: a rank falling into the +Inf bucket used to produce an
        // infinite interpolated value and panic inside
        // `Duration::from_secs_f64` (which rejects non-finite input). Since
        // +Inf sorts last, clamp the result to the highest finite boundary
        // seen so far — the same convention Prometheus uses.
        if bound.is_infinite() {
            return Duration::from_secs_f64(prev_bound);
        }

        let count_f64 = count as f64;

        // If this bucket contains our quantile.
        if count_f64 >= rank {
            // If this is the first bucket or all observations are in this
            // bucket, return its upper bound directly.
            if prev_count == 0.0 || count_f64 == prev_count {
                return Duration::from_secs_f64(bound);
            }

            // Linear interpolation between prev_bound and bound:
            // prev_bound + (bound - prev_bound) * ((rank - prev_count) /
            // (count - prev_count))
            let fraction = (rank - prev_count) / (count_f64 - prev_count);
            let interpolated = prev_bound + (bound - prev_bound) * fraction;

            return Duration::from_secs_f64(interpolated);
        }

        prev_count = count_f64;
        prev_bound = bound;
    }

    // If we get here, return the last finite bucket boundary.
    Duration::from_secs_f64(prev_bound)
}
802
#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};
    use crate::{
        benchmark::{RunInterval, test::TestBenchmarkType},
        protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    /// Average latency is `sum / count`: 2s over 100 transactions = 20ms.
    #[test]
    fn average_latency() {
        let data = Measurement {
            workload: "transfer_object".into(),
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }

    /// Standard deviation is `sqrt(squared_sum / count - avg^2)`.
    #[test]
    fn stdev_latency() {
        let data = Measurement {
            workload: "transfer_object".into(),
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // squared_sum / count
        assert_eq!(
            data.squared_sum.checked_div(data.count as u32),
            Some(Duration::from_secs_f64(0.75))
        );
        // avg^2
        assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);
        // sqrt( squared_sum / count - avg^2 ) = sqrt(0.5) ≈ 0.707
        let stdev = data.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }

    #[test]
    fn p50_latency() {
        // Test with the example histogram from prometheus_parse test
        // Total count: 1860, P50 should be at observation 930
        // Buckets show: 506 at 0.5s, 1282 at 0.75s
        // So P50 falls between 0.5s and 0.75s buckets
        // Linear interpolation: 0.5 + (0.75 - 0.5) * ((930 - 506) / (1282 - 506))
        //                     = 0.5 + 0.25 * (424 / 776)
        //                     = 0.5 + 0.25 * 0.5464
        //                     = 0.5 + 0.1366
        //                     = 0.6366s ≈ 637ms
        let data = Measurement {
            workload: "transfer_object".into(),
            timestamp: Duration::from_secs(30),
            buckets: HashMap::from([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("inf".into(), 1860),
            ]),
            sum: Duration::from_secs(1265),
            count: 1860,
            squared_sum: Duration::from_secs(952),
        };

        let p50 = super::p50_latency(&data.buckets, data.count);
        // Should be around 636-637ms (allow 1ms slack for float rounding).
        assert!((636..=637).contains(&p50.as_millis()));
    }

    /// The aggregate average must be weighted by each scraper's transaction
    /// count, not a plain mean of the per-scraper averages.
    #[test]
    fn aggregate_average_latency_weighted() {
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );

        // Scraper 1: 100 transactions with 2s total = 20ms avg
        let measurement1 = HashMap::from([(
            "test".to_string(),
            Measurement {
                workload: "test".into(),
                timestamp: Duration::from_secs(10),
                buckets: HashMap::new(),
                sum: Duration::from_secs(2),
                count: 100,
                squared_sum: Duration::from_secs(0),
            },
        )]);

        // Scraper 2: 200 transactions with 10s total = 50ms avg
        let measurement2 = HashMap::from([(
            "test".to_string(),
            Measurement {
                workload: "test".into(),
                timestamp: Duration::from_secs(10),
                buckets: HashMap::new(),
                sum: Duration::from_secs(10),
                count: 200,
                squared_sum: Duration::from_secs(0),
            },
        )]);

        aggregator.add(1, measurement1);
        aggregator.add(2, measurement2);

        // Weighted average should be (2 + 10) / (100 + 200) = 12 / 300 = 0.04s = 40ms
        let avg = aggregator.aggregate_average_latency();
        assert_eq!(avg.as_millis(), 40);
    }

    /// End-to-end parse of a single-workload Prometheus report into a
    /// `Measurement`, then aggregation into a `MeasurementsCollection`.
    /// The report deliberately mixes quoted and unquoted label values.
    #[test]
    fn prometheus_parse() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
        "#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        assert_eq!(aggregator.scrapers.len(), 1);
        let scraper_data = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(scraper_data.len(), 1); // One workload

        let data_points = scraper_data.get("transfer_object").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            HashMap::from([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860),
            ])
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }

    /// Same as `prometheus_parse` but with two workloads in one report;
    /// each workload must be parsed into its own `Measurement`.
    #[test]
    fn prometheus_parse_multi_workloads() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
            latency_s_bucket{workload=shared_counter,le=0.1} 0
            latency_s_bucket{workload=shared_counter,le=0.25} 1
            latency_s_bucket{workload=shared_counter,le=0.5} 600
            latency_s_bucket{workload=shared_counter,le=0.75} 1200
            latency_s_bucket{workload=shared_counter,le=1} 1600
            latency_s_bucket{workload="shared_counter",le="1.25"} 1800
            latency_s_bucket{workload="shared_counter",le="1.5"} 1870
            latency_s_bucket{workload="shared_counter",le="1.75"} 1870
            latency_s_bucket{workload="shared_counter",le="2"} 1870
            latency_s_bucket{workload=shared_counter,le=2.5} 1870
            latency_s_bucket{workload=shared_counter,le=5} 1870
            latency_s_bucket{workload=shared_counter,le=10} 1870
            latency_s_bucket{workload=shared_counter,le=20} 1870
            latency_s_bucket{workload=shared_counter,le=30} 1870
            latency_s_bucket{workload=shared_counter,le=60} 1870
            latency_s_bucket{workload=shared_counter,le=90} 1870
            latency_s_bucket{workload=shared_counter,le=+Inf} 1870
            latency_s_sum{workload=shared_counter} 865.287933130998
            latency_s_count{workload=shared_counter} 1870
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="shared_counter"} 455.8160642745289
        "#;

        let measurements = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;

        aggregator.add(scraper_id, measurements);

        assert_eq!(aggregator.scrapers.len(), 1);
        let scraper_data = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(scraper_data.len(), 2); // Two workloads

        let data_points = scraper_data.get("transfer_object").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            HashMap::from([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860),
            ])
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);

        let data_points = scraper_data.get("shared_counter").unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            HashMap::from([
                ("0.1".into(), 0),
                ("0.25".into(), 1),
                ("0.5".into(), 600),
                ("0.75".into(), 1200),
                ("1".into(), 1600),
                ("1.25".into(), 1800),
                ("1.5".into(), 1870),
                ("1.75".into(), 1870),
                ("2".into(), 1870),
                ("2.5".into(), 1870),
                ("5".into(), 1870),
                ("10".into(), 1870),
                ("20".into(), 1870),
                ("30".into(), 1870),
                ("60".into(), 1870),
                ("90".into(), 1870),
                ("inf".into(), 1870),
            ])
        );
        assert_eq!(data.sum.as_secs(), 865);
        assert_eq!(data.count, 1870);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 455);
    }

    #[test]
    #[ignore]
    // This test can be used to debug / test existing metrics aggregation.
    fn debug_real_metrics_aggregation() {
        use std::path::PathBuf;

        // Put the path to the metrics log directory
        let metrics_dir = PathBuf::from("PATH/TO/YOUR/METRICS/DIR");

        println!("\n\n========== METRICS AGGREGATION DEBUG ==========\n");
        println!("Reading metrics from: {}\n", metrics_dir.display());

        let settings = Settings::new_for_test();
        let num_clients = 10;
        // Define benchmark parameters matching the real benchmark
        let benchmark_parameters = BenchmarkParameters {
            run_interval: RunInterval::Time(Duration::from_secs(180)),
            load: 1000,
            nodes: num_clients,
            ..Default::default()
        };

        let mut aggregator =
            MeasurementsCollection::<TestBenchmarkType>::new(&settings, benchmark_parameters);

        // Parse all metrics files
        aggregator.aggregates_metrics_from_files::<TestProtocolMetrics>(num_clients, &metrics_dir);

        println!("========== DISPLAY SUMMARY ==========\n");
        aggregator.display_summary();
    }

    #[test]
    #[ignore]
    // Load measurements from measurements-*.json and parse associated metrics
    // files.
    fn debug_metrics_from_saved_measurements() {
        use std::{fs, path::PathBuf};

        use crate::IotaBenchmarkType;

        let benchmark_dir = PathBuf::from("PATH/TO/YOUR/SAVED/MEASUREMENTS/DIR");

        // Find and parse the first loadable measurements-*.json file to get
        // the benchmark parameters.
        let mut aggregator: Option<MeasurementsCollection<IotaBenchmarkType>> = None;
        if let Ok(entries) = fs::read_dir(&benchmark_dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                let Some(filename) = path.file_name() else {
                    continue;
                };
                let filename_str = filename.to_string_lossy();
                if !filename_str.starts_with("measurements-") {
                    continue;
                }

                match MeasurementsCollection::<IotaBenchmarkType>::load(&path) {
                    Ok(loaded) => {
                        println!("Loaded parameters from: {}\n", filename_str);
                        aggregator = Some(loaded);
                        break;
                    }
                    Err(e) => {
                        // Best-effort: report and keep trying other files.
                        println!("Failed to load {}: {}\n", filename_str, e);
                    }
                }
            }
        }

        let aggregator =
            aggregator.expect("No measurements-*.json file found or failed to load");

        println!("========== DISPLAY SUMMARY ==========\n");
        aggregator.display_summary();
    }
}