Skip to main content

iota_metrics/
histogram.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet, hash_map::DefaultHasher},
7    hash::{Hash, Hasher},
8    sync::Arc,
9    time::Duration,
10};
11
12use futures::FutureExt;
13use parking_lot::Mutex;
14use prometheus_filtered::{
15    IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
16    register_int_gauge_vec_with_registry,
17};
18use tokio::{
19    runtime::Handle,
20    sync::{mpsc, mpsc::error::TrySendError},
21    time::Instant,
22};
23use tracing::{debug, error};
24
25use crate::monitored_scope;
26
27type Point = u64;
28type HistogramMessage = (HistogramLabels, Point);
29
30/// Represents a histogram metric used for collecting and recording data
31/// distributions. The `Histogram` struct contains `labels` that categorize the
32/// histogram and a `channel` for sending `HistogramMessage` instances to record
33/// the data.
34#[derive(Clone)]
35pub struct Histogram {
36    labels: HistogramLabels,
37    channel: mpsc::Sender<HistogramMessage>,
38}
39
40/// A guard used for timing the duration of an operation and recording it in a
41/// `Histogram`. The `HistogramTimerGuard` starts a timer upon creation and,
42/// when dropped, records the elapsed time into the associated `Histogram`.
43pub struct HistogramTimerGuard<'a> {
44    histogram: &'a Histogram,
45    start: Instant,
46}
47
48/// Represents a collection of histograms for managing multiple labeled metrics.
49/// The `HistogramVec` struct allows for sending `HistogramMessage` instances
50/// via a channel to record data in a particular histogram, providing a way to
51/// track different metrics concurrently.
52#[derive(Clone)]
53pub struct HistogramVec {
54    channel: mpsc::Sender<HistogramMessage>,
55}
56
57/// Collects histogram data by receiving `HistogramMessage` instances and
58/// passing them to the `HistogramReporter`. The `HistogramCollector` manages an
59/// asynchronous channel for receiving messages and uses a `Mutex`-protected
60/// `HistogramReporter` to process and report the collected data. It also stores
61/// the name of the collector for identification.
62struct HistogramCollector {
63    reporter: Arc<Mutex<HistogramReporter>>,
64    channel: mpsc::Receiver<HistogramMessage>,
65    _name: String,
66}
67
68/// Reports histogram metrics by aggregating and processing data collected from
69/// multiple histograms. The `HistogramReporter` maintains various metrics,
70/// including a gauge (`gauge`), total sum (`sum`), and count (`count`) for
71/// tracking histogram values. It uses `known_labels` to manage label sets for
72/// data categorization, and `percentiles` to calculate specific statistical
73/// measurements for the collected data.
74struct HistogramReporter {
75    gauge: IntGaugeVec,
76    sum: IntCounterVec,
77    count: IntCounterVec,
78    known_labels: HashSet<HistogramLabels>,
79    percentiles: Vec<usize>,
80}
81
/// Shared, immutable label set attached to every data point of a [`Histogram`].
type HistogramLabels = Arc<HistogramLabelsInner>;

/// Label values of one histogram series, together with a cached hash of them.
///
/// The hash is computed once, in [`HistogramLabelsInner::new`]. Hashing a label
/// set (which the collector does for every point it groups) therefore does not
/// rehash all of the strings.
struct HistogramLabelsInner {
    /// Label values, in the order of the owning `HistogramVec`'s label names.
    labels: Vec<String>,
    /// Hash of `labels`, computed once at construction.
    hash: u64,
}
91
92/// Reports the histogram to the given prometheus gauge.
93/// Unlike the histogram from prometheus crate, this histogram does not require
94/// to specify buckets It works by calculating 'true' histogram by aggregating
95/// and sorting values.
96///
97/// The values are reported into prometheus gauge with requested labels and
98/// additional dimension for the histogram percentile.
99///
100/// It worth pointing out that due to those more precise calculations, this
101/// Histogram usage is somewhat more limited comparing to original prometheus
102/// Histogram.
103///
104/// On the bright side, this histogram exports less data to Prometheus comparing
105/// to prometheus_filtered::Histogram, it exports each requested percentile into
106/// separate prometheus gauge, while original implementation creates gauge per
107/// bucket. It also exports _sum and _count aggregates same as original
108/// implementation.
109///
110/// It is ok to measure timings for things like network latencies and expensive
111/// crypto operations. However as a rule of thumb this histogram should not be
112/// used in places that can produce very high data point count.
113///
114/// As a last round of defence this histogram emits error log when too much data
115/// is flowing in and drops data points.
116///
117/// This implementation puts great deal of effort to make sure the metric does
118/// not cause any harm to the code itself:
119/// * Reporting data point is a non-blocking send to a channel
120/// * Data point collections tries to clear the channel as fast as possible
121/// * Expensive histogram calculations are done in a separate blocking tokio
122///   thread pool to avoid effects on main scheduler
123/// * If histogram data is produced too fast, the data is dropped and error! log
124///   is emitted
125impl HistogramVec {
126    pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
127        Self::new_in_registry_with_percentiles(
128            name,
129            desc,
130            labels,
131            registry,
132            vec![500usize, 950, 990],
133        )
134    }
135
136    /// Allows to specify percentiles in 1/1000th, e.g. 90pct is specified as
137    /// 900
138    pub fn new_in_registry_with_percentiles(
139        name: &str,
140        desc: &str,
141        labels: &[&str],
142        registry: &Registry,
143        percentiles: Vec<usize>,
144    ) -> Self {
145        let sum_name = format!("{name}_sum");
146        let count_name = format!("{name}_count");
147        let sum =
148            register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
149        let count =
150            register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
151        let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
152        let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
153        Self::new(gauge, sum, count, percentiles, name)
154    }
155
156    // Do not expose it to public interface because we need labels to have a
157    // specific format (e.g. add last label is "pct")
158    fn new(
159        gauge: IntGaugeVec,
160        sum: IntCounterVec,
161        count: IntCounterVec,
162        percentiles: Vec<usize>,
163        name: &str,
164    ) -> Self {
165        let (sender, receiver) = mpsc::channel(1000);
166        let reporter = HistogramReporter {
167            gauge,
168            sum,
169            count,
170            percentiles,
171            known_labels: Default::default(),
172        };
173        let reporter = Arc::new(Mutex::new(reporter));
174        let collector = HistogramCollector {
175            reporter,
176            channel: receiver,
177            _name: name.to_string(),
178        };
179        Handle::current().spawn(collector.run());
180        Self { channel: sender }
181    }
182
183    /// Creates a new `Histogram` with the specified label values. The function
184    /// takes a slice of label strings, converts them into a
185    /// `HistogramLabelsInner` structure, and returns a new `Histogram`
186    /// instance that shares the same data channel as the original.
187    pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
188        let labels = labels.iter().map(ToString::to_string).collect();
189        let labels = HistogramLabelsInner::new(labels);
190        Histogram {
191            labels,
192            channel: self.channel.clone(),
193        }
194    }
195}
196
197impl HistogramLabelsInner {
198    pub fn new(labels: Vec<String>) -> HistogramLabels {
199        // Not a crypto hash
200        let mut hasher = DefaultHasher::new();
201        labels.hash(&mut hasher);
202        let hash = hasher.finish();
203        Arc::new(Self { labels, hash })
204    }
205}
206
207impl PartialEq for HistogramLabelsInner {
208    fn eq(&self, other: &Self) -> bool {
209        self.hash == other.hash
210    }
211}
212
213impl Eq for HistogramLabelsInner {}
214
215impl Hash for HistogramLabelsInner {
216    fn hash<H: Hasher>(&self, state: &mut H) {
217        self.hash.hash(state)
218    }
219}
220
221impl Histogram {
222    /// Creates a new `Histogram` instance in the specified `Registry` with the
223    /// given `name` and `desc`. It initializes the histogram in the
224    /// `registry`, with no labels by default.
225    pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
226        HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
227    }
228
229    /// Observes a value in the histogram by reporting the given `Point`.
230    pub fn observe(&self, v: Point) {
231        self.report(v)
232    }
233
234    /// Reports a value (`Point`) to the histogram by sending it through the
235    /// internal channel. This method manages the process of collecting and
236    /// reporting metrics for the histogram.
237    pub fn report(&self, v: Point) {
238        match self.channel.try_send((self.labels.clone(), v)) {
239            Ok(()) => {}
240            Err(TrySendError::Closed(_)) => {
241                // can happen during runtime shutdown
242            }
243            Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
244        }
245    }
246
247    /// Starts a timer and returns a `HistogramTimerGuard` that, when dropped,
248    /// will record the elapsed time in the associated histogram.
249    pub fn start_timer(&self) -> HistogramTimerGuard<'_> {
250        HistogramTimerGuard {
251            histogram: self,
252            start: Instant::now(),
253        }
254    }
255}
256
257impl HistogramCollector {
258    /// Runs the histogram collection process asynchronously, cycling at a
259    /// specified interval (`HISTOGRAM_WINDOW_SEC`). It calculates the next
260    /// deadline and continuously processes incoming data points. The
261    /// process stops when `cycle` returns an error, which typically
262    /// indicates that the histogram no longer exists.
263    pub async fn run(mut self) {
264        let mut deadline = Instant::now();
265        loop {
266            // We calculate deadline here instead of just using sleep inside cycle to avoid
267            // accumulating error
268            #[cfg(test)]
269            const HISTOGRAM_WINDOW_SEC: u64 = 1;
270            #[cfg(not(test))]
271            const HISTOGRAM_WINDOW_SEC: u64 = 60;
272            deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
273            if self.cycle(deadline).await.is_err() {
274                return;
275            }
276        }
277    }
278
279    /// Collects histogram data points until a deadline or a maximum number of
280    /// points (`MAX_POINTS`) is reached. The function collects data points
281    /// into `labeled_data` while receiving them from the channel, breaking
282    /// when either the deadline is reached or the histogram channel is closed.
283    /// If the number of data points exceeds the limit, some points are
284    /// dropped, and an error is logged. After processing, the data is
285    /// handed off to the reporter for aggregation and analysis.
286    async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
287        let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
288        let mut count = 0usize;
289        let mut timeout = tokio::time::sleep_until(deadline).boxed();
290        const MAX_POINTS: usize = 500_000;
291        loop {
292            tokio::select! {
293                _ = &mut timeout => break,
294                point = self.channel.recv() => {
295                    count += 1;
296                    if count > MAX_POINTS {
297                        continue;
298                    }
299                    if let Some((label, point)) = point {
300                        let values = labeled_data.entry(label).or_default();
301                        values.push(point);
302                    } else {
303                        // Histogram no longer exists
304                        return Err(());
305                    }
306                },
307            }
308        }
309        if count > MAX_POINTS {
310            error!(
311                "Too many data points for histogram, dropping {} points",
312                count - MAX_POINTS
313            );
314        }
315        if Arc::strong_count(&self.reporter) != 1 {
316            #[cfg(not(debug_assertions))]
317            error!(
318                "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
319                self._name
320            );
321        } else {
322            let reporter = self.reporter.clone();
323            Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
324        }
325        Ok(())
326    }
327}
328
329impl HistogramReporter {
330    /// Reports the collected histogram data by aggregating it and updating the
331    /// corresponding metrics. It first sorts the data points and then
332    /// calculates specific percentiles as defined by `self.percentiles`.
333    /// Each calculated percentile value is set in the `IntGaugeVec`. It also
334    /// computes the total sum and count of the data points, updating the
335    /// respective metrics (`sum` and `count`). If any labels are no longer in
336    /// use, their metrics are reset to zero.
337    pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
338        let _scope = monitored_scope("HistogramReporter::report");
339        let mut reset_labels = self.known_labels.clone();
340        for (label, mut data) in labeled_data {
341            self.known_labels.insert(label.clone());
342            reset_labels.remove(&label);
343            assert!(!data.is_empty());
344            data.sort_unstable();
345            for pct1000 in self.percentiles.iter() {
346                let index = Self::pct1000_index(data.len(), *pct1000);
347                let point = *data.get(index).unwrap();
348                let pct_str = Self::format_pct1000(*pct1000);
349                let labels = Self::gauge_labels(&label, &pct_str);
350                let metric = self.gauge.with_label_values(&labels);
351                metric.set(point as i64);
352            }
353            let mut sum = 0u64;
354            let count = data.len() as u64;
355            for point in data {
356                sum += point;
357            }
358            let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
359            self.sum.with_label_values(&labels).inc_by(sum);
360            self.count.with_label_values(&labels).inc_by(count);
361        }
362
363        for reset_label in reset_labels {
364            for pct1000 in self.percentiles.iter() {
365                let pct_str = Self::format_pct1000(*pct1000);
366                let labels = Self::gauge_labels(&reset_label, &pct_str);
367                let metric = self.gauge.with_label_values(&labels);
368                metric.set(0);
369            }
370        }
371    }
372
373    /// Constructs a vector of label values for a gauge metric. It takes a
374    /// `HistogramLabels` instance and a percentile string (`pct_str`),
375    /// returning a combined list of label values to be used for identifying
376    /// the gauge metric.
377    fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
378        let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
379        labels.collect()
380    }
381
382    /// Returns value in range [0; len)
383    fn pct1000_index(len: usize, pct1000: usize) -> usize {
384        len * pct1000 / 1000
385    }
386
387    /// Formats a given percentile value (`pct1000`) as a string by converting
388    /// it to a decimal percentage. The `pct1000` parameter is divided by 10
389    /// to represent the correct percentile value (e.g., 250 -> "25.0").
390    fn format_pct1000(pct1000: usize) -> String {
391        format!("{}", (pct1000 as f64) / 10.)
392    }
393}
394
395impl Drop for HistogramTimerGuard<'_> {
396    /// Reports the elapsed time in milliseconds to the associated histogram
397    /// when the `HistogramTimerGuard` is dropped.
398    fn drop(&mut self) {
399        self.histogram
400            .report(self.start.elapsed().as_millis() as u64);
401    }
402}
403
#[cfg(test)]
mod tests {
    use prometheus_filtered::proto::MetricFamily;

    use super::*;

    #[test]
    fn pct_index_test() {
        // (len, pct1000, expected index)
        let cases = [
            (1000, 200, 200),
            (500, 200, 100),
            (2000, 900, 1800),
            // Boundary checks
            (22, 999, 21),
            (1, 999, 0),
            (1, 100, 0),
            (1, 1, 0),
        ];
        for (len, pct1000, expected) in cases {
            assert_eq!(
                expected,
                HistogramReporter::pct1000_index(len, pct1000),
                "len = {len}, pct1000 = {pct1000}"
            );
        }
    }

    #[test]
    fn format_pct1000_test() {
        for (pct1000, expected) in [(999, "99.9"), (990, "99"), (900, "90")] {
            assert_eq!(HistogramReporter::format_pct1000(pct1000), expected);
        }
    }

    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        for v in [1, 2, 3, 4] {
            a.report(v);
        }
        for v in [10, 20, 30, 40] {
            b.report(v);
        }
        // Let the one-second test window close and the report run.
        tokio::time::sleep(Duration::from_millis(1500)).await;

        let families: HashMap<_, _> = registry
            .gather()
            .into_iter()
            .map(|family| (family.name().to_string(), family))
            .collect();
        let hist = aggregate_gauge_by_label(families.get("test").unwrap());
        let sum = aggregate_counter_by_label(families.get("test_sum").unwrap());
        let count = aggregate_counter_by_label(families.get("test_count").unwrap());

        assert_eq!(Some(3.), hist.get("::a::50").copied());
        assert_eq!(Some(4.), hist.get("::a::90").copied());
        assert_eq!(Some(30.), hist.get("::b::50").copied());
        assert_eq!(Some(40.), hist.get("::b::90").copied());

        assert_eq!(Some(10.), sum.get("::a").copied());
        assert_eq!(Some(100.), sum.get("::b").copied());

        assert_eq!(Some(4.), count.get("::a").copied());
        assert_eq!(Some(4.), count.get("::b").copied());
    }

    /// Maps each gauge in `family` to its value, keyed by its label values,
    /// each prefixed with `::` (e.g. `"::a::50"`).
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        let mut by_label = HashMap::new();
        for metric in family.get_metric().iter() {
            let key: String = metric
                .get_label()
                .iter()
                .map(|label| format!("::{}", label.value()))
                .collect();
            by_label.insert(key, metric.get_gauge().value());
        }
        by_label
    }

    /// Maps each counter in `family` to its value, keyed by its label values,
    /// each prefixed with `::` (e.g. `"::a"`).
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        let mut by_label = HashMap::new();
        for metric in family.get_metric().iter() {
            let key: String = metric
                .get_label()
                .iter()
                .map(|label| format!("::{}", label.value()))
                .collect();
            by_label.insert(key, metric.get_counter().value());
        }
        by_label
    }
}