iota_common/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use iota_metrics::RegistryService;
8use prometheus::Encoder;
9use tracing::{debug, error, info};
10
/// Upper bound on how long a single metrics push request may take; applied
/// as the per-request timeout when posting to the remote endpoint.
const METRICS_PUSH_TIMEOUT: Duration = Duration::from_secs(45);
12
13pub struct MetricsPushClient {
14    certificate: std::sync::Arc<iota_tls::SelfSignedCertificate>,
15    client: reqwest::Client,
16}
17
18impl MetricsPushClient {
19    pub fn new(metrics_key: iota_types::crypto::NetworkKeyPair) -> Self {
20        use fastcrypto::traits::KeyPair;
21        let certificate = std::sync::Arc::new(iota_tls::SelfSignedCertificate::new(
22            metrics_key.private(),
23            iota_tls::IOTA_VALIDATOR_SERVER_NAME,
24        ));
25        let identity = certificate.reqwest_identity();
26        let client = reqwest::Client::builder()
27            .identity(identity)
28            .build()
29            .unwrap();
30
31        Self {
32            certificate,
33            client,
34        }
35    }
36
37    pub fn certificate(&self) -> &iota_tls::SelfSignedCertificate {
38        &self.certificate
39    }
40
41    pub fn client(&self) -> &reqwest::Client {
42        &self.client
43    }
44}
45
46pub async fn push_metrics(
47    client: &MetricsPushClient,
48    url: &reqwest::Url,
49    registry: &RegistryService,
50) -> Result<(), anyhow::Error> {
51    info!(push_url =% url, "pushing metrics to remote");
52
53    // now represents a collection timestamp for all of the metrics we send to the
54    // proxy
55    let now = SystemTime::now()
56        .duration_since(UNIX_EPOCH)
57        .unwrap()
58        .as_millis() as i64;
59
60    let mut metric_families = registry.gather_all();
61    for mf in metric_families.iter_mut() {
62        for m in mf.mut_metric() {
63            m.set_timestamp_ms(now);
64        }
65    }
66
67    let mut buf: Vec<u8> = vec![];
68    let encoder = prometheus::ProtobufEncoder::new();
69    encoder.encode(&metric_families, &mut buf)?;
70
71    let mut s = snap::raw::Encoder::new();
72    let compressed = s.compress_vec(&buf).map_err(|err| {
73        error!("unable to snappy encode; {err}");
74        err
75    })?;
76
77    let response = client
78        .client()
79        .post(url.to_owned())
80        .header(reqwest::header::CONTENT_ENCODING, "snappy")
81        .header(reqwest::header::CONTENT_TYPE, prometheus::PROTOBUF_FORMAT)
82        .body(compressed)
83        .timeout(METRICS_PUSH_TIMEOUT)
84        .send()
85        .await?;
86
87    if !response.status().is_success() {
88        let status = response.status();
89        let body = match response.text().await {
90            Ok(body) => body,
91            Err(error) => format!("couldn't decode response body; {error}"),
92        };
93        return Err(anyhow::anyhow!(
94            "metrics push failed: [{}]:{}",
95            status,
96            body
97        ));
98    }
99
100    debug!("successfully pushed metrics to {url}");
101
102    Ok(())
103}