iota_proxy/
prom_to_mimir.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use tracing::{debug, error};
8
9use crate::{remote_write, var};
10
/// Generic wrapper that carries the result of converting Prometheus protobuf
/// data into a `remote_write` type.
///
/// `S` is the converted value (for example `Vec<remote_write::Label>` or
/// `remote_write::Sample`); the conversions are expressed as `From` impls on
/// concrete instantiations of this type.
#[derive(Debug)]
pub struct Mimir<S> {
    /// The converted value wrapped by this instance.
    state: S,
}
15
16impl From<&Metric> for Mimir<Vec<remote_write::Label>> {
17    fn from(m: &Metric) -> Self {
18        // we consume metric labels from an owned version so we can sort them
19        let mut m = m.to_owned();
20        let mut sorted = m.take_label();
21        sorted.sort_by(|a, b| {
22            (a.name(), a.value())
23                .partial_cmp(&(b.name(), b.value()))
24                .unwrap()
25        });
26        let mut r = Vec::<remote_write::Label>::default();
27        for label in sorted {
28            let lp = remote_write::Label {
29                name: label.name().into(),
30                value: label.value().into(),
31            };
32            r.push(lp);
33        }
34        Self { state: r }
35    }
36}
37
38impl IntoIterator for Mimir<Vec<remote_write::Label>> {
39    type Item = remote_write::Label;
40    type IntoIter = std::vec::IntoIter<Self::Item>;
41
42    fn into_iter(self) -> Self::IntoIter {
43        self.state.into_iter()
44    }
45}
46
47impl From<&Counter> for Mimir<remote_write::Sample> {
48    fn from(c: &Counter) -> Self {
49        Self {
50            state: remote_write::Sample {
51                value: c.value(),
52                ..Default::default()
53            },
54        }
55    }
56}
57impl From<&Gauge> for Mimir<remote_write::Sample> {
58    fn from(c: &Gauge) -> Self {
59        Self {
60            state: remote_write::Sample {
61                value: c.value(),
62                ..Default::default()
63            },
64        }
65    }
66}
67impl Mimir<remote_write::Sample> {
68    fn sample(self) -> remote_write::Sample {
69        self.state
70    }
71}
72
73/// TODO implement histogram
74impl From<&Histogram> for Mimir<remote_write::Histogram> {
75    fn from(_h: &Histogram) -> Self {
76        Self {
77            state: remote_write::Histogram::default(),
78        }
79    }
80}
81/// TODO implement histogram
82impl Mimir<remote_write::Histogram> {
83    #[allow(dead_code)]
84    fn histogram(self) -> remote_write::Histogram {
85        self.state
86    }
87}
88impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
89    fn from(metric_families: Vec<MetricFamily>) -> Self {
90        // we may have more but we'll have at least this many timeseries
91        let mut timeseries: Vec<remote_write::TimeSeries> =
92            Vec::with_capacity(metric_families.len());
93
94        for mf in metric_families {
95            // TODO add From impl
96            let mt = match mf.get_field_type() {
97                MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
98                MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
99                MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
100                MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
101                MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
102            };
103
104            // filter out the types we don't support
105            match mt {
106                remote_write::metric_metadata::MetricType::Counter
107                | remote_write::metric_metadata::MetricType::Gauge => (),
108                other => {
109                    debug!("{:?} is not yet implemented, skipping metric", other);
110                    continue;
111                }
112            }
113
114            // TODO stop using state directly
115            timeseries.extend(Mimir::from(mf.clone()).state);
116        }
117
118        Self {
119            state: timeseries
120                .into_iter()
121                // the upstream remote_write should have a max sample size per request set to this
122                // number
123                .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
124                .into_iter()
125                .map(|ts| remote_write::WriteRequest {
126                    timeseries: ts.collect(),
127                    ..Default::default()
128                })
129                .collect_vec(),
130        }
131    }
132}
133
134impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
135    type Item = remote_write::WriteRequest;
136    type IntoIter = std::vec::IntoIter<Self::Item>;
137
138    fn into_iter(self) -> Self::IntoIter {
139        self.state.into_iter()
140    }
141}
142
143impl Mimir<Vec<remote_write::TimeSeries>> {
144    pub fn repeated(self) -> Vec<remote_write::TimeSeries> {
145        self.state
146    }
147}
148
149impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
150    fn from(mf: MetricFamily) -> Self {
151        let mut timeseries = vec![];
152        for metric in mf.get_metric() {
153            let mut ts = remote_write::TimeSeries::default();
154            ts.labels.extend(vec![
155                // mimir requires that we use __name__ as a key that points to a value
156                // of the metric name
157                remote_write::Label {
158                    name: "__name__".into(),
159                    value: mf.name().into(),
160                },
161            ]);
162            ts.labels
163                .extend(Mimir::<Vec<remote_write::Label>>::from(metric));
164
165            // assumption here is that since a MetricFamily will have one MetricType, we'll
166            // only need to look for one of these types.  Setting two different
167            // types on Metric at the same time in a way that is conflicting
168            // with the MetricFamily type will result in undefined mimir
169            // behavior, probably an error.
170            if metric.counter.is_some() {
171                let mut s =
172                    Mimir::<remote_write::Sample>::from(metric.get_counter().get_or_default())
173                        .sample();
174                s.timestamp = metric.timestamp_ms();
175                ts.samples.push(s);
176            } else if metric.gauge.is_some() {
177                let mut s =
178                    Mimir::<remote_write::Sample>::from(metric.get_gauge().get_or_default())
179                        .sample();
180                s.timestamp = metric.timestamp_ms();
181                ts.samples.push(s);
182            } else if metric.histogram.is_some() {
183                // TODO implement
184                // ts.mut_histograms()
185                //     .push(Mimir::<remote_write::Histogram>::from(metric.
186                // get_histogram()).histogram());
187            } else if metric.summary.is_some() {
188                // TODO implement
189                error!("summary is not implemented for a metric type");
190            }
191            timeseries.push(ts);
192        }
193        Self { state: timeseries }
194    }
195}
196
197impl Mimir<remote_write::TimeSeries> {
198    pub fn timeseries(self) -> remote_write::TimeSeries {
199        self.state
200    }
201}
202
#[cfg(test)]
pub mod tests {
    use prometheus::proto;

    use crate::{prom_to_mimir::Mimir, remote_write};

    // protobuf stuff

    /// Builds a `MetricFamily` from its name, help text, optional type and metrics.
    pub fn create_metric_family(
        name: &str,
        help: &str,
        field_type: Option<proto::MetricType>,
        metric: Vec<proto::Metric>,
    ) -> proto::MetricFamily {
        // built through the generated setters rather than a struct literal
        let mut mf = proto::MetricFamily::default();
        mf.set_name(name.into());
        mf.set_help(help.into());
        // TODO remove the metric type serialization if we still don't use it
        // after implementing histogram and summary
        if let Some(ft) = field_type {
            mf.set_field_type(ft);
        }
        mf.set_metric(metric);
        mf
    }

    /// Builds a gauge `Metric` with the given labels and a timestamp of 12345 ms.
    fn create_metric_gauge(labels: Vec<proto::LabelPair>, gauge: proto::Gauge) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_gauge(gauge);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a counter `Metric` with the given labels and a timestamp of 12345 ms.
    pub fn create_metric_counter(
        labels: Vec<proto::LabelPair>,
        counter: proto::Counter,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_counter(counter);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a histogram `Metric` with the given labels and a timestamp of 12345 ms.
    pub fn create_metric_histogram(
        labels: Vec<proto::LabelPair>,
        histogram: proto::Histogram,
    ) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(labels);
        m.set_histogram(histogram);
        m.set_timestamp_ms(12345);
        m
    }

    /// Builds a histogram with sample count 1, sample sum 1.0 and a single
    /// bucket (upper bound 1.0, cumulative count 1).
    pub fn create_histogram() -> proto::Histogram {
        let mut h = proto::Histogram::default();
        h.set_sample_count(1);
        h.set_sample_sum(1.0);
        let mut b = proto::Bucket::default();
        b.set_cumulative_count(1);
        b.set_upper_bound(1.0);
        h.bucket.push(b);
        h
    }

    /// Converts `(name, value)` pairs into protobuf label pairs, keeping their order.
    pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
        labels
            .into_iter()
            .map(|(key, value)| {
                let mut lp = proto::LabelPair::default();
                lp.set_name(key.into());
                lp.set_value(value.into());
                lp
            })
            .collect()
    }

    /// Builds a gauge holding `value`.
    fn create_gauge(value: f64) -> proto::Gauge {
        let mut g = proto::Gauge::default();
        g.set_value(value);
        g
    }

    /// Builds a counter holding `value`.
    pub fn create_counter(value: f64) -> proto::Counter {
        let mut c = proto::Counter::default();
        c.set_value(value);
        c
    }

    // end protobuf stuff

    // mimir stuff

    /// Builds an expected remote_write time series from labels and samples.
    fn create_timeseries_with_samples(
        labels: Vec<remote_write::Label>,
        samples: Vec<remote_write::Sample>,
    ) -> remote_write::TimeSeries {
        remote_write::TimeSeries {
            labels,
            samples,
            ..Default::default()
        }
    }
    // end mimir stuff

    #[test]
    fn metricfamily_to_timeseries() {
        let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
            (
                create_metric_family(
                    "test_gauge",
                    "i'm a help message",
                    Some(proto::MetricType::GAUGE),
                    vec![create_metric_gauge(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_gauge(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        remote_write::Label {
                            name: "__name__".into(),
                            value: "test_gauge".into(),
                        },
                        remote_write::Label {
                            name: "host".into(),
                            value: "local-test-validator".into(),
                        },
                        remote_write::Label {
                            name: "network".into(),
                            value: "unittest-network".into(),
                        },
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
            (
                create_metric_family(
                    "test_counter",
                    "i'm a help message",
                    Some(proto::MetricType::COUNTER),
                    vec![create_metric_counter(
                        create_labels(vec![
                            ("host", "local-test-validator"),
                            ("network", "unittest-network"),
                        ]),
                        create_counter(2046.0),
                    )],
                ),
                vec![create_timeseries_with_samples(
                    vec![
                        remote_write::Label {
                            name: "__name__".into(),
                            value: "test_counter".into(),
                        },
                        remote_write::Label {
                            name: "host".into(),
                            value: "local-test-validator".into(),
                        },
                        remote_write::Label {
                            name: "network".into(),
                            value: "unittest-network".into(),
                        },
                    ],
                    vec![remote_write::Sample {
                        value: 2046.0,
                        timestamp: 12345,
                    }],
                )],
            ),
        ];
        for (mf, expected_ts) in tests {
            let actual_ts = Mimir::<Vec<remote_write::TimeSeries>>::from(mf).repeated();
            // `zip` stops at the shorter side, so check lengths first; otherwise a
            // conversion that produced too few time series would pass vacuously.
            assert_eq!(
                actual_ts.len(),
                expected_ts.len(),
                "timeseries counts do not match"
            );
            for (actual, expected) in actual_ts.into_iter().zip(expected_ts) {
                assert_eq!(actual.labels, expected.labels);
                assert_eq!(
                    actual.samples.len(),
                    expected.samples.len(),
                    "sample counts do not match"
                );
                for (actual_sample, expected_sample) in
                    actual.samples.into_iter().zip(expected.samples)
                {
                    assert_eq!(
                        actual_sample.value, expected_sample.value,
                        "sample values do not match"
                    );

                    // timestamps are injected on the iota-node and we copy it to our sample
                    // make sure that works
                    assert_eq!(
                        actual_sample.timestamp, expected_sample.timestamp,
                        "sample timestamps do not match"
                    );
                }
            }
        }
    }
}