1use std::{
6 collections::{HashMap, HashSet, hash_map::DefaultHasher},
7 hash::{Hash, Hasher},
8 sync::Arc,
9 time::Duration,
10};
11
12use futures::FutureExt;
13use parking_lot::Mutex;
14use prometheus_filtered::{
15 IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
16 register_int_gauge_vec_with_registry,
17};
18use tokio::{
19 runtime::Handle,
20 sync::{mpsc, mpsc::error::TrySendError},
21 time::Instant,
22};
23use tracing::{debug, error};
24
25use crate::monitored_scope;
26
/// A single observed value reported to a histogram.
type Point = u64;
/// Message sent over the channel by `Histogram::report`: the label set the
/// point belongs to, plus the point itself.
type HistogramMessage = (HistogramLabels, Point);
29
/// Handle for reporting points under one fixed set of label values.
///
/// The handle does no aggregation itself: every reported point is sent,
/// together with `labels`, over `channel`.
#[derive(Clone)]
pub struct Histogram {
    /// Label values attached to every point sent through this handle.
    labels: HistogramLabels,
    /// Sending half of the channel that carries `(labels, point)` messages.
    channel: mpsc::Sender<HistogramMessage>,
}
39
/// Timer guard created by `Histogram::start_timer`.
///
/// When the guard is dropped, the time elapsed since `start` is reported to
/// `histogram` in milliseconds.
pub struct HistogramTimerGuard<'a> {
    /// Histogram that receives the elapsed time on drop.
    histogram: &'a Histogram,
    /// Instant at which the guard was created.
    start: Instant,
}
47
/// Factory for [`Histogram`] handles that share one channel.
///
/// `with_label_values` hands out handles that each hold a clone of `channel`.
#[derive(Clone)]
pub struct HistogramVec {
    /// Sending half of the channel; cloned into every `Histogram` handle.
    channel: mpsc::Sender<HistogramMessage>,
}
56
/// Receiving side of a histogram: gathers points from the channel window by
/// window and hands each finished window to the reporter.
struct HistogramCollector {
    /// Reporter that publishes aggregated data; shared with the blocking task
    /// spawned for each window.
    reporter: Arc<Mutex<HistogramReporter>>,
    /// Receiving half of the channel fed by `Histogram::report`.
    channel: mpsc::Receiver<HistogramMessage>,
    /// Metric name; only read when logging dropped data in builds without
    /// `debug_assertions`.
    _name: String,
}
67
/// Publishes aggregated histogram data into Prometheus metrics.
struct HistogramReporter {
    /// Percentile values, one series per label set and per `pct` label value.
    gauge: IntGaugeVec,
    /// Running sum of all reported points, per label set.
    sum: IntCounterVec,
    /// Running number of reported points, per label set.
    count: IntCounterVec,
    /// Every label set seen so far; used to zero the gauges of label sets
    /// that received no data in the current window.
    known_labels: HashSet<HistogramLabels>,
    /// Percentiles to publish, in thousandths (e.g. `500` is formatted as "50").
    percentiles: Vec<usize>,
}
81
/// Reference-counted label set, so it can be cloned into each message
/// without copying the label strings.
type HistogramLabels = Arc<HistogramLabelsInner>;
83
/// Label values together with a hash of them computed once at construction.
struct HistogramLabelsInner {
    /// The label values, in the order they were passed to `with_label_values`.
    labels: Vec<String>,
    /// Hash of `labels`, computed in `HistogramLabelsInner::new`.
    hash: u64,
}
91
92impl HistogramVec {
126 pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
127 Self::new_in_registry_with_percentiles(
128 name,
129 desc,
130 labels,
131 registry,
132 vec![500usize, 950, 990],
133 )
134 }
135
136 pub fn new_in_registry_with_percentiles(
139 name: &str,
140 desc: &str,
141 labels: &[&str],
142 registry: &Registry,
143 percentiles: Vec<usize>,
144 ) -> Self {
145 let sum_name = format!("{name}_sum");
146 let count_name = format!("{name}_count");
147 let sum =
148 register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
149 let count =
150 register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
151 let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
152 let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
153 Self::new(gauge, sum, count, percentiles, name)
154 }
155
156 fn new(
159 gauge: IntGaugeVec,
160 sum: IntCounterVec,
161 count: IntCounterVec,
162 percentiles: Vec<usize>,
163 name: &str,
164 ) -> Self {
165 let (sender, receiver) = mpsc::channel(1000);
166 let reporter = HistogramReporter {
167 gauge,
168 sum,
169 count,
170 percentiles,
171 known_labels: Default::default(),
172 };
173 let reporter = Arc::new(Mutex::new(reporter));
174 let collector = HistogramCollector {
175 reporter,
176 channel: receiver,
177 _name: name.to_string(),
178 };
179 Handle::current().spawn(collector.run());
180 Self { channel: sender }
181 }
182
183 pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
188 let labels = labels.iter().map(ToString::to_string).collect();
189 let labels = HistogramLabelsInner::new(labels);
190 Histogram {
191 labels,
192 channel: self.channel.clone(),
193 }
194 }
195}
196
197impl HistogramLabelsInner {
198 pub fn new(labels: Vec<String>) -> HistogramLabels {
199 let mut hasher = DefaultHasher::new();
201 labels.hash(&mut hasher);
202 let hash = hasher.finish();
203 Arc::new(Self { labels, hash })
204 }
205}
206
207impl PartialEq for HistogramLabelsInner {
208 fn eq(&self, other: &Self) -> bool {
209 self.hash == other.hash
210 }
211}
212
// Marker impl: declares the `PartialEq` relation of this type to be a full
// equivalence, which `HashSet`/`HashMap` require of their keys.
impl Eq for HistogramLabelsInner {}
214
215impl Hash for HistogramLabelsInner {
216 fn hash<H: Hasher>(&self, state: &mut H) {
217 self.hash.hash(state)
218 }
219}
220
221impl Histogram {
222 pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
226 HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
227 }
228
229 pub fn observe(&self, v: Point) {
231 self.report(v)
232 }
233
234 pub fn report(&self, v: Point) {
238 match self.channel.try_send((self.labels.clone(), v)) {
239 Ok(()) => {}
240 Err(TrySendError::Closed(_)) => {
241 }
243 Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
244 }
245 }
246
247 pub fn start_timer(&self) -> HistogramTimerGuard<'_> {
250 HistogramTimerGuard {
251 histogram: self,
252 start: Instant::now(),
253 }
254 }
255}
256
257impl HistogramCollector {
258 pub async fn run(mut self) {
264 let mut deadline = Instant::now();
265 loop {
266 #[cfg(test)]
269 const HISTOGRAM_WINDOW_SEC: u64 = 1;
270 #[cfg(not(test))]
271 const HISTOGRAM_WINDOW_SEC: u64 = 60;
272 deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
273 if self.cycle(deadline).await.is_err() {
274 return;
275 }
276 }
277 }
278
279 async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
287 let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
288 let mut count = 0usize;
289 let mut timeout = tokio::time::sleep_until(deadline).boxed();
290 const MAX_POINTS: usize = 500_000;
291 loop {
292 tokio::select! {
293 _ = &mut timeout => break,
294 point = self.channel.recv() => {
295 count += 1;
296 if count > MAX_POINTS {
297 continue;
298 }
299 if let Some((label, point)) = point {
300 let values = labeled_data.entry(label).or_default();
301 values.push(point);
302 } else {
303 return Err(());
305 }
306 },
307 }
308 }
309 if count > MAX_POINTS {
310 error!(
311 "Too many data points for histogram, dropping {} points",
312 count - MAX_POINTS
313 );
314 }
315 if Arc::strong_count(&self.reporter) != 1 {
316 #[cfg(not(debug_assertions))]
317 error!(
318 "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
319 self._name
320 );
321 } else {
322 let reporter = self.reporter.clone();
323 Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
324 }
325 Ok(())
326 }
327}
328
329impl HistogramReporter {
330 pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
338 let _scope = monitored_scope("HistogramReporter::report");
339 let mut reset_labels = self.known_labels.clone();
340 for (label, mut data) in labeled_data {
341 self.known_labels.insert(label.clone());
342 reset_labels.remove(&label);
343 assert!(!data.is_empty());
344 data.sort_unstable();
345 for pct1000 in self.percentiles.iter() {
346 let index = Self::pct1000_index(data.len(), *pct1000);
347 let point = *data.get(index).unwrap();
348 let pct_str = Self::format_pct1000(*pct1000);
349 let labels = Self::gauge_labels(&label, &pct_str);
350 let metric = self.gauge.with_label_values(&labels);
351 metric.set(point as i64);
352 }
353 let mut sum = 0u64;
354 let count = data.len() as u64;
355 for point in data {
356 sum += point;
357 }
358 let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
359 self.sum.with_label_values(&labels).inc_by(sum);
360 self.count.with_label_values(&labels).inc_by(count);
361 }
362
363 for reset_label in reset_labels {
364 for pct1000 in self.percentiles.iter() {
365 let pct_str = Self::format_pct1000(*pct1000);
366 let labels = Self::gauge_labels(&reset_label, &pct_str);
367 let metric = self.gauge.with_label_values(&labels);
368 metric.set(0);
369 }
370 }
371 }
372
373 fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
378 let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
379 labels.collect()
380 }
381
382 fn pct1000_index(len: usize, pct1000: usize) -> usize {
384 len * pct1000 / 1000
385 }
386
387 fn format_pct1000(pct1000: usize) -> String {
391 format!("{}", (pct1000 as f64) / 10.)
392 }
393}
394
395impl Drop for HistogramTimerGuard<'_> {
396 fn drop(&mut self) {
399 self.histogram
400 .report(self.start.elapsed().as_millis() as u64);
401 }
402}
403
#[cfg(test)]
mod tests {
    use prometheus_filtered::proto::MetricFamily;

    use super::*;

    /// Index selection for percentiles given in thousandths.
    #[test]
    fn pct_index_test() {
        assert_eq!(200, HistogramReporter::pct1000_index(1000, 200));
        assert_eq!(100, HistogramReporter::pct1000_index(500, 200));
        assert_eq!(1800, HistogramReporter::pct1000_index(2000, 900));
        assert_eq!(21, HistogramReporter::pct1000_index(22, 999));
        // A single data point is selected for every percentile.
        assert_eq!(0, HistogramReporter::pct1000_index(1, 999));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 100));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 1));
    }

    /// Percentiles are rendered as percentages without a trailing ".0".
    #[test]
    fn format_pct1000_test() {
        assert_eq!(HistogramReporter::format_pct1000(999), "99.9");
        assert_eq!(HistogramReporter::format_pct1000(990), "99");
        assert_eq!(HistogramReporter::format_pct1000(900), "90");
    }

    /// End-to-end check: report points for two label sets, wait for one
    /// collection window (1 second under `cfg(test)`) and inspect the
    /// gathered metrics.
    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        a.report(1);
        a.report(2);
        a.report(3);
        a.report(4);
        b.report(10);
        b.report(20);
        b.report(30);
        b.report(40);
        // Longer than one window so the collector has published the data.
        tokio::time::sleep(Duration::from_millis(1500)).await;
        let gather = registry.gather();
        let gather: HashMap<_, _> = gather
            .into_iter()
            .map(|f| (f.name().to_string(), f))
            .collect();
        let hist = gather.get("test").unwrap();
        let sum = gather.get("test_sum").unwrap();
        let count = gather.get("test_count").unwrap();
        let hist = aggregate_gauge_by_label(hist);
        let sum = aggregate_counter_by_label(sum);
        let count = aggregate_counter_by_label(count);
        assert_eq!(Some(3.), hist.get("::a::50").cloned());
        assert_eq!(Some(4.), hist.get("::a::90").cloned());
        assert_eq!(Some(30.), hist.get("::b::50").cloned());
        assert_eq!(Some(40.), hist.get("::b::90").cloned());

        assert_eq!(Some(10.), sum.get("::a").cloned());
        assert_eq!(Some(100.), sum.get("::b").cloned());

        assert_eq!(Some(4.), count.get("::a").cloned());
        assert_eq!(Some(4.), count.get("::b").cloned());
    }

    /// Maps every gauge series of `family` to its value, keyed by the label
    /// values joined as `::v1::v2...`.
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let value = m.get_gauge().value();
                let mut key = String::new();
                for label in m.get_label() {
                    key.push_str("::");
                    key.push_str(label.value());
                }
                (key, value)
            })
            .collect()
    }

    /// Maps every counter series of `family` to its value, keyed by the label
    /// values joined as `::v1::v2...`.
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let value = m.get_counter().value();
                let mut key = String::new();
                for label in m.get_label() {
                    key.push_str("::");
                    key.push_str(label.value());
                }
                (key, value)
            })
            .collect()
    }
}