1use std::{
6 collections::{HashMap, HashSet, hash_map::DefaultHasher},
7 hash::{Hash, Hasher},
8 sync::Arc,
9 time::Duration,
10};
11
12use futures::FutureExt;
13use parking_lot::Mutex;
14use prometheus::{
15 IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
16 register_int_gauge_vec_with_registry,
17};
18use tokio::{
19 runtime::Handle,
20 sync::{mpsc, mpsc::error::TrySendError},
21 time::Instant,
22};
23use tracing::{debug, error};
24
25use crate::monitored_scope;
26
/// A single observed value recorded into a histogram.
type Point = u64;
/// One message sent from a [`Histogram`] handle to the collector: the label
/// set the point belongs to, plus the point itself.
type HistogramMessage = (HistogramLabels, Point);
29
/// Handle used to record points for one fixed set of label values.
///
/// Recording does not touch any metric directly: each point is sent, together
/// with `labels`, over `channel` (see `Histogram::report`).
#[derive(Clone)]
pub struct Histogram {
    /// Label values attached to every point reported through this handle.
    labels: HistogramLabels,
    /// Sending half of the channel that carries points to the collector.
    channel: mpsc::Sender<HistogramMessage>,
}
39
/// Guard created by `Histogram::start_timer`.
///
/// When the guard is dropped, the time elapsed since `start` is reported to
/// `histogram` in milliseconds.
pub struct HistogramTimerGuard<'a> {
    /// Histogram that receives the measurement on drop.
    histogram: &'a Histogram,
    /// Instant at which the guard was created.
    start: Instant,
}
47
/// Family of histograms that share one metric name and differ by label values.
///
/// Cloning is cheap: clones share the same channel, and so does every
/// [`Histogram`] handed out by `with_label_values`.
#[derive(Clone)]
pub struct HistogramVec {
    /// Sending half of the channel; cloned into each [`Histogram`] handle.
    channel: mpsc::Sender<HistogramMessage>,
}
56
/// Background task state: receives points from the channel, groups them per
/// label set for one time window, and hands each window to the reporter.
struct HistogramCollector {
    /// Reporter that publishes a finished window; shared with the blocking
    /// task that runs the report.
    reporter: Arc<Mutex<HistogramReporter>>,
    /// Receiving half of the channel fed by [`Histogram`] handles.
    channel: mpsc::Receiver<HistogramMessage>,
    /// Metric name, used only in the overflow log message.
    _name: String,
}
67
/// Publishes one window of collected points to the Prometheus metrics.
struct HistogramReporter {
    /// Percentile values, one series per label set and `pct` label value.
    gauge: IntGaugeVec,
    /// Running sum of all reported points, per label set.
    sum: IntCounterVec,
    /// Running number of reported points, per label set.
    count: IntCounterVec,
    /// Every label set that has ever been reported; used to zero the gauges
    /// of label sets that received no data in a window.
    known_labels: HashSet<HistogramLabels>,
    /// Percentiles to publish, in tenths of a percent (e.g. `950` is p95).
    percentiles: Vec<usize>,
}
81
/// Reference-counted label set, so it can be cloned into every channel
/// message without copying the label strings.
type HistogramLabels = Arc<HistogramLabelsInner>;
83
/// Label values of one histogram series together with a precomputed hash.
struct HistogramLabelsInner {
    /// Label values, in the order passed to `with_label_values`.
    labels: Vec<String>,
    /// Hash of `labels`, computed once in `HistogramLabelsInner::new`.
    hash: u64,
}
91
92impl HistogramVec {
125 pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
126 Self::new_in_registry_with_percentiles(
127 name,
128 desc,
129 labels,
130 registry,
131 vec![500usize, 950, 990],
132 )
133 }
134
135 pub fn new_in_registry_with_percentiles(
138 name: &str,
139 desc: &str,
140 labels: &[&str],
141 registry: &Registry,
142 percentiles: Vec<usize>,
143 ) -> Self {
144 let sum_name = format!("{name}_sum");
145 let count_name = format!("{name}_count");
146 let sum =
147 register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
148 let count =
149 register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
150 let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
151 let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
152 Self::new(gauge, sum, count, percentiles, name)
153 }
154
155 fn new(
158 gauge: IntGaugeVec,
159 sum: IntCounterVec,
160 count: IntCounterVec,
161 percentiles: Vec<usize>,
162 name: &str,
163 ) -> Self {
164 let (sender, receiver) = mpsc::channel(1000);
165 let reporter = HistogramReporter {
166 gauge,
167 sum,
168 count,
169 percentiles,
170 known_labels: Default::default(),
171 };
172 let reporter = Arc::new(Mutex::new(reporter));
173 let collector = HistogramCollector {
174 reporter,
175 channel: receiver,
176 _name: name.to_string(),
177 };
178 Handle::current().spawn(collector.run());
179 Self { channel: sender }
180 }
181
182 pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
187 let labels = labels.iter().map(ToString::to_string).collect();
188 let labels = HistogramLabelsInner::new(labels);
189 Histogram {
190 labels,
191 channel: self.channel.clone(),
192 }
193 }
194}
195
196impl HistogramLabelsInner {
197 pub fn new(labels: Vec<String>) -> HistogramLabels {
198 let mut hasher = DefaultHasher::new();
200 labels.hash(&mut hasher);
201 let hash = hasher.finish();
202 Arc::new(Self { labels, hash })
203 }
204}
205
206impl PartialEq for HistogramLabelsInner {
207 fn eq(&self, other: &Self) -> bool {
208 self.hash == other.hash
209 }
210}
211
// Marker impl: together with `Hash` it lets `HistogramLabels` be used as a
// `HashMap` / `HashSet` key.
impl Eq for HistogramLabelsInner {}
213
214impl Hash for HistogramLabelsInner {
215 fn hash<H: Hasher>(&self, state: &mut H) {
216 self.hash.hash(state)
217 }
218}
219
220impl Histogram {
221 pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
225 HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
226 }
227
228 pub fn observe(&self, v: Point) {
230 self.report(v)
231 }
232
233 pub fn report(&self, v: Point) {
237 match self.channel.try_send((self.labels.clone(), v)) {
238 Ok(()) => {}
239 Err(TrySendError::Closed(_)) => {
240 }
242 Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
243 }
244 }
245
246 pub fn start_timer(&self) -> HistogramTimerGuard {
249 HistogramTimerGuard {
250 histogram: self,
251 start: Instant::now(),
252 }
253 }
254}
255
256impl HistogramCollector {
257 pub async fn run(mut self) {
263 let mut deadline = Instant::now();
264 loop {
265 #[cfg(test)]
268 const HISTOGRAM_WINDOW_SEC: u64 = 1;
269 #[cfg(not(test))]
270 const HISTOGRAM_WINDOW_SEC: u64 = 60;
271 deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
272 if self.cycle(deadline).await.is_err() {
273 return;
274 }
275 }
276 }
277
278 async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
286 let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
287 let mut count = 0usize;
288 let mut timeout = tokio::time::sleep_until(deadline).boxed();
289 const MAX_POINTS: usize = 500_000;
290 loop {
291 tokio::select! {
292 _ = &mut timeout => break,
293 point = self.channel.recv() => {
294 count += 1;
295 if count > MAX_POINTS {
296 continue;
297 }
298 if let Some((label, point)) = point {
299 let values = labeled_data.entry(label).or_default();
300 values.push(point);
301 } else {
302 return Err(());
304 }
305 },
306 }
307 }
308 if count > MAX_POINTS {
309 error!(
310 "Too many data points for histogram, dropping {} points",
311 count - MAX_POINTS
312 );
313 }
314 if Arc::strong_count(&self.reporter) != 1 {
315 #[cfg(not(debug_assertions))]
316 error!(
317 "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
318 self._name
319 );
320 } else {
321 let reporter = self.reporter.clone();
322 Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
323 }
324 Ok(())
325 }
326}
327
328impl HistogramReporter {
329 pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
337 let _scope = monitored_scope("HistogramReporter::report");
338 let mut reset_labels = self.known_labels.clone();
339 for (label, mut data) in labeled_data {
340 self.known_labels.insert(label.clone());
341 reset_labels.remove(&label);
342 assert!(!data.is_empty());
343 data.sort_unstable();
344 for pct1000 in self.percentiles.iter() {
345 let index = Self::pct1000_index(data.len(), *pct1000);
346 let point = *data.get(index).unwrap();
347 let pct_str = Self::format_pct1000(*pct1000);
348 let labels = Self::gauge_labels(&label, &pct_str);
349 let metric = self.gauge.with_label_values(&labels);
350 metric.set(point as i64);
351 }
352 let mut sum = 0u64;
353 let count = data.len() as u64;
354 for point in data {
355 sum += point;
356 }
357 let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
358 self.sum.with_label_values(&labels).inc_by(sum);
359 self.count.with_label_values(&labels).inc_by(count);
360 }
361
362 for reset_label in reset_labels {
363 for pct1000 in self.percentiles.iter() {
364 let pct_str = Self::format_pct1000(*pct1000);
365 let labels = Self::gauge_labels(&reset_label, &pct_str);
366 let metric = self.gauge.with_label_values(&labels);
367 metric.set(0);
368 }
369 }
370 }
371
372 fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
377 let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
378 labels.collect()
379 }
380
381 fn pct1000_index(len: usize, pct1000: usize) -> usize {
383 len * pct1000 / 1000
384 }
385
386 fn format_pct1000(pct1000: usize) -> String {
390 format!("{}", (pct1000 as f64) / 10.)
391 }
392}
393
394impl Drop for HistogramTimerGuard<'_> {
395 fn drop(&mut self) {
398 self.histogram
399 .report(self.start.elapsed().as_millis() as u64);
400 }
401}
402
#[cfg(test)]
mod tests {
    use prometheus::proto::MetricFamily;

    use super::*;

    /// Percentile index selection, including the single-element case.
    #[test]
    fn pct_index_test() {
        assert_eq!(200, HistogramReporter::pct1000_index(1000, 200));
        assert_eq!(100, HistogramReporter::pct1000_index(500, 200));
        assert_eq!(1800, HistogramReporter::pct1000_index(2000, 900));
        assert_eq!(21, HistogramReporter::pct1000_index(22, 999));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 999));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 100));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 1));
    }

    /// Percentile labels drop a trailing `.0`.
    #[test]
    fn format_pct1000_test() {
        assert_eq!(HistogramReporter::format_pct1000(999), "99.9");
        assert_eq!(HistogramReporter::format_pct1000(990), "99");
        assert_eq!(HistogramReporter::format_pct1000(900), "90");
    }

    /// End-to-end check: points reported for two label sets show up as
    /// percentile gauges plus sum and count counters after one window.
    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        a.report(1);
        a.report(2);
        a.report(3);
        a.report(4);
        b.report(10);
        b.report(20);
        b.report(30);
        b.report(40);
        // The collection window is 1 second under `cfg(test)`; wait long
        // enough for one window to close and its report to run.
        tokio::time::sleep(Duration::from_millis(1500)).await;
        let gather = registry.gather();
        let gather: HashMap<_, _> = gather
            .into_iter()
            .map(|f| (f.name().to_string(), f))
            .collect();
        let hist = gather.get("test").unwrap();
        let sum = gather.get("test_sum").unwrap();
        let count = gather.get("test_count").unwrap();
        let hist = aggregate_gauge_by_label(hist);
        let sum = aggregate_counter_by_label(sum);
        let count = aggregate_counter_by_label(count);
        assert_eq!(Some(3.), hist.get("::a::50").cloned());
        assert_eq!(Some(4.), hist.get("::a::90").cloned());
        assert_eq!(Some(30.), hist.get("::b::50").cloned());
        assert_eq!(Some(40.), hist.get("::b::90").cloned());

        assert_eq!(Some(10.), sum.get("::a").cloned());
        assert_eq!(Some(100.), sum.get("::b").cloned());

        assert_eq!(Some(4.), count.get("::a").cloned());
        assert_eq!(Some(4.), count.get("::b").cloned());
    }

    /// Maps each gauge in `family` to its value, keyed by the label values
    /// joined as `::v1::v2`.
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let value = m.get_gauge().value();
                let mut key = String::new();
                for label in m.get_label() {
                    key.push_str("::");
                    key.push_str(label.value());
                }
                (key, value)
            })
            .collect()
    }

    /// Maps each counter in `family` to its value, keyed by the label values
    /// joined as `::v1::v2`.
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let value = m.get_counter().value();
                let mut key = String::new();
                for label in m.get_label() {
                    key.push_str("::");
                    key.push_str(label.value());
                }
                (key, value)
            })
            .collect()
    }
}