1use itertools::Itertools;
6use prometheus::proto::{Counter, Gauge, Histogram, Metric, MetricFamily, MetricType};
7use tracing::{debug, error};
8
9use crate::{remote_write, var};
10
11#[derive(Debug)]
12pub struct Mimir<S> {
13 state: S,
14}
15
16impl From<&Metric> for Mimir<Vec<remote_write::Label>> {
17 fn from(m: &Metric) -> Self {
18 let mut m = m.to_owned();
20 let mut sorted = m.take_label();
21 sorted.sort_by(|a, b| {
22 (a.name(), a.value())
23 .partial_cmp(&(b.name(), b.value()))
24 .unwrap()
25 });
26 let mut r = Vec::<remote_write::Label>::default();
27 for label in sorted {
28 let lp = remote_write::Label {
29 name: label.name().into(),
30 value: label.value().into(),
31 };
32 r.push(lp);
33 }
34 Self { state: r }
35 }
36}
37
38impl IntoIterator for Mimir<Vec<remote_write::Label>> {
39 type Item = remote_write::Label;
40 type IntoIter = std::vec::IntoIter<Self::Item>;
41
42 fn into_iter(self) -> Self::IntoIter {
43 self.state.into_iter()
44 }
45}
46
47impl From<&Counter> for Mimir<remote_write::Sample> {
48 fn from(c: &Counter) -> Self {
49 Self {
50 state: remote_write::Sample {
51 value: c.value(),
52 ..Default::default()
53 },
54 }
55 }
56}
57impl From<&Gauge> for Mimir<remote_write::Sample> {
58 fn from(c: &Gauge) -> Self {
59 Self {
60 state: remote_write::Sample {
61 value: c.value(),
62 ..Default::default()
63 },
64 }
65 }
66}
67impl Mimir<remote_write::Sample> {
68 fn sample(self) -> remote_write::Sample {
69 self.state
70 }
71}
72
73impl From<&Histogram> for Mimir<remote_write::Histogram> {
75 fn from(_h: &Histogram) -> Self {
76 Self {
77 state: remote_write::Histogram::default(),
78 }
79 }
80}
81impl Mimir<remote_write::Histogram> {
83 #[allow(dead_code)]
84 fn histogram(self) -> remote_write::Histogram {
85 self.state
86 }
87}
88impl From<Vec<MetricFamily>> for Mimir<Vec<remote_write::WriteRequest>> {
89 fn from(metric_families: Vec<MetricFamily>) -> Self {
90 let mut timeseries: Vec<remote_write::TimeSeries> =
92 Vec::with_capacity(metric_families.len());
93
94 for mf in metric_families {
95 let mt = match mf.get_field_type() {
97 MetricType::COUNTER => remote_write::metric_metadata::MetricType::Counter,
98 MetricType::GAUGE => remote_write::metric_metadata::MetricType::Gauge,
99 MetricType::HISTOGRAM => remote_write::metric_metadata::MetricType::Histogram,
100 MetricType::SUMMARY => remote_write::metric_metadata::MetricType::Summary,
101 MetricType::UNTYPED => remote_write::metric_metadata::MetricType::Unknown,
102 };
103
104 match mt {
106 remote_write::metric_metadata::MetricType::Counter
107 | remote_write::metric_metadata::MetricType::Gauge => (),
108 other => {
109 debug!("{:?} is not yet implemented, skipping metric", other);
110 continue;
111 }
112 }
113
114 timeseries.extend(Mimir::from(mf.clone()).state);
116 }
117
118 Self {
119 state: timeseries
120 .into_iter()
121 .chunks(var!("MIMIR_MAX_SAMPLE_SIZE", 500))
124 .into_iter()
125 .map(|ts| remote_write::WriteRequest {
126 timeseries: ts.collect(),
127 ..Default::default()
128 })
129 .collect_vec(),
130 }
131 }
132}
133
134impl IntoIterator for Mimir<Vec<remote_write::WriteRequest>> {
135 type Item = remote_write::WriteRequest;
136 type IntoIter = std::vec::IntoIter<Self::Item>;
137
138 fn into_iter(self) -> Self::IntoIter {
139 self.state.into_iter()
140 }
141}
142
143impl Mimir<Vec<remote_write::TimeSeries>> {
144 pub fn repeated(self) -> Vec<remote_write::TimeSeries> {
145 self.state
146 }
147}
148
149impl From<MetricFamily> for Mimir<Vec<remote_write::TimeSeries>> {
150 fn from(mf: MetricFamily) -> Self {
151 let mut timeseries = vec![];
152 for metric in mf.get_metric() {
153 let mut ts = remote_write::TimeSeries::default();
154 ts.labels.extend(vec![
155 remote_write::Label {
158 name: "__name__".into(),
159 value: mf.name().into(),
160 },
161 ]);
162 ts.labels
163 .extend(Mimir::<Vec<remote_write::Label>>::from(metric));
164
165 if metric.counter.is_some() {
171 let mut s =
172 Mimir::<remote_write::Sample>::from(metric.get_counter().get_or_default())
173 .sample();
174 s.timestamp = metric.timestamp_ms();
175 ts.samples.push(s);
176 } else if metric.gauge.is_some() {
177 let mut s =
178 Mimir::<remote_write::Sample>::from(metric.get_gauge().get_or_default())
179 .sample();
180 s.timestamp = metric.timestamp_ms();
181 ts.samples.push(s);
182 } else if metric.histogram.is_some() {
183 } else if metric.summary.is_some() {
188 error!("summary is not implemented for a metric type");
190 }
191 timeseries.push(ts);
192 }
193 Self { state: timeseries }
194 }
195}
196
197impl Mimir<remote_write::TimeSeries> {
198 pub fn timeseries(self) -> remote_write::TimeSeries {
199 self.state
200 }
201}
202
203#[cfg(test)]
204pub mod tests {
205 use prometheus::proto;
206
207 use crate::{prom_to_mimir::Mimir, remote_write};
208
209 pub fn create_metric_family(
211 name: &str,
212 help: &str,
213 field_type: Option<proto::MetricType>,
214 metric: Vec<proto::Metric>,
215 ) -> proto::MetricFamily {
216 let mut mf = proto::MetricFamily::default();
218 mf.set_name(name.into());
219 mf.set_help(help.into());
220 if let Some(ft) = field_type {
223 mf.set_field_type(ft);
224 }
225 mf.set_metric(metric);
226 mf
227 }
228 #[allow(dead_code)]
229 fn create_metric_gauge(labels: Vec<proto::LabelPair>, gauge: proto::Gauge) -> proto::Metric {
230 let mut m = proto::Metric::default();
231 m.set_label(labels);
232 m.set_gauge(gauge);
233 m.set_timestamp_ms(12345);
234 m
235 }
236
237 pub fn create_metric_counter(
238 labels: Vec<proto::LabelPair>,
239 counter: proto::Counter,
240 ) -> proto::Metric {
241 let mut m = proto::Metric::default();
242 m.set_label(labels);
243 m.set_counter(counter);
244 m.set_timestamp_ms(12345);
245 m
246 }
247
248 pub fn create_metric_histogram(
249 labels: Vec<proto::LabelPair>,
250 histogram: proto::Histogram,
251 ) -> proto::Metric {
252 let mut m = proto::Metric::default();
253 m.set_label(labels);
254 m.set_histogram(histogram);
255 m.set_timestamp_ms(12345);
256 m
257 }
258
259 pub fn create_histogram() -> proto::Histogram {
260 let mut h = proto::Histogram::default();
261 h.set_sample_count(1);
262 h.set_sample_sum(1.0);
263 let mut b = proto::Bucket::default();
264 b.set_cumulative_count(1);
265 b.set_upper_bound(1.0);
266 h.bucket.push(b);
267 h
268 }
269
270 pub fn create_labels(labels: Vec<(&str, &str)>) -> Vec<proto::LabelPair> {
271 labels
272 .into_iter()
273 .map(|(key, value)| {
274 let mut lp = proto::LabelPair::default();
275 lp.set_name(key.into());
276 lp.set_value(value.into());
277 lp
278 })
279 .collect()
280 }
281 #[allow(dead_code)]
282 fn create_gauge(value: f64) -> proto::Gauge {
283 let mut g = proto::Gauge::default();
284 g.set_value(value);
285 g
286 }
287
288 pub fn create_counter(value: f64) -> proto::Counter {
289 let mut c = proto::Counter::default();
290 c.set_value(value);
291 c
292 }
293
294 fn create_timeseries_with_samples(
298 labels: Vec<remote_write::Label>,
299 samples: Vec<remote_write::Sample>,
300 ) -> remote_write::TimeSeries {
301 remote_write::TimeSeries {
302 labels,
303 samples,
304 ..Default::default()
305 }
306 }
307 #[test]
310 fn metricfamily_to_timeseries() {
311 let tests: Vec<(proto::MetricFamily, Vec<remote_write::TimeSeries>)> = vec![
312 (
313 create_metric_family(
314 "test_gauge",
315 "i'm a help message",
316 Some(proto::MetricType::GAUGE),
317 vec![create_metric_gauge(
318 create_labels(vec![
319 ("host", "local-test-validator"),
320 ("network", "unittest-network"),
321 ]),
322 create_gauge(2046.0),
323 )],
324 ),
325 vec![create_timeseries_with_samples(
326 vec![
327 remote_write::Label {
328 name: "__name__".into(),
329 value: "test_gauge".into(),
330 },
331 remote_write::Label {
332 name: "host".into(),
333 value: "local-test-validator".into(),
334 },
335 remote_write::Label {
336 name: "network".into(),
337 value: "unittest-network".into(),
338 },
339 ],
340 vec![remote_write::Sample {
341 value: 2046.0,
342 timestamp: 12345,
343 }],
344 )],
345 ),
346 (
347 create_metric_family(
348 "test_counter",
349 "i'm a help message",
350 Some(proto::MetricType::GAUGE),
351 vec![create_metric_counter(
352 create_labels(vec![
353 ("host", "local-test-validator"),
354 ("network", "unittest-network"),
355 ]),
356 create_counter(2046.0),
357 )],
358 ),
359 vec![create_timeseries_with_samples(
360 vec![
361 remote_write::Label {
362 name: "__name__".into(),
363 value: "test_counter".into(),
364 },
365 remote_write::Label {
366 name: "host".into(),
367 value: "local-test-validator".into(),
368 },
369 remote_write::Label {
370 name: "network".into(),
371 value: "unittest-network".into(),
372 },
373 ],
374 vec![remote_write::Sample {
375 value: 2046.0,
376 timestamp: 12345,
377 }],
378 )],
379 ),
380 ];
381 for (mf, expected_ts) in tests {
382 for (actual, expected) in Mimir::from(mf).state.into_iter().zip(expected_ts) {
384 assert_eq!(actual.labels, expected.labels);
385 for (actual_sample, expected_sample) in
386 actual.samples.into_iter().zip(expected.samples)
387 {
388 assert_eq!(
389 actual_sample.value, expected_sample.value,
390 "sample values do not match"
391 );
392
393 assert_eq!(
396 actual_sample.timestamp, expected_sample.timestamp,
397 "timestamp should be non-zero"
398 );
399 }
400 }
401 }
402 }
403}