// iota_proxy/consumer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::io::Read;
6
7use anyhow::Result;
8use axum::{body::Bytes, http::StatusCode};
9use bytes::buf::Reader;
10use fastcrypto::ed25519::Ed25519PublicKey;
11use multiaddr::Multiaddr;
12use once_cell::sync::Lazy;
13use prometheus::{
14    Counter, CounterVec, HistogramVec,
15    proto::{self, MetricFamily},
16    register_counter, register_counter_vec, register_histogram_vec,
17};
18use prost::Message;
19use protobuf::CodedInputStream;
20use tracing::{debug, error};
21
22use crate::{admin::ReqwestClient, prom_to_mimir::Mimir, remote_write::WriteRequest};
23
/// Running count of metric-family types successfully pushed upstream.
/// This counts family types per push, not discrete timeseries.
static CONSUMER_OPS_SUBMITTED: Lazy<Counter> = Lazy::new(|| {
    register_counter!(
        "consumer_operations_submitted",
        "Operations counter for the number of metric family types we submit, excluding histograms, and not the discrete timeseries counts.",
    )
    .unwrap()
});
/// Per-operation success/failure counters for consumer internals,
/// labeled by operation name and outcome status.
static CONSUMER_OPS: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        "consumer_operations",
        "Operations counters and status from operations performed in the consumer.",
        &["operation", "status"]
    )
    .unwrap()
});
/// Histogram of the time spent protobuf-encoding and snappy-compressing a
/// remote_write payload. Buckets are sub-millisecond (10ns..~82µs) since
/// this is a pure in-memory CPU operation.
static CONSUMER_ENCODE_COMPRESS_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "protobuf_compression_seconds",
        "The time it takes to compress a remote_write payload in seconds.",
        &["operation"],
        vec![
            1e-08, 2e-08, 4e-08, 8e-08, 1.6e-07, 3.2e-07, 6.4e-07, 1.28e-06, 2.56e-06, 5.12e-06,
            1.024e-05, 2.048e-05, 4.096e-05, 8.192e-05
        ],
    )
    .unwrap()
});
/// Histogram of end-to-end durations for the coarser consumer operations
/// (decode, label population, conversion, full remote_write push). Buckets
/// span sub-millisecond up to 30s because the push path includes network I/O.
static CONSUMER_OPERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "consumer_operations_duration_seconds",
        "The time it takes to perform various consumer operations in seconds.",
        &["operation"],
        vec![
            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
            1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5, 4.75,
            5.0, 5.25, 5.5, 5.75, 6.0, 6.25, 6.5, 6.75, 7.0, 7.25, 7.5, 7.75, 8.0, 8.25, 8.5, 8.75,
            9.0, 9.25, 9.5, 9.75, 10.0, 10.25, 10.5, 10.75, 11.0, 11.25, 11.5, 11.75, 12.0, 12.25,
            12.5, 12.75, 13.0, 13.25, 13.5, 13.75, 14.0, 14.25, 14.5, 14.75, 15.0, 15.25, 15.5,
            15.75, 16.0, 16.25, 16.5, 16.75, 17.0, 17.25, 17.5, 17.75, 18.0, 18.25, 18.5, 18.75,
            19.0, 19.25, 19.5, 19.75, 20.0, 20.25, 20.5, 20.75, 21.0, 21.25, 21.5, 21.75, 22.0,
            22.25, 22.5, 22.75, 23.0, 23.25, 23.5, 23.75, 24.0, 24.25, 24.5, 24.75, 25.0, 26.0,
            27.0, 28.0, 29.0, 30.0
        ],
    )
    .unwrap()
});
70
/// NodeMetric holds metadata and a metric payload from the calling node
#[derive(Debug)]
pub struct NodeMetric {
    /// the sockaddr source address from the incoming request
    pub peer_addr: Multiaddr,
    /// the public key from the iota blockchain
    pub public_key: Ed25519PublicKey,
    /// decoded protobuf of prometheus data
    pub data: Vec<proto::MetricFamily>,
}
78
/// The ProtobufDecoder will decode message delimited protobuf messages from
/// prom_model.proto types They are delimited by size, eg a format is such:
/// []byte{size, data, size, data, size, data}, etc etc
pub struct ProtobufDecoder {
    // buffered reader over the raw request body bytes
    buf: Reader<Bytes>,
}
85
86impl ProtobufDecoder {
87    pub fn new(buf: Reader<Bytes>) -> Self {
88        Self { buf }
89    }
90    /// parse a delimited buffer of protobufs. this is used to consume data sent
91    /// from a iota-node
92    pub fn parse<T: protobuf::Message>(&mut self) -> Result<Vec<T>> {
93        let timer = CONSUMER_OPERATION_DURATION
94            .with_label_values(&["decode_len_delim_protobuf"])
95            .start_timer();
96        let mut result: Vec<T> = vec![];
97        while !self.buf.get_ref().is_empty() {
98            let len = {
99                let mut is = CodedInputStream::from_buf_read(&mut self.buf);
100                is.read_raw_varint32()
101            }?;
102            let mut buf = vec![0; len as usize];
103            self.buf.read_exact(&mut buf)?;
104            result.push(T::parse_from_bytes(&buf)?);
105        }
106        timer.observe_duration();
107        Ok(result)
108    }
109}
110
111// populate labels in place for our given metric family data
112pub fn populate_labels(
113    name: String,    // host field for grafana agent (from chain data)
114    network: String, // network name from ansible (via config)
115    data: Vec<proto::MetricFamily>,
116) -> Vec<proto::MetricFamily> {
117    let timer = CONSUMER_OPERATION_DURATION
118        .with_label_values(&["populate_labels"])
119        .start_timer();
120    debug!("received metrics from {name}");
121    // proto::LabelPair doesn't have pub fields so we can't use
122    // struct literals to construct
123    let mut network_label = proto::LabelPair::default();
124    network_label.set_name("network".into());
125    network_label.set_value(network);
126
127    let mut host_label = proto::LabelPair::default();
128    host_label.set_name("host".into());
129    host_label.set_value(name);
130
131    let labels = vec![network_label, host_label];
132
133    let mut data = data;
134    // add our extra labels to our incoming metric data
135    for mf in data.iter_mut() {
136        for m in mf.mut_metric() {
137            m.label.extend(labels.clone());
138        }
139    }
140    timer.observe_duration();
141    data
142}
143
144fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'static str)> {
145    let observe = || {
146        let timer = CONSUMER_ENCODE_COMPRESS_DURATION
147            .with_label_values(&["encode_compress"])
148            .start_timer();
149        || {
150            timer.observe_duration();
151        }
152    }();
153    let mut buf = Vec::with_capacity(request.encoded_len());
154    if request.encode(&mut buf).is_err() {
155        observe();
156        CONSUMER_OPS
157            .with_label_values(&["encode_compress", "failed"])
158            .inc();
159        error!("unable to encode prompb to mimirpb");
160        return Err((
161            StatusCode::INTERNAL_SERVER_ERROR,
162            "unable to encode prompb to remote_write pb",
163        ));
164    };
165
166    let mut s = snap::raw::Encoder::new();
167    let compressed = match s.compress_vec(&buf) {
168        Ok(compressed) => compressed,
169        Err(error) => {
170            observe();
171            CONSUMER_OPS
172                .with_label_values(&["encode_compress", "failed"])
173                .inc();
174            error!("unable to compress to snappy block format; {error}");
175            return Err((
176                StatusCode::INTERNAL_SERVER_ERROR,
177                "unable to compress to snappy block format",
178            ));
179        }
180    };
181    observe();
182    CONSUMER_OPS
183        .with_label_values(&["encode_compress", "success"])
184        .inc();
185    Ok(compressed)
186}
187
188async fn check_response(
189    request: WriteRequest,
190    response: reqwest::Response,
191) -> Result<(), (StatusCode, &'static str)> {
192    match response.status() {
193        reqwest::StatusCode::OK => {
194            CONSUMER_OPS
195                .with_label_values(&["check_response", "OK"])
196                .inc();
197            debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, request);
198            Ok(())
199        }
200        reqwest::StatusCode::BAD_REQUEST => {
201            let body = response
202                .text()
203                .await
204                .unwrap_or_else(|_| "response body cannot be decoded".into());
205
206            // see mimir docs on this error condition. it's not actionable from the proxy
207            // so we drop it.
208            if body.contains("err-mimir-sample-out-of-order") {
209                CONSUMER_OPS
210                    .with_label_values(&["check_response", "BAD_REQUEST"])
211                    .inc();
212                error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
213                return Err((
214                    StatusCode::INTERNAL_SERVER_ERROR,
215                    "IGNORING METRICS due to err-mimir-sample-out-of-order",
216                ));
217            }
218            CONSUMER_OPS
219                .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
220                .inc();
221            error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
222            Err((
223                StatusCode::INTERNAL_SERVER_ERROR,
224                "unknown bad request error encountered in remote_push",
225            ))
226        }
227        code => {
228            let body = response
229                .text()
230                .await
231                .unwrap_or_else(|_| "response body cannot be decoded".into());
232            CONSUMER_OPS
233                .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
234                .inc();
235            error!("({}) ERROR: {:?}", code, body);
236            Err((
237                StatusCode::INTERNAL_SERVER_ERROR,
238                "unknown error encountered in remote_push",
239            ))
240        }
241    }
242}
243
244async fn convert(
245    mfs: Vec<MetricFamily>,
246) -> Result<impl Iterator<Item = WriteRequest>, (StatusCode, &'static str)> {
247    let result = tokio::task::spawn_blocking(|| {
248        let timer = CONSUMER_OPERATION_DURATION
249            .with_label_values(&["convert_to_remote_write_task"])
250            .start_timer();
251        let result = Mimir::from(mfs);
252        timer.observe_duration();
253        result.into_iter()
254    })
255    .await;
256
257    let result = match result {
258        Ok(v) => v,
259        Err(err) => {
260            error!("unable to convert to remote_write; {err}");
261            return Err((
262                StatusCode::INTERNAL_SERVER_ERROR,
263                "DROPPING METRICS; unable to convert to remote_write",
264            ));
265        }
266    };
267    Ok(result)
268}
269
270/// convert_to_remote_write is an expensive method due to the time it takes to
271/// submit to mimir. other operations here are optimized for async, within
272/// reason.  The post process uses a single connection to mimir and thus incurs
273/// the seriliaztion delay for each metric family sent. Possible
274/// future optimizations would be to use multiple tcp connections to mimir,
275/// within reason. Nevertheless we await on each post of each metric family so
276/// it shouldn't block any other async work in a significant way.
277pub async fn convert_to_remote_write(
278    rc: ReqwestClient,
279    node_metric: NodeMetric,
280) -> (StatusCode, &'static str) {
281    let timer = CONSUMER_OPERATION_DURATION
282        .with_label_values(&["convert_to_remote_write"])
283        .start_timer();
284
285    let remote_write_protos = match convert(node_metric.data).await {
286        Ok(v) => v,
287        Err(err) => {
288            timer.stop_and_discard();
289            return err;
290        }
291    };
292
293    // a counter so we don't iterate the node data 2x
294    let mut mf_cnt = 0;
295    for request in remote_write_protos {
296        mf_cnt += 1;
297        let compressed = match encode_compress(&request) {
298            Ok(compressed) => compressed,
299            Err(error) => return error,
300        };
301
302        let response = match rc
303            .client
304            .post(rc.settings.url.to_owned())
305            .header(reqwest::header::CONTENT_ENCODING, "snappy")
306            .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
307            .header("X-Prometheus-Remote-Write-Version", "0.1.0")
308            .basic_auth(
309                rc.settings.username.to_owned(),
310                Some(rc.settings.password.to_owned()),
311            )
312            .body(compressed)
313            .send()
314            .await
315        {
316            Ok(response) => response,
317            Err(error) => {
318                CONSUMER_OPS
319                    .with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
320                    .inc();
321                error!("DROPPING METRICS due to post error: {error}");
322                timer.stop_and_discard();
323                return (
324                    StatusCode::INTERNAL_SERVER_ERROR,
325                    "DROPPING METRICS due to post error",
326                );
327            }
328        };
329
330        match check_response(request, response).await {
331            Ok(_) => (),
332            Err(err) => {
333                timer.stop_and_discard();
334                return err;
335            }
336        }
337    }
338    CONSUMER_OPS_SUBMITTED.inc_by(mf_cnt as f64);
339    timer.observe_duration();
340    (StatusCode::CREATED, "created")
341}
342
#[cfg(test)]
mod tests {
    use prometheus::proto;

    use crate::{
        consumer::populate_labels,
        prom_to_mimir::tests::{
            create_histogram, create_labels, create_metric_family, create_metric_histogram,
        },
    };

    /// populate_labels should append the network and host label pairs (in
    /// that order) to every metric of every incoming metric family.
    #[test]
    fn test_populate_labels() {
        // a single histogram family with no pre-existing labels
        let mf = create_metric_family(
            "test_histogram",
            "i'm a help message",
            Some(proto::MetricType::HISTOGRAM),
            vec![create_metric_histogram(
                create_labels(vec![]),
                create_histogram(),
            )],
        );

        let labeled_mf = populate_labels("validator-0".into(), "unittest-network".into(), vec![mf]);
        let metric = &labeled_mf[0].get_metric()[0];
        assert_eq!(
            metric.get_label(),
            &create_labels(vec![
                ("network", "unittest-network"),
                ("host", "validator-0"),
            ])
        );
    }
}