1use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use iota_metrics::RegistryService;
8use prometheus::Encoder;
9use tracing::{debug, error, info};
10
/// Timeout applied to each individual metrics push request.
const METRICS_PUSH_TIMEOUT: Duration = Duration::from_secs(45);
12
13pub struct MetricsPushClient {
14 certificate: std::sync::Arc<iota_tls::SelfSignedCertificate>,
15 client: reqwest::Client,
16}
17
18impl MetricsPushClient {
19 pub fn new(metrics_key: iota_types::crypto::NetworkKeyPair) -> Self {
20 use fastcrypto::traits::KeyPair;
21 let certificate = std::sync::Arc::new(iota_tls::SelfSignedCertificate::new(
22 metrics_key.private(),
23 iota_tls::IOTA_VALIDATOR_SERVER_NAME,
24 ));
25 let identity = certificate.reqwest_identity();
26 let client = reqwest::Client::builder()
27 .identity(identity)
28 .build()
29 .unwrap();
30
31 Self {
32 certificate,
33 client,
34 }
35 }
36
37 pub fn certificate(&self) -> &iota_tls::SelfSignedCertificate {
38 &self.certificate
39 }
40
41 pub fn client(&self) -> &reqwest::Client {
42 &self.client
43 }
44}
45
46pub async fn push_metrics(
47 client: &MetricsPushClient,
48 url: &reqwest::Url,
49 registry: &RegistryService,
50) -> Result<(), anyhow::Error> {
51 info!(push_url =% url, "pushing metrics to remote");
52
53 let now = SystemTime::now()
56 .duration_since(UNIX_EPOCH)
57 .unwrap()
58 .as_millis() as i64;
59
60 let mut metric_families = registry.gather_all();
61 for mf in metric_families.iter_mut() {
62 for m in mf.mut_metric() {
63 m.set_timestamp_ms(now);
64 }
65 }
66
67 let mut buf: Vec<u8> = vec![];
68 let encoder = prometheus::ProtobufEncoder::new();
69 encoder.encode(&metric_families, &mut buf)?;
70
71 let mut s = snap::raw::Encoder::new();
72 let compressed = s.compress_vec(&buf).map_err(|err| {
73 error!("unable to snappy encode; {err}");
74 err
75 })?;
76
77 let response = client
78 .client()
79 .post(url.to_owned())
80 .header(reqwest::header::CONTENT_ENCODING, "snappy")
81 .header(reqwest::header::CONTENT_TYPE, prometheus::PROTOBUF_FORMAT)
82 .body(compressed)
83 .timeout(METRICS_PUSH_TIMEOUT)
84 .send()
85 .await?;
86
87 if !response.status().is_success() {
88 let status = response.status();
89 let body = match response.text().await {
90 Ok(body) => body,
91 Err(error) => format!("couldn't decode response body; {error}"),
92 };
93 return Err(anyhow::anyhow!(
94 "metrics push failed: [{}]:{}",
95 status,
96 body
97 ));
98 }
99
100 debug!("successfully pushed metrics to {url}");
101
102 Ok(())
103}