iota_proxy/
histogram_relay.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::VecDeque,
7    net::TcpListener,
8    sync::{Arc, Mutex},
9    time::{SystemTime, UNIX_EPOCH},
10};
11
12use anyhow::{Result, bail};
13use axum::{Router, extract::Extension, http::StatusCode, routing::get};
14use once_cell::sync::Lazy;
15use prometheus::{
16    CounterVec, HistogramVec,
17    proto::{Metric, MetricFamily},
18    register_counter_vec, register_histogram_vec,
19};
20use tower::ServiceBuilder;
21use tower_http::{
22    LatencyUnit,
23    trace::{DefaultOnResponse, TraceLayer},
24};
25use tracing::{Level, info};
26
27use crate::var;
28
29const METRICS_ROUTE: &str = "/metrics";
30
31static RELAY_PRESSURE: Lazy<CounterVec> = Lazy::new(|| {
32    register_counter_vec!(
33        "relay_pressure",
34        "HistogramRelay's number of metric families submitted, exported, overflowed to/from the queue.",
35        &["histogram_relay"]
36    )
37    .unwrap()
38});
39static RELAY_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
40    register_histogram_vec!(
41        "relay_duration_seconds",
42        "HistogramRelay's submit/export fn latencies in seconds.",
43        &["histogram_relay"],
44        vec![
45            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
46            1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0, 10.0, 12.5, 15.0
47        ],
48    )
49    .unwrap()
50});
51
52// Creates a new http server that has as a sole purpose to expose
53// and endpoint that prometheus agent can use to poll for the metrics.
54// A RegistryService is returned that can be used to get access in prometheus
55// Registries.
56pub fn start_prometheus_server(listener: TcpListener) -> HistogramRelay {
57    let relay = HistogramRelay::new();
58    let app = Router::new()
59        .route(METRICS_ROUTE, get(metrics))
60        .layer(Extension(relay.clone()))
61        .layer(
62            ServiceBuilder::new().layer(
63                TraceLayer::new_for_http().on_response(
64                    DefaultOnResponse::new()
65                        .level(Level::INFO)
66                        .latency_unit(LatencyUnit::Seconds),
67                ),
68            ),
69        );
70
71    tokio::spawn(async move {
72        listener.set_nonblocking(true).unwrap();
73        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
74        axum::serve(listener, app).await.unwrap();
75    });
76    relay
77}
78
79async fn metrics(Extension(relay): Extension<HistogramRelay>) -> (StatusCode, String) {
80    let Ok(expformat) = relay.export() else {
81        return (
82            StatusCode::INTERNAL_SERVER_ERROR,
83            "unable to pop metrics from HistogramRelay".into(),
84        );
85    };
86    (StatusCode::OK, expformat)
87}
88
89struct Wrapper(i64, Vec<MetricFamily>);
90
91#[derive(Clone)]
92pub struct HistogramRelay(Arc<Mutex<VecDeque<Wrapper>>>);
93
94impl Default for HistogramRelay {
95    fn default() -> Self {
96        HistogramRelay(Arc::new(Mutex::new(VecDeque::new())))
97    }
98}
99impl HistogramRelay {
100    pub fn new() -> Self {
101        Self::default()
102    }
103    /// submit will take metric family submissions and store them for scraping
104    /// in doing so, it will also wrap each entry in a timestamp which will be
105    /// use for pruning old entries on each submission call. this may not be
106    /// ideal long term.
107    pub fn submit(&self, data: Vec<MetricFamily>) {
108        RELAY_PRESSURE.with_label_values(&["submit"]).inc();
109        let timer = RELAY_DURATION.with_label_values(&["submit"]).start_timer();
110        //  represents a collection timestamp
111        let timestamp_secs = SystemTime::now()
112            .duration_since(UNIX_EPOCH)
113            .unwrap()
114            .as_secs() as i64;
115        let mut queue = self
116            .0
117            .lock()
118            .expect("couldn't get mut lock on HistogramRelay");
119        queue.retain(|v| {
120            // 5 mins is the max time in the queue allowed
121            if (timestamp_secs - v.0) < var!("MAX_QUEUE_TIME_SECS", 300) {
122                return true;
123            }
124            RELAY_PRESSURE.with_label_values(&["overflow"]).inc();
125            false
126        }); // drain anything 5 mins or older
127
128        // filter out our histograms from normal metrics
129        let data: Vec<MetricFamily> = extract_histograms(data).collect();
130        RELAY_PRESSURE
131            .with_label_values(&["submitted"])
132            .inc_by(data.len() as f64);
133        queue.push_back(Wrapper(timestamp_secs, data));
134        timer.observe_duration();
135    }
136    pub fn export(&self) -> Result<String> {
137        RELAY_PRESSURE.with_label_values(&["export"]).inc();
138        let timer = RELAY_DURATION.with_label_values(&["export"]).start_timer();
139        // totally drain all metrics whenever we get a scrape request from the metrics
140        // handler
141        let mut queue = self
142            .0
143            .lock()
144            .expect("couldn't get mut lock on HistogramRelay");
145
146        let data: Vec<Wrapper> = queue.drain(..).collect();
147        let mut histograms = vec![];
148        for mf in data {
149            histograms.extend(mf.1);
150        }
151        info!(
152            "histogram queue drained {} items; remaining count {}",
153            histograms.len(),
154            queue.len()
155        );
156
157        let encoder = prometheus::TextEncoder::new();
158        let string = match encoder.encode_to_string(&histograms) {
159            Ok(s) => s,
160            Err(error) => bail!("{error}"),
161        };
162        RELAY_PRESSURE
163            .with_label_values(&["exported"])
164            .inc_by(histograms.len() as f64);
165        timer.observe_duration();
166        Ok(string)
167    }
168}
169
170fn extract_histograms(data: Vec<MetricFamily>) -> impl Iterator<Item = MetricFamily> {
171    data.into_iter().filter_map(|mf| {
172        let metrics = mf.get_metric().iter().filter_map(|m| {
173            if !m.histogram.is_some() {
174                return None;
175            }
176            let mut v = Metric::default();
177            v.set_label(m.label.clone());
178            v.set_histogram(m.get_histogram().get_or_default().to_owned());
179            v.set_timestamp_ms(m.timestamp_ms());
180            Some(v)
181        });
182
183        let only_histograms = metrics.collect::<Vec<_>>();
184        if only_histograms.is_empty() {
185            return None;
186        }
187
188        let mut v = MetricFamily::default();
189        v.set_name(mf.name().to_owned());
190        v.set_help(mf.help().to_owned());
191        v.set_field_type(mf.get_field_type());
192        v.set_metric(only_histograms);
193        Some(v)
194    })
195}
196
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        histogram_relay::extract_histograms,
        prom_to_mimir::tests::{
            create_counter, create_histogram, create_labels, create_metric_counter,
            create_metric_family, create_metric_histogram,
        },
    };

    /// `extract_histograms` must drop non-histogram families and pass
    /// histogram families through unchanged.
    #[test]
    fn filter_histograms() {
        struct Test {
            data: Vec<proto::MetricFamily>,
            expected: Vec<proto::MetricFamily>,
        }

        let labels = || {
            create_labels(vec![
                ("host", "local-test-validator"),
                ("network", "unittest-network"),
            ])
        };
        let counter_family = || {
            create_metric_family(
                "test_counter",
                "i'm a help message",
                Some(proto::MetricType::COUNTER),
                vec![create_metric_counter(labels(), create_counter(2046.0))],
            )
        };
        let histogram_family = || {
            create_metric_family(
                "test_histogram",
                "i'm a help message",
                Some(proto::MetricType::HISTOGRAM),
                vec![create_metric_histogram(labels(), create_histogram())],
            )
        };

        let tests = vec![
            // Empty input yields nothing.
            Test {
                data: vec![],
                expected: vec![],
            },
            // A counter-only family is filtered out entirely.
            Test {
                data: vec![counter_family()],
                expected: vec![],
            },
            // A histogram family passes through unchanged.
            Test {
                data: vec![histogram_family()],
                expected: vec![histogram_family()],
            },
            // Mixed input keeps only the histogram family.
            Test {
                data: vec![counter_family(), histogram_family()],
                expected: vec![histogram_family()],
            },
        ];

        for test in tests {
            let extracted: Vec<proto::MetricFamily> = extract_histograms(test.data).collect();
            assert_eq!(extracted, test.expected);
        }
    }
}