1use std::{
6 collections::VecDeque,
7 net::TcpListener,
8 sync::{Arc, Mutex},
9 time::{SystemTime, UNIX_EPOCH},
10};
11
12use anyhow::{Result, bail};
13use axum::{Router, extract::Extension, http::StatusCode, routing::get};
14use once_cell::sync::Lazy;
15use prometheus::{
16 CounterVec, HistogramVec,
17 proto::{Metric, MetricFamily},
18 register_counter_vec, register_histogram_vec,
19};
20use tower::ServiceBuilder;
21use tower_http::{
22 LatencyUnit,
23 trace::{DefaultOnResponse, TraceLayer},
24};
25use tracing::{Level, info};
26
27use crate::var;
28
/// HTTP route on which `start_prometheus_server` serves the relay's queued histograms.
const METRICS_ROUTE: &str = "/metrics";
30
/// Counter of relay activity, partitioned by the `histogram_relay` label.
///
/// Label values used in this module:
/// - `submit` / `export`: number of calls to `HistogramRelay::submit` / `HistogramRelay::export`;
/// - `submitted` / `exported`: number of metric families queued / encoded;
/// - `overflow`: incremented when queued batches are evicted for exceeding `MAX_QUEUE_TIME_SECS`.
///
/// Registered with the prometheus default registry on first access; the `.unwrap()` panics if
/// registration fails (e.g. a metric with the same name is already registered).
static RELAY_PRESSURE: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        "relay_pressure",
        "HistogramRelay's number of metric families submitted, exported, overflowed to/from the queue.",
        &["histogram_relay"]
    )
    .unwrap()
});
/// Wall-clock latency, in seconds, of `HistogramRelay::submit` and `HistogramRelay::export`,
/// partitioned by the `histogram_relay` label (`submit` / `export`).
///
/// Buckets run from 0.8 ms up to 15 s. Registered with the prometheus default registry on
/// first access; the `.unwrap()` panics if registration fails.
static RELAY_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "relay_duration_seconds",
        "HistogramRelay's submit/export fn latencies in seconds.",
        &["histogram_relay"],
        vec![
            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
            1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0, 10.0, 12.5, 15.0
        ],
    )
    .unwrap()
});
51
52pub fn start_prometheus_server(listener: TcpListener) -> HistogramRelay {
57 let relay = HistogramRelay::new();
58 let app = Router::new()
59 .route(METRICS_ROUTE, get(metrics))
60 .layer(Extension(relay.clone()))
61 .layer(
62 ServiceBuilder::new().layer(
63 TraceLayer::new_for_http().on_response(
64 DefaultOnResponse::new()
65 .level(Level::INFO)
66 .latency_unit(LatencyUnit::Seconds),
67 ),
68 ),
69 );
70
71 tokio::spawn(async move {
72 listener.set_nonblocking(true).unwrap();
73 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
74 axum::serve(listener, app).await.unwrap();
75 });
76 relay
77}
78
79async fn metrics(Extension(relay): Extension<HistogramRelay>) -> (StatusCode, String) {
80 let Ok(expformat) = relay.export() else {
81 return (
82 StatusCode::INTERNAL_SERVER_ERROR,
83 "unable to pop metrics from HistogramRelay".into(),
84 );
85 };
86 (StatusCode::OK, expformat)
87}
88
/// One queued `submit` batch.
///
/// `.0` is the submission time in whole seconds since the Unix epoch (used for age-based
/// eviction). `.1` holds the histogram-only metric families produced by `extract_histograms`.
struct Wrapper(i64, Vec<MetricFamily>);
90
/// Cloneable handle to a shared queue of histogram batches waiting to be served.
///
/// All clones share one `Arc<Mutex<VecDeque<_>>>`: `submit` appends batches at the back and
/// `export` drains the whole queue.
#[derive(Clone)]
pub struct HistogramRelay(Arc<Mutex<VecDeque<Wrapper>>>);
93
94impl Default for HistogramRelay {
95 fn default() -> Self {
96 HistogramRelay(Arc::new(Mutex::new(VecDeque::new())))
97 }
98}
99impl HistogramRelay {
100 pub fn new() -> Self {
101 Self::default()
102 }
103 pub fn submit(&self, data: Vec<MetricFamily>) {
108 RELAY_PRESSURE.with_label_values(&["submit"]).inc();
109 let timer = RELAY_DURATION.with_label_values(&["submit"]).start_timer();
110 let timestamp_secs = SystemTime::now()
112 .duration_since(UNIX_EPOCH)
113 .unwrap()
114 .as_secs() as i64;
115 let mut queue = self
116 .0
117 .lock()
118 .expect("couldn't get mut lock on HistogramRelay");
119 queue.retain(|v| {
120 if (timestamp_secs - v.0) < var!("MAX_QUEUE_TIME_SECS", 300) {
122 return true;
123 }
124 RELAY_PRESSURE.with_label_values(&["overflow"]).inc();
125 false
126 }); let data: Vec<MetricFamily> = extract_histograms(data).collect();
130 RELAY_PRESSURE
131 .with_label_values(&["submitted"])
132 .inc_by(data.len() as f64);
133 queue.push_back(Wrapper(timestamp_secs, data));
134 timer.observe_duration();
135 }
136 pub fn export(&self) -> Result<String> {
137 RELAY_PRESSURE.with_label_values(&["export"]).inc();
138 let timer = RELAY_DURATION.with_label_values(&["export"]).start_timer();
139 let mut queue = self
142 .0
143 .lock()
144 .expect("couldn't get mut lock on HistogramRelay");
145
146 let data: Vec<Wrapper> = queue.drain(..).collect();
147 let mut histograms = vec![];
148 for mf in data {
149 histograms.extend(mf.1);
150 }
151 info!(
152 "histogram queue drained {} items; remaining count {}",
153 histograms.len(),
154 queue.len()
155 );
156
157 let encoder = prometheus::TextEncoder::new();
158 let string = match encoder.encode_to_string(&histograms) {
159 Ok(s) => s,
160 Err(error) => bail!("{error}"),
161 };
162 RELAY_PRESSURE
163 .with_label_values(&["exported"])
164 .inc_by(histograms.len() as f64);
165 timer.observe_duration();
166 Ok(string)
167 }
168}
169
170fn extract_histograms(data: Vec<MetricFamily>) -> impl Iterator<Item = MetricFamily> {
171 data.into_iter().filter_map(|mf| {
172 let metrics = mf.get_metric().iter().filter_map(|m| {
173 if !m.histogram.is_some() {
174 return None;
175 }
176 let mut v = Metric::default();
177 v.set_label(m.label.clone());
178 v.set_histogram(m.get_histogram().get_or_default().to_owned());
179 v.set_timestamp_ms(m.timestamp_ms());
180 Some(v)
181 });
182
183 let only_histograms = metrics.collect::<Vec<_>>();
184 if only_histograms.is_empty() {
185 return None;
186 }
187
188 let mut v = MetricFamily::default();
189 v.set_name(mf.name().to_owned());
190 v.set_help(mf.help().to_owned());
191 v.set_field_type(mf.get_field_type());
192 v.set_metric(only_histograms);
193 Some(v)
194 })
195}
196
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        histogram_relay::extract_histograms,
        prom_to_mimir::tests::{
            create_counter, create_histogram, create_labels, create_metric_counter,
            create_metric_family, create_metric_histogram,
        },
    };

    /// `extract_histograms` must:
    /// - drop non-histogram metrics;
    /// - drop families left with no metrics;
    /// - pass histogram metrics through unchanged.
    ///
    /// It is covered both at whole-family granularity and within a mixed family.
    #[test]
    fn filter_histograms() {
        struct Test {
            data: Vec<proto::MetricFamily>,
            expected: Vec<proto::MetricFamily>,
        }

        let tests = vec![
            // Empty input produces no families.
            Test {
                data: vec![],
                expected: vec![],
            },
            // A family holding only a counter is removed entirely.
            Test {
                data: vec![create_metric_family(
                    "test_counter",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    vec![create_metric_counter(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_counter(2046.0),
                    )],
                )],
                expected: vec![],
            },
            // A pure histogram family passes through unchanged.
            Test {
                data: vec![create_metric_family(
                    "test_histogram",
                    "i'm a help message",
                    Some(proto::MetricType::HISTOGRAM),
                    vec![create_metric_histogram(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_histogram(),
                    )],
                )],
                expected: vec![create_metric_family(
                    "test_histogram",
                    "i'm a help message",
                    Some(proto::MetricType::HISTOGRAM),
                    vec![create_metric_histogram(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_histogram(),
                    )],
                )],
            },
            // A family mixing a counter and a histogram keeps only the histogram metric.
            Test {
                data: vec![create_metric_family(
                    "test_mixed",
                    "i'm a help message",
                    Some(proto::MetricType::HISTOGRAM),
                    vec![
                        create_metric_counter(
                            create_labels(vec![
                                ("host", "local-test-validator"),
                                ("network", "unittest-network"),
                            ]),
                            create_counter(2046.0),
                        ),
                        create_metric_histogram(
                            create_labels(vec![
                                ("host", "local-test-validator"),
                                ("network", "unittest-network"),
                            ]),
                            create_histogram(),
                        ),
                    ],
                )],
                expected: vec![create_metric_family(
                    "test_mixed",
                    "i'm a help message",
                    Some(proto::MetricType::HISTOGRAM),
                    vec![create_metric_histogram(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_histogram(),
                    )],
                )],
            },
        ];

        for test in tests {
            let extracted: Vec<proto::MetricFamily> = extract_histograms(test.data).collect();
            assert_eq!(extracted, test.expected);
        }
    }
}