1use std::{
6 collections::{HashMap, HashSet, hash_map::DefaultHasher},
7 hash::{Hash, Hasher},
8 sync::Arc,
9 time::Duration,
10};
11
12use futures::FutureExt;
13use parking_lot::Mutex;
14use prometheus::{
15 IntCounterVec, IntGaugeVec, Registry, opts, register_int_counter_vec_with_registry,
16 register_int_gauge_vec_with_registry,
17};
18use tokio::{
19 runtime::Handle,
20 sync::{mpsc, mpsc::error::TrySendError},
21 time::Instant,
22};
23use tracing::{debug, error};
24
25use crate::monitored_scope;
26
/// A single recorded sample. Timer guards record elapsed time in whole milliseconds.
type Point = u64;
/// A sample tagged with the label set it belongs to; this is what `Histogram` handles send
/// over the channel to the background collector.
type HistogramMessage = (HistogramLabels, Point);
29
/// Handle for recording samples under one fixed label set.
///
/// Obtained from `HistogramVec::with_label_values` or `Histogram::new_in_registry`.
/// Recording never blocks: samples are pushed with `try_send` onto a bounded channel that a
/// background collector task drains, and are dropped when that channel is full.
#[derive(Clone)]
pub struct Histogram {
    // Label values this handle records under (shared, reference-counted).
    labels: HistogramLabels,
    // Sending half of the channel feeding the collector task.
    channel: mpsc::Sender<HistogramMessage>,
}
39
40pub struct HistogramTimerGuard<'a> {
44 histogram: &'a Histogram,
45 start: Instant,
46}
47
/// A family of histograms sharing one metric name and one set of label names.
///
/// Holds the sending half of the channel feeding the background collector task spawned in
/// `HistogramVec::new`. That task aggregates samples per label set over fixed time windows
/// and publishes percentile gauges plus `_sum` / `_count` counters.
#[derive(Clone)]
pub struct HistogramVec {
    channel: mpsc::Sender<HistogramMessage>,
}
56
/// State of the background task that drains the sample channel and batches samples per
/// time window before handing them to the reporter.
struct HistogramCollector {
    // Shared with the blocking task that publishes a window. Its strong count is also used
    // to detect that the previous window's publication is still in progress.
    reporter: Arc<Mutex<HistogramReporter>>,
    // Receiving half of the channel fed by `Histogram` handles.
    channel: mpsc::Receiver<HistogramMessage>,
    // Metric name; only read by the overflow error log, which is compiled out in debug
    // builds (presumably why it is underscore-prefixed).
    _name: String,
}
67
/// Publishes aggregated windows of samples to the Prometheus metrics.
struct HistogramReporter {
    // Percentile values; label names are the user's labels followed by a `pct` label.
    gauge: IntGaugeVec,
    // Running sum of all samples, per label set.
    sum: IntCounterVec,
    // Running number of samples, per label set.
    count: IntCounterVec,
    // Every label set ever reported; used to reset the gauges of label sets that go quiet.
    known_labels: HashSet<HistogramLabels>,
    // Percentiles to publish, in thousandths (500 = p50, 990 = p99, 999 = p99.9).
    percentiles: Vec<usize>,
}
81
/// Shared, immutable label set; cloning only bumps a reference count.
type HistogramLabels = Arc<HistogramLabelsInner>;

/// Label values identifying one series of a histogram.
struct HistogramLabelsInner {
    // Label values, in the order of the label names the family was registered with.
    labels: Vec<String>,
    // Hash of `labels`, computed once in `HistogramLabelsInner::new`; the `Hash` impl feeds
    // only this value to hashers, so map/set lookups do not rehash the strings.
    hash: u64,
}
91
92impl HistogramVec {
125 pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
126 Self::new_in_registry_with_percentiles(
127 name,
128 desc,
129 labels,
130 registry,
131 vec![500usize, 950, 990],
132 )
133 }
134
135 pub fn new_in_registry_with_percentiles(
138 name: &str,
139 desc: &str,
140 labels: &[&str],
141 registry: &Registry,
142 percentiles: Vec<usize>,
143 ) -> Self {
144 let sum_name = format!("{}_sum", name);
145 let count_name = format!("{}_count", name);
146 let sum =
147 register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
148 let count =
149 register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
150 let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
151 let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
152 Self::new(gauge, sum, count, percentiles, name)
153 }
154
155 fn new(
158 gauge: IntGaugeVec,
159 sum: IntCounterVec,
160 count: IntCounterVec,
161 percentiles: Vec<usize>,
162 name: &str,
163 ) -> Self {
164 let (sender, receiver) = mpsc::channel(1000);
165 let reporter = HistogramReporter {
166 gauge,
167 sum,
168 count,
169 percentiles,
170 known_labels: Default::default(),
171 };
172 let reporter = Arc::new(Mutex::new(reporter));
173 let collector = HistogramCollector {
174 reporter,
175 channel: receiver,
176 _name: name.to_string(),
177 };
178 Handle::current().spawn(collector.run());
179 Self { channel: sender }
180 }
181
182 pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
187 let labels = labels.iter().map(ToString::to_string).collect();
188 let labels = HistogramLabelsInner::new(labels);
189 Histogram {
190 labels,
191 channel: self.channel.clone(),
192 }
193 }
194
195 pub fn unregister(name: &str, desc: &str, labels: &[&str], registry: &Registry) {
201 let sum_name = format!("{}_sum", name);
202 let count_name = format!("{}_count", name);
203
204 let sum = IntCounterVec::new(opts!(sum_name, desc), labels).unwrap();
205 registry
206 .unregister(Box::new(sum))
207 .unwrap_or_else(|_| panic!("{}_sum counter is in prometheus registry", name));
208
209 let count = IntCounterVec::new(opts!(count_name, desc), labels).unwrap();
210 registry
211 .unregister(Box::new(count))
212 .unwrap_or_else(|_| panic!("{}_count counter is in prometheus registry", name));
213
214 let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
215 let gauge = IntGaugeVec::new(opts!(name, desc), &labels).unwrap();
216 registry
217 .unregister(Box::new(gauge))
218 .unwrap_or_else(|_| panic!("{} gauge is in prometheus registry", name));
219 }
220}
221
222impl HistogramLabelsInner {
223 pub fn new(labels: Vec<String>) -> HistogramLabels {
224 let mut hasher = DefaultHasher::new();
226 labels.hash(&mut hasher);
227 let hash = hasher.finish();
228 Arc::new(Self { labels, hash })
229 }
230}
231
232impl PartialEq for HistogramLabelsInner {
233 fn eq(&self, other: &Self) -> bool {
234 self.hash == other.hash
235 }
236}
237
238impl Eq for HistogramLabelsInner {}
239
240impl Hash for HistogramLabelsInner {
241 fn hash<H: Hasher>(&self, state: &mut H) {
242 self.hash.hash(state)
243 }
244}
245
246impl Histogram {
247 pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
251 HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
252 }
253
254 pub fn observe(&self, v: Point) {
256 self.report(v)
257 }
258
259 pub fn report(&self, v: Point) {
263 match self.channel.try_send((self.labels.clone(), v)) {
264 Ok(()) => {}
265 Err(TrySendError::Closed(_)) => {
266 }
268 Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
269 }
270 }
271
272 pub fn start_timer(&self) -> HistogramTimerGuard {
275 HistogramTimerGuard {
276 histogram: self,
277 start: Instant::now(),
278 }
279 }
280}
281
282impl HistogramCollector {
283 pub async fn run(mut self) {
289 let mut deadline = Instant::now();
290 loop {
291 #[cfg(test)]
294 const HISTOGRAM_WINDOW_SEC: u64 = 1;
295 #[cfg(not(test))]
296 const HISTOGRAM_WINDOW_SEC: u64 = 60;
297 deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
298 if self.cycle(deadline).await.is_err() {
299 return;
300 }
301 }
302 }
303
304 async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
312 let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
313 let mut count = 0usize;
314 let mut timeout = tokio::time::sleep_until(deadline).boxed();
315 const MAX_POINTS: usize = 500_000;
316 loop {
317 tokio::select! {
318 _ = &mut timeout => break,
319 point = self.channel.recv() => {
320 count += 1;
321 if count > MAX_POINTS {
322 continue;
323 }
324 if let Some((label, point)) = point {
325 let values = labeled_data.entry(label).or_default();
326 values.push(point);
327 } else {
328 return Err(());
330 }
331 },
332 }
333 }
334 if count > MAX_POINTS {
335 error!(
336 "Too many data points for histogram, dropping {} points",
337 count - MAX_POINTS
338 );
339 }
340 if Arc::strong_count(&self.reporter) != 1 {
341 #[cfg(not(debug_assertions))]
342 error!(
343 "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
344 self._name
345 );
346 } else {
347 let reporter = self.reporter.clone();
348 Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
349 }
350 Ok(())
351 }
352}
353
354impl HistogramReporter {
355 pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
363 let _scope = monitored_scope("HistogramReporter::report");
364 let mut reset_labels = self.known_labels.clone();
365 for (label, mut data) in labeled_data {
366 self.known_labels.insert(label.clone());
367 reset_labels.remove(&label);
368 assert!(!data.is_empty());
369 data.sort_unstable();
370 for pct1000 in self.percentiles.iter() {
371 let index = Self::pct1000_index(data.len(), *pct1000);
372 let point = *data.get(index).unwrap();
373 let pct_str = Self::format_pct1000(*pct1000);
374 let labels = Self::gauge_labels(&label, &pct_str);
375 let metric = self.gauge.with_label_values(&labels);
376 metric.set(point as i64);
377 }
378 let mut sum = 0u64;
379 let count = data.len() as u64;
380 for point in data {
381 sum += point;
382 }
383 let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
384 self.sum.with_label_values(&labels).inc_by(sum);
385 self.count.with_label_values(&labels).inc_by(count);
386 }
387
388 for reset_label in reset_labels {
389 for pct1000 in self.percentiles.iter() {
390 let pct_str = Self::format_pct1000(*pct1000);
391 let labels = Self::gauge_labels(&reset_label, &pct_str);
392 let metric = self.gauge.with_label_values(&labels);
393 metric.set(0);
394 }
395 }
396 }
397
398 fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
403 let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
404 labels.collect()
405 }
406
407 fn pct1000_index(len: usize, pct1000: usize) -> usize {
409 len * pct1000 / 1000
410 }
411
412 fn format_pct1000(pct1000: usize) -> String {
416 format!("{}", (pct1000 as f64) / 10.)
417 }
418}
419
420impl Drop for HistogramTimerGuard<'_> {
421 fn drop(&mut self) {
424 self.histogram
425 .report(self.start.elapsed().as_millis() as u64);
426 }
427}
428
#[cfg(test)]
mod tests {
    use prometheus::proto::MetricFamily;

    use super::*;

    #[test]
    fn pct_index_test() {
        assert_eq!(200, HistogramReporter::pct1000_index(1000, 200));
        assert_eq!(100, HistogramReporter::pct1000_index(500, 200));
        assert_eq!(1800, HistogramReporter::pct1000_index(2000, 900));
        assert_eq!(21, HistogramReporter::pct1000_index(22, 999));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 999));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 100));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 1));
    }

    #[test]
    fn format_pct1000_test() {
        assert_eq!(HistogramReporter::format_pct1000(999), "99.9");
        assert_eq!(HistogramReporter::format_pct1000(990), "99");
        assert_eq!(HistogramReporter::format_pct1000(900), "90");
    }

    /// End-to-end check: reports four samples for each of two label sets, then waits past
    /// the 1 s test window (with slack for the blocking reporter) and checks the published
    /// percentile gauges and `_sum` / `_count` counters.
    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        a.report(1);
        a.report(2);
        a.report(3);
        a.report(4);
        b.report(10);
        b.report(20);
        b.report(30);
        b.report(40);
        tokio::time::sleep(Duration::from_millis(1500)).await;
        let gather = registry.gather();
        let gather: HashMap<_, _> = gather
            .into_iter()
            .map(|f| (f.name().to_string(), f))
            .collect();
        let hist = gather.get("test").unwrap();
        let sum = gather.get("test_sum").unwrap();
        let count = gather.get("test_count").unwrap();
        let hist = aggregate_gauge_by_label(hist);
        let sum = aggregate_counter_by_label(sum);
        let count = aggregate_counter_by_label(count);
        assert_eq!(Some(3.), hist.get("::a::50").cloned());
        assert_eq!(Some(4.), hist.get("::a::90").cloned());
        assert_eq!(Some(30.), hist.get("::b::50").cloned());
        assert_eq!(Some(40.), hist.get("::b::90").cloned());

        assert_eq!(Some(10.), sum.get("::a").cloned());
        assert_eq!(Some(100.), sum.get("::b").cloned());

        assert_eq!(Some(4.), count.get("::a").cloned());
        assert_eq!(Some(4.), count.get("::b").cloned());
    }

    /// Maps every gauge series in `family` to its value, keyed by its label values each
    /// prefixed with `::` (e.g. `"::a::50"`).
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let value = m.get_gauge().value();
                let mut key = String::new();
                for label in m.get_label() {
                    key.push_str("::");
                    key.push_str(label.value());
                }
                (key, value)
            })
            .collect()
    }

    /// Maps every counter series in `family` to its value, keyed like
    /// `aggregate_gauge_by_label` (e.g. `"::a"`).
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let value = m.get_counter().value();
                let mut key = String::new();
                for label in m.get_label() {
                    key.push_str("::");
                    key.push_str(label.value());
                }
                (key, value)
            })
            .collect()
    }
}