iota_metrics/
histogram.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet, hash_map::DefaultHasher},
7    hash::{Hash, Hasher},
8    sync::Arc,
9    time::Duration,
10};
11
12use futures::FutureExt;
13use parking_lot::Mutex;
14use prometheus::{
15    IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
16    register_int_gauge_vec_with_registry,
17};
18use tokio::{
19    runtime::Handle,
20    sync::{mpsc, mpsc::error::TrySendError},
21    time::Instant,
22};
23use tracing::{debug, error};
24
25use crate::monitored_scope;
26
27type Point = u64;
28type HistogramMessage = (HistogramLabels, Point);
29
30/// Represents a histogram metric used for collecting and recording data
31/// distributions. The `Histogram` struct contains `labels` that categorize the
32/// histogram and a `channel` for sending `HistogramMessage` instances to record
33/// the data.
34#[derive(Clone)]
35pub struct Histogram {
36    labels: HistogramLabels,
37    channel: mpsc::Sender<HistogramMessage>,
38}
39
40/// A guard used for timing the duration of an operation and recording it in a
41/// `Histogram`. The `HistogramTimerGuard` starts a timer upon creation and,
42/// when dropped, records the elapsed time into the associated `Histogram`.
43pub struct HistogramTimerGuard<'a> {
44    histogram: &'a Histogram,
45    start: Instant,
46}
47
48/// Represents a collection of histograms for managing multiple labeled metrics.
49/// The `HistogramVec` struct allows for sending `HistogramMessage` instances
50/// via a channel to record data in a particular histogram, providing a way to
51/// track different metrics concurrently.
52#[derive(Clone)]
53pub struct HistogramVec {
54    channel: mpsc::Sender<HistogramMessage>,
55}
56
57/// Collects histogram data by receiving `HistogramMessage` instances and
58/// passing them to the `HistogramReporter`. The `HistogramCollector` manages an
59/// asynchronous channel for receiving messages and uses a `Mutex`-protected
60/// `HistogramReporter` to process and report the collected data. It also stores
61/// the name of the collector for identification.
62struct HistogramCollector {
63    reporter: Arc<Mutex<HistogramReporter>>,
64    channel: mpsc::Receiver<HistogramMessage>,
65    _name: String,
66}
67
68/// Reports histogram metrics by aggregating and processing data collected from
69/// multiple histograms. The `HistogramReporter` maintains various metrics,
70/// including a gauge (`gauge`), total sum (`sum`), and count (`count`) for
71/// tracking histogram values. It uses `known_labels` to manage label sets for
72/// data categorization, and `percentiles` to calculate specific statistical
73/// measurements for the collected data.
74struct HistogramReporter {
75    gauge: IntGaugeVec,
76    sum: IntCounterVec,
77    count: IntCounterVec,
78    known_labels: HashSet<HistogramLabels>,
79    percentiles: Vec<usize>,
80}
81
82type HistogramLabels = Arc<HistogramLabelsInner>;
83
84/// Represents the inner structure of histogram labels, containing a list of
85/// labels (`labels`) and a precomputed hash (`hash`) for efficient lookup and
86/// categorization.
87struct HistogramLabelsInner {
88    labels: Vec<String>,
89    hash: u64,
90}
91
92/// Reports the histogram to the given prometheus gauge.
93/// Unlike the histogram from prometheus crate, this histogram does not require
94/// to specify buckets It works by calculating 'true' histogram by aggregating
95/// and sorting values.
96///
97/// The values are reported into prometheus gauge with requested labels and
98/// additional dimension for the histogram percentile.
99///
100/// It worth pointing out that due to those more precise calculations, this
101/// Histogram usage is somewhat more limited comparing to original prometheus
102/// Histogram.
103///
104/// On the bright side, this histogram exports less data to Prometheus comparing
105/// to prometheus::Histogram, it exports each requested percentile into separate
106/// prometheus gauge, while original implementation creates gauge per bucket.
107/// It also exports _sum and _count aggregates same as original implementation.
108///
109/// It is ok to measure timings for things like network latencies and expensive
110/// crypto operations. However as a rule of thumb this histogram should not be
111/// used in places that can produce very high data point count.
112///
113/// As a last round of defence this histogram emits error log when too much data
114/// is flowing in and drops data points.
115///
116/// This implementation puts great deal of effort to make sure the metric does
117/// not cause any harm to the code itself:
118/// * Reporting data point is a non-blocking send to a channel
119/// * Data point collections tries to clear the channel as fast as possible
120/// * Expensive histogram calculations are done in a separate blocking tokio
121///   thread pool to avoid effects on main scheduler
122/// * If histogram data is produced too fast, the data is dropped and error! log
123///   is emitted
124impl HistogramVec {
125    pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
126        Self::new_in_registry_with_percentiles(
127            name,
128            desc,
129            labels,
130            registry,
131            vec![500usize, 950, 990],
132        )
133    }
134
135    /// Allows to specify percentiles in 1/1000th, e.g. 90pct is specified as
136    /// 900
137    pub fn new_in_registry_with_percentiles(
138        name: &str,
139        desc: &str,
140        labels: &[&str],
141        registry: &Registry,
142        percentiles: Vec<usize>,
143    ) -> Self {
144        let sum_name = format!("{name}_sum");
145        let count_name = format!("{name}_count");
146        let sum =
147            register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
148        let count =
149            register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
150        let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
151        let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
152        Self::new(gauge, sum, count, percentiles, name)
153    }
154
155    // Do not expose it to public interface because we need labels to have a
156    // specific format (e.g. add last label is "pct")
157    fn new(
158        gauge: IntGaugeVec,
159        sum: IntCounterVec,
160        count: IntCounterVec,
161        percentiles: Vec<usize>,
162        name: &str,
163    ) -> Self {
164        let (sender, receiver) = mpsc::channel(1000);
165        let reporter = HistogramReporter {
166            gauge,
167            sum,
168            count,
169            percentiles,
170            known_labels: Default::default(),
171        };
172        let reporter = Arc::new(Mutex::new(reporter));
173        let collector = HistogramCollector {
174            reporter,
175            channel: receiver,
176            _name: name.to_string(),
177        };
178        Handle::current().spawn(collector.run());
179        Self { channel: sender }
180    }
181
182    /// Creates a new `Histogram` with the specified label values. The function
183    /// takes a slice of label strings, converts them into a
184    /// `HistogramLabelsInner` structure, and returns a new `Histogram`
185    /// instance that shares the same data channel as the original.
186    pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
187        let labels = labels.iter().map(ToString::to_string).collect();
188        let labels = HistogramLabelsInner::new(labels);
189        Histogram {
190            labels,
191            channel: self.channel.clone(),
192        }
193    }
194}
195
196impl HistogramLabelsInner {
197    pub fn new(labels: Vec<String>) -> HistogramLabels {
198        // Not a crypto hash
199        let mut hasher = DefaultHasher::new();
200        labels.hash(&mut hasher);
201        let hash = hasher.finish();
202        Arc::new(Self { labels, hash })
203    }
204}
205
206impl PartialEq for HistogramLabelsInner {
207    fn eq(&self, other: &Self) -> bool {
208        self.hash == other.hash
209    }
210}
211
212impl Eq for HistogramLabelsInner {}
213
214impl Hash for HistogramLabelsInner {
215    fn hash<H: Hasher>(&self, state: &mut H) {
216        self.hash.hash(state)
217    }
218}
219
220impl Histogram {
221    /// Creates a new `Histogram` instance in the specified `Registry` with the
222    /// given `name` and `desc`. It initializes the histogram in the
223    /// `registry`, with no labels by default.
224    pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
225        HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
226    }
227
228    /// Observes a value in the histogram by reporting the given `Point`.
229    pub fn observe(&self, v: Point) {
230        self.report(v)
231    }
232
233    /// Reports a value (`Point`) to the histogram by sending it through the
234    /// internal channel. This method manages the process of collecting and
235    /// reporting metrics for the histogram.
236    pub fn report(&self, v: Point) {
237        match self.channel.try_send((self.labels.clone(), v)) {
238            Ok(()) => {}
239            Err(TrySendError::Closed(_)) => {
240                // can happen during runtime shutdown
241            }
242            Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
243        }
244    }
245
246    /// Starts a timer and returns a `HistogramTimerGuard` that, when dropped,
247    /// will record the elapsed time in the associated histogram.
248    pub fn start_timer(&self) -> HistogramTimerGuard {
249        HistogramTimerGuard {
250            histogram: self,
251            start: Instant::now(),
252        }
253    }
254}
255
256impl HistogramCollector {
257    /// Runs the histogram collection process asynchronously, cycling at a
258    /// specified interval (`HISTOGRAM_WINDOW_SEC`). It calculates the next
259    /// deadline and continuously processes incoming data points. The
260    /// process stops when `cycle` returns an error, which typically
261    /// indicates that the histogram no longer exists.
262    pub async fn run(mut self) {
263        let mut deadline = Instant::now();
264        loop {
265            // We calculate deadline here instead of just using sleep inside cycle to avoid
266            // accumulating error
267            #[cfg(test)]
268            const HISTOGRAM_WINDOW_SEC: u64 = 1;
269            #[cfg(not(test))]
270            const HISTOGRAM_WINDOW_SEC: u64 = 60;
271            deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
272            if self.cycle(deadline).await.is_err() {
273                return;
274            }
275        }
276    }
277
278    /// Collects histogram data points until a deadline or a maximum number of
279    /// points (`MAX_POINTS`) is reached. The function collects data points
280    /// into `labeled_data` while receiving them from the channel, breaking
281    /// when either the deadline is reached or the histogram channel is closed.
282    /// If the number of data points exceeds the limit, some points are
283    /// dropped, and an error is logged. After processing, the data is
284    /// handed off to the reporter for aggregation and analysis.
285    async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
286        let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
287        let mut count = 0usize;
288        let mut timeout = tokio::time::sleep_until(deadline).boxed();
289        const MAX_POINTS: usize = 500_000;
290        loop {
291            tokio::select! {
292                _ = &mut timeout => break,
293                point = self.channel.recv() => {
294                    count += 1;
295                    if count > MAX_POINTS {
296                        continue;
297                    }
298                    if let Some((label, point)) = point {
299                        let values = labeled_data.entry(label).or_default();
300                        values.push(point);
301                    } else {
302                        // Histogram no longer exists
303                        return Err(());
304                    }
305                },
306            }
307        }
308        if count > MAX_POINTS {
309            error!(
310                "Too many data points for histogram, dropping {} points",
311                count - MAX_POINTS
312            );
313        }
314        if Arc::strong_count(&self.reporter) != 1 {
315            #[cfg(not(debug_assertions))]
316            error!(
317                "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
318                self._name
319            );
320        } else {
321            let reporter = self.reporter.clone();
322            Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
323        }
324        Ok(())
325    }
326}
327
328impl HistogramReporter {
329    /// Reports the collected histogram data by aggregating it and updating the
330    /// corresponding metrics. It first sorts the data points and then
331    /// calculates specific percentiles as defined by `self.percentiles`.
332    /// Each calculated percentile value is set in the `IntGaugeVec`. It also
333    /// computes the total sum and count of the data points, updating the
334    /// respective metrics (`sum` and `count`). If any labels are no longer in
335    /// use, their metrics are reset to zero.
336    pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
337        let _scope = monitored_scope("HistogramReporter::report");
338        let mut reset_labels = self.known_labels.clone();
339        for (label, mut data) in labeled_data {
340            self.known_labels.insert(label.clone());
341            reset_labels.remove(&label);
342            assert!(!data.is_empty());
343            data.sort_unstable();
344            for pct1000 in self.percentiles.iter() {
345                let index = Self::pct1000_index(data.len(), *pct1000);
346                let point = *data.get(index).unwrap();
347                let pct_str = Self::format_pct1000(*pct1000);
348                let labels = Self::gauge_labels(&label, &pct_str);
349                let metric = self.gauge.with_label_values(&labels);
350                metric.set(point as i64);
351            }
352            let mut sum = 0u64;
353            let count = data.len() as u64;
354            for point in data {
355                sum += point;
356            }
357            let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
358            self.sum.with_label_values(&labels).inc_by(sum);
359            self.count.with_label_values(&labels).inc_by(count);
360        }
361
362        for reset_label in reset_labels {
363            for pct1000 in self.percentiles.iter() {
364                let pct_str = Self::format_pct1000(*pct1000);
365                let labels = Self::gauge_labels(&reset_label, &pct_str);
366                let metric = self.gauge.with_label_values(&labels);
367                metric.set(0);
368            }
369        }
370    }
371
372    /// Constructs a vector of label values for a gauge metric. It takes a
373    /// `HistogramLabels` instance and a percentile string (`pct_str`),
374    /// returning a combined list of label values to be used for identifying
375    /// the gauge metric.
376    fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
377        let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
378        labels.collect()
379    }
380
381    /// Returns value in range [0; len)
382    fn pct1000_index(len: usize, pct1000: usize) -> usize {
383        len * pct1000 / 1000
384    }
385
386    /// Formats a given percentile value (`pct1000`) as a string by converting
387    /// it to a decimal percentage. The `pct1000` parameter is divided by 10
388    /// to represent the correct percentile value (e.g., 250 -> "25.0").
389    fn format_pct1000(pct1000: usize) -> String {
390        format!("{}", (pct1000 as f64) / 10.)
391    }
392}
393
394impl Drop for HistogramTimerGuard<'_> {
395    /// Reports the elapsed time in milliseconds to the associated histogram
396    /// when the `HistogramTimerGuard` is dropped.
397    fn drop(&mut self) {
398        self.histogram
399            .report(self.start.elapsed().as_millis() as u64);
400    }
401}
402
#[cfg(test)]
mod tests {
    use prometheus::proto::MetricFamily;

    use super::*;

    #[test]
    fn pct_index_test() {
        // (len, pct1000, expected index)
        let cases = [
            (1000, 200, 200),
            (500, 200, 100),
            (2000, 900, 1800),
            // Boundary checks
            (22, 999, 21),
            (1, 999, 0),
            (1, 100, 0),
            (1, 1, 0),
        ];
        for (len, pct1000, expected) in cases {
            assert_eq!(expected, HistogramReporter::pct1000_index(len, pct1000));
        }
    }

    #[test]
    fn format_pct1000_test() {
        for (pct1000, expected) in [(999, "99.9"), (990, "99"), (900, "90")] {
            assert_eq!(HistogramReporter::format_pct1000(pct1000), expected);
        }
    }

    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        for v in [1, 2, 3, 4] {
            a.report(v);
        }
        for v in [10, 20, 30, 40] {
            b.report(v);
        }
        // The test build uses a 1s collection window; wait for one to elapse.
        tokio::time::sleep(Duration::from_millis(1500)).await;

        let families: HashMap<String, MetricFamily> = registry
            .gather()
            .into_iter()
            .map(|f| (f.name().to_string(), f))
            .collect();
        let hist = aggregate_gauge_by_label(families.get("test").unwrap());
        let sum = aggregate_counter_by_label(families.get("test_sum").unwrap());
        let count = aggregate_counter_by_label(families.get("test_count").unwrap());

        for (key, expected) in [
            ("::a::50", 3.),
            ("::a::90", 4.),
            ("::b::50", 30.),
            ("::b::90", 40.),
        ] {
            assert_eq!(Some(expected), hist.get(key).copied());
        }

        assert_eq!(Some(10.), sum.get("::a").copied());
        assert_eq!(Some(100.), sum.get("::b").copied());

        assert_eq!(Some(4.), count.get("::a").copied());
        assert_eq!(Some(4.), count.get("::b").copied());
    }

    /// Maps each gauge series to its value, keyed by its label values joined
    /// as `::v1::v2...`.
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        let mut out = HashMap::new();
        for m in family.get_metric().iter() {
            let key: String = m
                .get_label()
                .iter()
                .map(|label| format!("::{}", label.value()))
                .collect();
            out.insert(key, m.get_gauge().value());
        }
        out
    }

    /// Maps each counter series to its value, keyed by its label values
    /// joined as `::v1::v2...`.
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        let mut out = HashMap::new();
        for m in family.get_metric().iter() {
            let key: String = m
                .get_label()
                .iter()
                .map(|label| format!("::{}", label.value()))
                .collect();
            out.insert(key, m.get_counter().value());
        }
        out
    }
}
503}