1use std::io::Read;
6
7use anyhow::Result;
8use axum::{body::Bytes, http::StatusCode};
9use bytes::buf::Reader;
10use fastcrypto::ed25519::Ed25519PublicKey;
11use multiaddr::Multiaddr;
12use once_cell::sync::Lazy;
13use prometheus::{
14 Counter, CounterVec, HistogramVec,
15 proto::{self, MetricFamily},
16 register_counter, register_counter_vec, register_histogram_vec,
17};
18use prost::Message;
19use protobuf::CodedInputStream;
20use tracing::{debug, error};
21
22use crate::{admin::ReqwestClient, prom_to_mimir::Mimir, remote_write::WriteRequest};
23
24static CONSUMER_OPS_SUBMITTED: Lazy<Counter> = Lazy::new(|| {
25 register_counter!(
26 "consumer_operations_submitted",
27 "Operations counter for the number of metric family types we submit, excluding histograms, and not the discrete timeseries counts.",
28 )
29 .unwrap()
30});
31static CONSUMER_OPS: Lazy<CounterVec> = Lazy::new(|| {
32 register_counter_vec!(
33 "consumer_operations",
34 "Operations counters and status from operations performed in the consumer.",
35 &["operation", "status"]
36 )
37 .unwrap()
38});
39static CONSUMER_ENCODE_COMPRESS_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
40 register_histogram_vec!(
41 "protobuf_compression_seconds",
42 "The time it takes to compress a remote_write payload in seconds.",
43 &["operation"],
44 vec![
45 1e-08, 2e-08, 4e-08, 8e-08, 1.6e-07, 3.2e-07, 6.4e-07, 1.28e-06, 2.56e-06, 5.12e-06,
46 1.024e-05, 2.048e-05, 4.096e-05, 8.192e-05
47 ],
48 )
49 .unwrap()
50});
51static CONSUMER_OPERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
52 register_histogram_vec!(
53 "consumer_operations_duration_seconds",
54 "The time it takes to perform various consumer operations in seconds.",
55 &["operation"],
56 vec![
57 0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
58 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5, 4.75,
59 5.0, 5.25, 5.5, 5.75, 6.0, 6.25, 6.5, 6.75, 7.0, 7.25, 7.5, 7.75, 8.0, 8.25, 8.5, 8.75,
60 9.0, 9.25, 9.5, 9.75, 10.0, 10.25, 10.5, 10.75, 11.0, 11.25, 11.5, 11.75, 12.0, 12.25,
61 12.5, 12.75, 13.0, 13.25, 13.5, 13.75, 14.0, 14.25, 14.5, 14.75, 15.0, 15.25, 15.5,
62 15.75, 16.0, 16.25, 16.5, 16.75, 17.0, 17.25, 17.5, 17.75, 18.0, 18.25, 18.5, 18.75,
63 19.0, 19.25, 19.5, 19.75, 20.0, 20.25, 20.5, 20.75, 21.0, 21.25, 21.5, 21.75, 22.0,
64 22.25, 22.5, 22.75, 23.0, 23.25, 23.5, 23.75, 24.0, 24.25, 24.5, 24.75, 25.0, 26.0,
65 27.0, 28.0, 29.0, 30.0
66 ],
67 )
68 .unwrap()
69});
70
71#[derive(Debug)]
73pub struct NodeMetric {
74 pub peer_addr: Multiaddr, pub public_key: Ed25519PublicKey, pub data: Vec<proto::MetricFamily>, }
78
79pub struct ProtobufDecoder {
83 buf: Reader<Bytes>,
84}
85
86impl ProtobufDecoder {
87 pub fn new(buf: Reader<Bytes>) -> Self {
88 Self { buf }
89 }
90 pub fn parse<T: protobuf::Message>(&mut self) -> Result<Vec<T>> {
93 let timer = CONSUMER_OPERATION_DURATION
94 .with_label_values(&["decode_len_delim_protobuf"])
95 .start_timer();
96 let mut result: Vec<T> = vec![];
97 while !self.buf.get_ref().is_empty() {
98 let len = {
99 let mut is = CodedInputStream::from_buf_read(&mut self.buf);
100 is.read_raw_varint32()
101 }?;
102 let mut buf = vec![0; len as usize];
103 self.buf.read_exact(&mut buf)?;
104 result.push(T::parse_from_bytes(&buf)?);
105 }
106 timer.observe_duration();
107 Ok(result)
108 }
109}
110
111pub fn populate_labels(
113 name: String, network: String, data: Vec<proto::MetricFamily>,
116) -> Vec<proto::MetricFamily> {
117 let timer = CONSUMER_OPERATION_DURATION
118 .with_label_values(&["populate_labels"])
119 .start_timer();
120 debug!("received metrics from {name}");
121 let mut network_label = proto::LabelPair::default();
124 network_label.set_name("network".into());
125 network_label.set_value(network);
126
127 let mut host_label = proto::LabelPair::default();
128 host_label.set_name("host".into());
129 host_label.set_value(name);
130
131 let labels = vec![network_label, host_label];
132
133 let mut data = data;
134 for mf in data.iter_mut() {
136 for m in mf.mut_metric() {
137 m.label.extend(labels.clone());
138 }
139 }
140 timer.observe_duration();
141 data
142}
143
144fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'static str)> {
145 let observe = || {
146 let timer = CONSUMER_ENCODE_COMPRESS_DURATION
147 .with_label_values(&["encode_compress"])
148 .start_timer();
149 || {
150 timer.observe_duration();
151 }
152 }();
153 let mut buf = Vec::with_capacity(request.encoded_len());
154 if request.encode(&mut buf).is_err() {
155 observe();
156 CONSUMER_OPS
157 .with_label_values(&["encode_compress", "failed"])
158 .inc();
159 error!("unable to encode prompb to mimirpb");
160 return Err((
161 StatusCode::INTERNAL_SERVER_ERROR,
162 "unable to encode prompb to remote_write pb",
163 ));
164 };
165
166 let mut s = snap::raw::Encoder::new();
167 let compressed = match s.compress_vec(&buf) {
168 Ok(compressed) => compressed,
169 Err(error) => {
170 observe();
171 CONSUMER_OPS
172 .with_label_values(&["encode_compress", "failed"])
173 .inc();
174 error!("unable to compress to snappy block format; {error}");
175 return Err((
176 StatusCode::INTERNAL_SERVER_ERROR,
177 "unable to compress to snappy block format",
178 ));
179 }
180 };
181 observe();
182 CONSUMER_OPS
183 .with_label_values(&["encode_compress", "success"])
184 .inc();
185 Ok(compressed)
186}
187
188async fn check_response(
189 request: WriteRequest,
190 response: reqwest::Response,
191) -> Result<(), (StatusCode, &'static str)> {
192 match response.status() {
193 reqwest::StatusCode::OK => {
194 CONSUMER_OPS
195 .with_label_values(&["check_response", "OK"])
196 .inc();
197 debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, request);
198 Ok(())
199 }
200 reqwest::StatusCode::BAD_REQUEST => {
201 let body = response
202 .text()
203 .await
204 .unwrap_or_else(|_| "response body cannot be decoded".into());
205
206 if body.contains("err-mimir-sample-out-of-order") {
209 CONSUMER_OPS
210 .with_label_values(&["check_response", "BAD_REQUEST"])
211 .inc();
212 error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
213 return Err((
214 StatusCode::INTERNAL_SERVER_ERROR,
215 "IGNORING METRICS due to err-mimir-sample-out-of-order",
216 ));
217 }
218 CONSUMER_OPS
219 .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
220 .inc();
221 error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
222 Err((
223 StatusCode::INTERNAL_SERVER_ERROR,
224 "unknown bad request error encountered in remote_push",
225 ))
226 }
227 code => {
228 let body = response
229 .text()
230 .await
231 .unwrap_or_else(|_| "response body cannot be decoded".into());
232 CONSUMER_OPS
233 .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
234 .inc();
235 error!("({}) ERROR: {:?}", code, body);
236 Err((
237 StatusCode::INTERNAL_SERVER_ERROR,
238 "unknown error encountered in remote_push",
239 ))
240 }
241 }
242}
243
244async fn convert(
245 mfs: Vec<MetricFamily>,
246) -> Result<impl Iterator<Item = WriteRequest>, (StatusCode, &'static str)> {
247 let result = tokio::task::spawn_blocking(|| {
248 let timer = CONSUMER_OPERATION_DURATION
249 .with_label_values(&["convert_to_remote_write_task"])
250 .start_timer();
251 let result = Mimir::from(mfs);
252 timer.observe_duration();
253 result.into_iter()
254 })
255 .await;
256
257 let result = match result {
258 Ok(v) => v,
259 Err(err) => {
260 error!("unable to convert to remote_write; {err}");
261 return Err((
262 StatusCode::INTERNAL_SERVER_ERROR,
263 "DROPPING METRICS; unable to convert to remote_write",
264 ));
265 }
266 };
267 Ok(result)
268}
269
270pub async fn convert_to_remote_write(
278 rc: ReqwestClient,
279 node_metric: NodeMetric,
280) -> (StatusCode, &'static str) {
281 let timer = CONSUMER_OPERATION_DURATION
282 .with_label_values(&["convert_to_remote_write"])
283 .start_timer();
284
285 let remote_write_protos = match convert(node_metric.data).await {
286 Ok(v) => v,
287 Err(err) => {
288 timer.stop_and_discard();
289 return err;
290 }
291 };
292
293 let mut mf_cnt = 0;
295 for request in remote_write_protos {
296 mf_cnt += 1;
297 let compressed = match encode_compress(&request) {
298 Ok(compressed) => compressed,
299 Err(error) => return error,
300 };
301
302 let response = match rc
303 .client
304 .post(rc.settings.url.to_owned())
305 .header(reqwest::header::CONTENT_ENCODING, "snappy")
306 .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
307 .header("X-Prometheus-Remote-Write-Version", "0.1.0")
308 .basic_auth(
309 rc.settings.username.to_owned(),
310 Some(rc.settings.password.to_owned()),
311 )
312 .body(compressed)
313 .send()
314 .await
315 {
316 Ok(response) => response,
317 Err(error) => {
318 CONSUMER_OPS
319 .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
320 .inc();
321 error!("DROPPING METRICS due to post error: {error}");
322 timer.stop_and_discard();
323 return (
324 StatusCode::INTERNAL_SERVER_ERROR,
325 "DROPPING METRICS due to post error",
326 );
327 }
328 };
329
330 match check_response(request, response).await {
331 Ok(_) => (),
332 Err(err) => {
333 timer.stop_and_discard();
334 return err;
335 }
336 }
337 }
338 CONSUMER_OPS_SUBMITTED.inc_by(mf_cnt as f64);
339 timer.observe_duration();
340 (StatusCode::CREATED, "created")
341}
342
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        consumer::populate_labels,
        prom_to_mimir::tests::{
            create_histogram, create_labels, create_metric_family, create_metric_histogram,
        },
    };

    /// A metric that starts with no labels ends up with exactly the `network`
    /// and `host` labels, in that order.
    #[test]
    fn test_populate_labels() {
        let unlabeled_metric = create_metric_histogram(create_labels(vec![]), create_histogram());
        let family = create_metric_family(
            "test_histogram",
            "i'm a help message",
            Some(proto::MetricType::HISTOGRAM),
            vec![unlabeled_metric],
        );

        let labeled_families =
            populate_labels("validator-0".into(), "unittest-network".into(), vec![family]);

        let expected_labels = create_labels(vec![
            ("network", "unittest-network"),
            ("host", "validator-0"),
        ]);
        let first_metric = &labeled_families[0].get_metric()[0];
        assert_eq!(first_metric.get_label(), &expected_labels);
    }
}