iota_metrics/
histogram.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet, hash_map::DefaultHasher},
7    hash::{Hash, Hasher},
8    sync::Arc,
9    time::Duration,
10};
11
12use futures::FutureExt;
13use parking_lot::Mutex;
14use prometheus::{
15    IntCounterVec, IntGaugeVec, Registry, opts, register_int_counter_vec_with_registry,
16    register_int_gauge_vec_with_registry,
17};
18use tokio::{
19    runtime::Handle,
20    sync::{mpsc, mpsc::error::TrySendError},
21    time::Instant,
22};
23use tracing::{debug, error};
24
25use crate::monitored_scope;
26
/// A single observed value. `Histogram::start_timer` records durations as whole
/// milliseconds; values passed to `Histogram::report` are stored as given.
type Point = u64;
/// Message sent from a `Histogram` handle to its collector task: the label set
/// the value belongs to, followed by the observed value.
type HistogramMessage = (HistogramLabels, Point);
29
/// Represents a histogram metric used for collecting and recording data
/// distributions. The `Histogram` struct contains `labels` that categorize the
/// histogram and a `channel` for sending `HistogramMessage` instances to record
/// the data.
///
/// Recording never blocks: values are handed to a background collector with a
/// non-blocking send (see `Histogram::report`). Cloning clones the
/// reference-counted labels and the channel sender, so every clone feeds the
/// same collector.
#[derive(Clone)]
pub struct Histogram {
    /// Label values attached to every value sent through `channel`.
    labels: HistogramLabels,
    /// Sender side of the bounded channel drained by the collector task that
    /// `HistogramVec::new` spawns.
    channel: mpsc::Sender<HistogramMessage>,
}
39
/// A guard used for timing the duration of an operation and recording it in a
/// `Histogram`. The timer starts when the guard is created by
/// `Histogram::start_timer`. When the guard is dropped, the elapsed time in
/// whole milliseconds is reported to the associated `Histogram`.
pub struct HistogramTimerGuard<'a> {
    /// Histogram that receives the elapsed time when the guard is dropped.
    histogram: &'a Histogram,
    /// Moment the guard was created.
    start: Instant,
}
47
/// Represents a collection of histograms for managing multiple labeled metrics.
/// The `HistogramVec` struct allows for sending `HistogramMessage` instances
/// via a channel to record data in a particular histogram, providing a way to
/// track different metrics concurrently.
///
/// Use `HistogramVec::with_label_values` to obtain a `Histogram` for one label
/// set. All such histograms share this channel and therefore one collector.
#[derive(Clone)]
pub struct HistogramVec {
    /// Sender side of the channel drained by the `HistogramCollector` task.
    channel: mpsc::Sender<HistogramMessage>,
}
56
/// Collects histogram data by receiving `HistogramMessage` instances and
/// passing them to the `HistogramReporter`. The `HistogramCollector` manages an
/// asynchronous channel for receiving messages and uses a `Mutex`-protected
/// `HistogramReporter` to process and report the collected data. It also stores
/// the name of the collector for identification.
///
/// The reporter sits behind an `Arc` so a clone can be moved into a blocking
/// task for each report. The collector inspects the `Arc` strong count to tell
/// whether the previous report is still running.
struct HistogramCollector {
    /// Aggregation state, shared with the in-flight blocking report task (if any).
    reporter: Arc<Mutex<HistogramReporter>>,
    /// Receiver side of the channel fed by `Histogram::report`.
    channel: mpsc::Receiver<HistogramMessage>,
    /// Metric name. Its only visible use is the overflow `error!` log, which
    /// is compiled only without `debug_assertions`. Presumably the leading
    /// underscore avoids an unused-field warning in debug builds.
    _name: String,
}
67
/// Reports histogram metrics by aggregating and processing data collected from
/// multiple histograms. The `HistogramReporter` maintains various metrics,
/// including a gauge (`gauge`), total sum (`sum`), and count (`count`) for
/// tracking histogram values. It uses `known_labels` to manage label sets for
/// data categorization, and `percentiles` to calculate specific statistical
/// measurements for the collected data.
struct HistogramReporter {
    /// One series per (label set, percentile). Its label names are the user
    /// labels followed by a trailing `pct` label.
    gauge: IntGaugeVec,
    /// Running total of all reported values, per label set.
    sum: IntCounterVec,
    /// Running number of reported values, per label set.
    count: IntCounterVec,
    /// Every label set that has reported at least once. Used to reset the
    /// gauges of label sets that received no data in the current window.
    known_labels: HashSet<HistogramLabels>,
    /// Percentiles to report, in thousandths (e.g. 500 = p50, 990 = p99).
    percentiles: Vec<usize>,
}
81
/// Shared, reference-counted label set. Cloning it (e.g. for every message sent
/// by `Histogram::report`) only bumps the reference count.
type HistogramLabels = Arc<HistogramLabelsInner>;
83
/// Represents the inner structure of histogram labels, containing a list of
/// labels (`labels`) and a precomputed hash (`hash`) for efficient lookup and
/// categorization.
///
/// The hash is computed once in `HistogramLabelsInner::new`. The `Hash` impl
/// feeds only this value, so using a label set as a map key never rehashes the
/// label strings.
struct HistogramLabelsInner {
    /// Label values, in the order of the label names given to `HistogramVec`.
    labels: Vec<String>,
    /// `DefaultHasher` digest of `labels` (not a cryptographic hash).
    hash: u64,
}
91
92/// Reports the histogram to the given prometheus gauge.
93/// Unlike the histogram from prometheus crate, this histogram does not require
94/// to specify buckets It works by calculating 'true' histogram by aggregating
95/// and sorting values.
96///
97/// The values are reported into prometheus gauge with requested labels and
98/// additional dimension for the histogram percentile.
99///
100/// It worth pointing out that due to those more precise calculations, this
101/// Histogram usage is somewhat more limited comparing to original prometheus
102/// Histogram.
103///
104/// On the bright side, this histogram exports less data to Prometheus comparing
105/// to prometheus::Histogram, it exports each requested percentile into separate
106/// prometheus gauge, while original implementation creates gauge per bucket.
107/// It also exports _sum and _count aggregates same as original implementation.
108///
109/// It is ok to measure timings for things like network latencies and expensive
110/// crypto operations. However as a rule of thumb this histogram should not be
111/// used in places that can produce very high data point count.
112///
113/// As a last round of defence this histogram emits error log when too much data
114/// is flowing in and drops data points.
115///
116/// This implementation puts great deal of effort to make sure the metric does
117/// not cause any harm to the code itself:
118/// * Reporting data point is a non-blocking send to a channel
119/// * Data point collections tries to clear the channel as fast as possible
120/// * Expensive histogram calculations are done in a separate blocking tokio
121///   thread pool to avoid effects on main scheduler
122/// * If histogram data is produced too fast, the data is dropped and error! log
123///   is emitted
124impl HistogramVec {
125    pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
126        Self::new_in_registry_with_percentiles(
127            name,
128            desc,
129            labels,
130            registry,
131            vec![500usize, 950, 990],
132        )
133    }
134
135    /// Allows to specify percentiles in 1/1000th, e.g. 90pct is specified as
136    /// 900
137    pub fn new_in_registry_with_percentiles(
138        name: &str,
139        desc: &str,
140        labels: &[&str],
141        registry: &Registry,
142        percentiles: Vec<usize>,
143    ) -> Self {
144        let sum_name = format!("{}_sum", name);
145        let count_name = format!("{}_count", name);
146        let sum =
147            register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
148        let count =
149            register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
150        let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
151        let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
152        Self::new(gauge, sum, count, percentiles, name)
153    }
154
155    // Do not expose it to public interface because we need labels to have a
156    // specific format (e.g. add last label is "pct")
157    fn new(
158        gauge: IntGaugeVec,
159        sum: IntCounterVec,
160        count: IntCounterVec,
161        percentiles: Vec<usize>,
162        name: &str,
163    ) -> Self {
164        let (sender, receiver) = mpsc::channel(1000);
165        let reporter = HistogramReporter {
166            gauge,
167            sum,
168            count,
169            percentiles,
170            known_labels: Default::default(),
171        };
172        let reporter = Arc::new(Mutex::new(reporter));
173        let collector = HistogramCollector {
174            reporter,
175            channel: receiver,
176            _name: name.to_string(),
177        };
178        Handle::current().spawn(collector.run());
179        Self { channel: sender }
180    }
181
182    /// Creates a new `Histogram` with the specified label values. The function
183    /// takes a slice of label strings, converts them into a
184    /// `HistogramLabelsInner` structure, and returns a new `Histogram`
185    /// instance that shares the same data channel as the original.
186    pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
187        let labels = labels.iter().map(ToString::to_string).collect();
188        let labels = HistogramLabelsInner::new(labels);
189        Histogram {
190            labels,
191            channel: self.channel.clone(),
192        }
193    }
194
195    // HistogramVec uses asynchronous model to report metrics which makes
196    // it difficult to unregister counters in the usual manner. Here we
197    // re-create counters so that their `desc()`s match the ones created by
198    // HistogramVec and remove them from the registry. Counters can be safely
199    // unregistered even if they are still in use.
200    pub fn unregister(name: &str, desc: &str, labels: &[&str], registry: &Registry) {
201        let sum_name = format!("{}_sum", name);
202        let count_name = format!("{}_count", name);
203
204        let sum = IntCounterVec::new(opts!(sum_name, desc), labels).unwrap();
205        registry
206            .unregister(Box::new(sum))
207            .unwrap_or_else(|_| panic!("{}_sum counter is in prometheus registry", name));
208
209        let count = IntCounterVec::new(opts!(count_name, desc), labels).unwrap();
210        registry
211            .unregister(Box::new(count))
212            .unwrap_or_else(|_| panic!("{}_count counter is in prometheus registry", name));
213
214        let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
215        let gauge = IntGaugeVec::new(opts!(name, desc), &labels).unwrap();
216        registry
217            .unregister(Box::new(gauge))
218            .unwrap_or_else(|_| panic!("{} gauge is in prometheus registry", name));
219    }
220}
221
222impl HistogramLabelsInner {
223    pub fn new(labels: Vec<String>) -> HistogramLabels {
224        // Not a crypto hash
225        let mut hasher = DefaultHasher::new();
226        labels.hash(&mut hasher);
227        let hash = hasher.finish();
228        Arc::new(Self { labels, hash })
229    }
230}
231
232impl PartialEq for HistogramLabelsInner {
233    fn eq(&self, other: &Self) -> bool {
234        self.hash == other.hash
235    }
236}
237
// Marker impl: `Eq` is required for `HistogramLabels` (an `Arc` of this type) to
// be used as a `HashMap`/`HashSet` key by the collector and reporter.
impl Eq for HistogramLabelsInner {}
239
240impl Hash for HistogramLabelsInner {
241    fn hash<H: Hasher>(&self, state: &mut H) {
242        self.hash.hash(state)
243    }
244}
245
246impl Histogram {
247    /// Creates a new `Histogram` instance in the specified `Registry` with the
248    /// given `name` and `desc`. It initializes the histogram in the
249    /// `registry`, with no labels by default.
250    pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
251        HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
252    }
253
254    /// Observes a value in the histogram by reporting the given `Point`.
255    pub fn observe(&self, v: Point) {
256        self.report(v)
257    }
258
259    /// Reports a value (`Point`) to the histogram by sending it through the
260    /// internal channel. This method manages the process of collecting and
261    /// reporting metrics for the histogram.
262    pub fn report(&self, v: Point) {
263        match self.channel.try_send((self.labels.clone(), v)) {
264            Ok(()) => {}
265            Err(TrySendError::Closed(_)) => {
266                // can happen during runtime shutdown
267            }
268            Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
269        }
270    }
271
272    /// Starts a timer and returns a `HistogramTimerGuard` that, when dropped,
273    /// will record the elapsed time in the associated histogram.
274    pub fn start_timer(&self) -> HistogramTimerGuard {
275        HistogramTimerGuard {
276            histogram: self,
277            start: Instant::now(),
278        }
279    }
280}
281
282impl HistogramCollector {
283    /// Runs the histogram collection process asynchronously, cycling at a
284    /// specified interval (`HISTOGRAM_WINDOW_SEC`). It calculates the next
285    /// deadline and continuously processes incoming data points. The
286    /// process stops when `cycle` returns an error, which typically
287    /// indicates that the histogram no longer exists.
288    pub async fn run(mut self) {
289        let mut deadline = Instant::now();
290        loop {
291            // We calculate deadline here instead of just using sleep inside cycle to avoid
292            // accumulating error
293            #[cfg(test)]
294            const HISTOGRAM_WINDOW_SEC: u64 = 1;
295            #[cfg(not(test))]
296            const HISTOGRAM_WINDOW_SEC: u64 = 60;
297            deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
298            if self.cycle(deadline).await.is_err() {
299                return;
300            }
301        }
302    }
303
304    /// Collects histogram data points until a deadline or a maximum number of
305    /// points (`MAX_POINTS`) is reached. The function collects data points
306    /// into `labeled_data` while receiving them from the channel, breaking
307    /// when either the deadline is reached or the histogram channel is closed.
308    /// If the number of data points exceeds the limit, some points are
309    /// dropped, and an error is logged. After processing, the data is
310    /// handed off to the reporter for aggregation and analysis.
311    async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
312        let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
313        let mut count = 0usize;
314        let mut timeout = tokio::time::sleep_until(deadline).boxed();
315        const MAX_POINTS: usize = 500_000;
316        loop {
317            tokio::select! {
318                _ = &mut timeout => break,
319                point = self.channel.recv() => {
320                    count += 1;
321                    if count > MAX_POINTS {
322                        continue;
323                    }
324                    if let Some((label, point)) = point {
325                        let values = labeled_data.entry(label).or_default();
326                        values.push(point);
327                    } else {
328                        // Histogram no longer exists
329                        return Err(());
330                    }
331                },
332            }
333        }
334        if count > MAX_POINTS {
335            error!(
336                "Too many data points for histogram, dropping {} points",
337                count - MAX_POINTS
338            );
339        }
340        if Arc::strong_count(&self.reporter) != 1 {
341            #[cfg(not(debug_assertions))]
342            error!(
343                "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
344                self._name
345            );
346        } else {
347            let reporter = self.reporter.clone();
348            Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
349        }
350        Ok(())
351    }
352}
353
354impl HistogramReporter {
355    /// Reports the collected histogram data by aggregating it and updating the
356    /// corresponding metrics. It first sorts the data points and then
357    /// calculates specific percentiles as defined by `self.percentiles`.
358    /// Each calculated percentile value is set in the `IntGaugeVec`. It also
359    /// computes the total sum and count of the data points, updating the
360    /// respective metrics (`sum` and `count`). If any labels are no longer in
361    /// use, their metrics are reset to zero.
362    pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
363        let _scope = monitored_scope("HistogramReporter::report");
364        let mut reset_labels = self.known_labels.clone();
365        for (label, mut data) in labeled_data {
366            self.known_labels.insert(label.clone());
367            reset_labels.remove(&label);
368            assert!(!data.is_empty());
369            data.sort_unstable();
370            for pct1000 in self.percentiles.iter() {
371                let index = Self::pct1000_index(data.len(), *pct1000);
372                let point = *data.get(index).unwrap();
373                let pct_str = Self::format_pct1000(*pct1000);
374                let labels = Self::gauge_labels(&label, &pct_str);
375                let metric = self.gauge.with_label_values(&labels);
376                metric.set(point as i64);
377            }
378            let mut sum = 0u64;
379            let count = data.len() as u64;
380            for point in data {
381                sum += point;
382            }
383            let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
384            self.sum.with_label_values(&labels).inc_by(sum);
385            self.count.with_label_values(&labels).inc_by(count);
386        }
387
388        for reset_label in reset_labels {
389            for pct1000 in self.percentiles.iter() {
390                let pct_str = Self::format_pct1000(*pct1000);
391                let labels = Self::gauge_labels(&reset_label, &pct_str);
392                let metric = self.gauge.with_label_values(&labels);
393                metric.set(0);
394            }
395        }
396    }
397
398    /// Constructs a vector of label values for a gauge metric. It takes a
399    /// `HistogramLabels` instance and a percentile string (`pct_str`),
400    /// returning a combined list of label values to be used for identifying
401    /// the gauge metric.
402    fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
403        let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
404        labels.collect()
405    }
406
407    /// Returns value in range [0; len)
408    fn pct1000_index(len: usize, pct1000: usize) -> usize {
409        len * pct1000 / 1000
410    }
411
412    /// Formats a given percentile value (`pct1000`) as a string by converting
413    /// it to a decimal percentage. The `pct1000` parameter is divided by 10
414    /// to represent the correct percentile value (e.g., 250 -> "25.0").
415    fn format_pct1000(pct1000: usize) -> String {
416        format!("{}", (pct1000 as f64) / 10.)
417    }
418}
419
420impl Drop for HistogramTimerGuard<'_> {
421    /// Reports the elapsed time in milliseconds to the associated histogram
422    /// when the `HistogramTimerGuard` is dropped.
423    fn drop(&mut self) {
424        self.histogram
425            .report(self.start.elapsed().as_millis() as u64);
426    }
427}
428
#[cfg(test)]
mod tests {
    use prometheus::proto::MetricFamily;

    use super::*;

    #[test]
    fn pct_index_test() {
        // (number of points, percentile in 1/1000th, expected index)
        let cases = [
            (1000, 200, 200),
            (500, 200, 100),
            (2000, 900, 1800),
            // Boundary checks
            (22, 999, 21),
            (1, 999, 0),
            (1, 100, 0),
            (1, 1, 0),
        ];
        for (len, pct1000, expected) in cases {
            assert_eq!(expected, HistogramReporter::pct1000_index(len, pct1000));
        }
    }

    #[test]
    fn format_pct1000_test() {
        for (pct1000, expected) in [(999, "99.9"), (990, "99"), (900, "90")] {
            assert_eq!(HistogramReporter::format_pct1000(pct1000), expected);
        }
    }

    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        for v in [1, 2, 3, 4] {
            a.report(v);
        }
        for v in [10, 20, 30, 40] {
            b.report(v);
        }
        // Wait past the 1s test reporting window so the reporter has published.
        tokio::time::sleep(Duration::from_millis(1500)).await;

        let families: HashMap<String, MetricFamily> = registry
            .gather()
            .into_iter()
            .map(|family| (family.name().to_string(), family))
            .collect();
        let hist = aggregate_gauge_by_label(&families["test"]);
        let sum = aggregate_counter_by_label(&families["test_sum"]);
        let count = aggregate_counter_by_label(&families["test_count"]);

        assert_eq!(Some(3.), hist.get("::a::50").cloned());
        assert_eq!(Some(4.), hist.get("::a::90").cloned());
        assert_eq!(Some(30.), hist.get("::b::50").cloned());
        assert_eq!(Some(40.), hist.get("::b::90").cloned());

        assert_eq!(Some(10.), sum.get("::a").cloned());
        assert_eq!(Some(100.), sum.get("::b").cloned());

        assert_eq!(Some(4.), count.get("::a").cloned());
        assert_eq!(Some(4.), count.get("::b").cloned());
    }

    /// Builds a lookup key of the form `::v1::v2...` from label values.
    fn label_key<I, S>(values: I) -> String
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let mut key = String::new();
        for value in values {
            key.push_str("::");
            key.push_str(value.as_ref());
        }
        key
    }

    /// Maps each gauge series in `family` to its value, keyed by `label_key`.
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let key = label_key(m.get_label().iter().map(|label| label.value()));
                (key, m.get_gauge().value())
            })
            .collect()
    }

    /// Maps each counter series in `family` to its value, keyed by `label_key`.
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let key = label_key(m.get_label().iter().map(|label| label.value()));
                (key, m.get_counter().value())
            })
            .collect()
    }
}