iota_node/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use axum::http::header;
8use iota_metrics::RegistryService;
9use iota_network::tonic::Code;
10use iota_network_stack::metrics::MetricsCallbackProvider;
11use prometheus::{
12    Encoder, HistogramVec, IntCounterVec, IntGaugeVec, PROTOBUF_FORMAT, Registry,
13    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
14    register_int_gauge_vec_with_registry,
15};
16use tracing::error;
17
/// Upper bound on the duration of a single metrics push request; it is applied
/// per request when the metrics are posted to the push endpoint.
const METRICS_PUSH_TIMEOUT: Duration = Duration::from_secs(45);
19
20pub struct MetricsPushClient {
21    certificate: std::sync::Arc<iota_tls::SelfSignedCertificate>,
22    client: reqwest::Client,
23}
24
25impl MetricsPushClient {
26    pub fn new(network_key: iota_types::crypto::NetworkKeyPair) -> Self {
27        use fastcrypto::traits::KeyPair;
28        let certificate = std::sync::Arc::new(iota_tls::SelfSignedCertificate::new(
29            network_key.private(),
30            iota_tls::IOTA_VALIDATOR_SERVER_NAME,
31        ));
32        let identity = certificate.reqwest_identity();
33        let client = reqwest::Client::builder()
34            .identity(identity)
35            .build()
36            .unwrap();
37
38        Self {
39            certificate,
40            client,
41        }
42    }
43
44    pub fn certificate(&self) -> &iota_tls::SelfSignedCertificate {
45        &self.certificate
46    }
47
48    pub fn client(&self) -> &reqwest::Client {
49        &self.client
50    }
51}
52
53/// Starts a task to periodically push metrics to a configured endpoint if a
54/// metrics push endpoint is configured.
55pub fn start_metrics_push_task(config: &iota_config::NodeConfig, registry: RegistryService) {
56    use fastcrypto::traits::KeyPair;
57    use iota_config::node::MetricsConfig;
58
59    const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60);
60
61    let (interval, url) = match &config.metrics {
62        Some(MetricsConfig {
63            push_interval_seconds,
64            push_url: Some(url),
65        }) => {
66            let interval = push_interval_seconds
67                .map(Duration::from_secs)
68                .unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL);
69            let url = reqwest::Url::parse(url).expect("unable to parse metrics push url");
70            (interval, url)
71        }
72        _ => return,
73    };
74
75    // make a copy so we can make a new client later when we hit errors posting
76    // metrics
77    let config_copy = config.clone();
78    let mut client = MetricsPushClient::new(config_copy.network_key_pair().copy());
79
80    async fn push_metrics(
81        client: &MetricsPushClient,
82        url: &reqwest::Url,
83        registry: &RegistryService,
84    ) -> Result<(), anyhow::Error> {
85        // now represents a collection timestamp for all of the metrics we send to the
86        // proxy
87        let now = SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_millis() as i64;
91
92        let mut metric_families = registry.gather_all();
93        for mf in metric_families.iter_mut() {
94            for m in mf.mut_metric() {
95                m.set_timestamp_ms(now);
96            }
97        }
98
99        let mut buf: Vec<u8> = vec![];
100        let encoder = prometheus::ProtobufEncoder::new();
101        encoder.encode(&metric_families, &mut buf)?;
102
103        let mut s = snap::raw::Encoder::new();
104        let compressed = s.compress_vec(&buf).map_err(|err| {
105            error!("unable to snappy encode; {err}");
106            err
107        })?;
108
109        let response = client
110            .client()
111            .post(url.to_owned())
112            .header(reqwest::header::CONTENT_ENCODING, "snappy")
113            .header(header::CONTENT_TYPE, PROTOBUF_FORMAT)
114            .body(compressed)
115            .timeout(METRICS_PUSH_TIMEOUT)
116            .send()
117            .await?;
118
119        if !response.status().is_success() {
120            let status = response.status();
121            let body = match response.text().await {
122                Ok(body) => body,
123                Err(error) => format!("couldn't decode response body; {error}"),
124            };
125            return Err(anyhow::anyhow!(
126                "metrics push failed: [{}]:{}",
127                status,
128                body
129            ));
130        }
131
132        tracing::debug!("successfully pushed metrics to {url}");
133
134        Ok(())
135    }
136
137    tokio::spawn(async move {
138        tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service");
139
140        let mut interval = tokio::time::interval(interval);
141        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
142
143        let mut errors = 0;
144        loop {
145            interval.tick().await;
146
147            if let Err(error) = push_metrics(&client, &url, &registry).await {
148                errors += 1;
149                if errors >= 10 {
150                    // If we hit 10 failures in a row, start logging errors.
151                    tracing::error!("unable to push metrics: {error}; new client will be created");
152                } else {
153                    tracing::warn!("unable to push metrics: {error}; new client will be created");
154                }
155                // aggressively recreate our client connection if we hit an error
156                client = MetricsPushClient::new(config_copy.network_key_pair().copy());
157            } else {
158                errors = 0;
159            }
160        }
161    });
162}
163
164pub struct IotaNodeMetrics {
165    pub jwk_requests: IntCounterVec,
166    pub jwk_request_errors: IntCounterVec,
167
168    pub total_jwks: IntCounterVec,
169    pub invalid_jwks: IntCounterVec,
170    pub unique_jwks: IntCounterVec,
171}
172
173impl IotaNodeMetrics {
174    pub fn new(registry: &Registry) -> Self {
175        Self {
176            jwk_requests: register_int_counter_vec_with_registry!(
177                "jwk_requests",
178                "Total number of JWK requests",
179                &["provider"],
180                registry,
181            )
182            .unwrap(),
183            jwk_request_errors: register_int_counter_vec_with_registry!(
184                "jwk_request_errors",
185                "Total number of JWK request errors",
186                &["provider"],
187                registry,
188            )
189            .unwrap(),
190            total_jwks: register_int_counter_vec_with_registry!(
191                "total_jwks",
192                "Total number of JWKs",
193                &["provider"],
194                registry,
195            )
196            .unwrap(),
197            invalid_jwks: register_int_counter_vec_with_registry!(
198                "invalid_jwks",
199                "Total number of invalid JWKs",
200                &["provider"],
201                registry,
202            )
203            .unwrap(),
204            unique_jwks: register_int_counter_vec_with_registry!(
205                "unique_jwks",
206                "Total number of unique JWKs",
207                &["provider"],
208                registry,
209            )
210            .unwrap(),
211        }
212    }
213}
214
215#[derive(Clone)]
216pub struct GrpcMetrics {
217    inflight_grpc: IntGaugeVec,
218    grpc_requests: IntCounterVec,
219    grpc_request_latency: HistogramVec,
220}
221
222const LATENCY_SEC_BUCKETS: &[f64] = &[
223    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2.5, 5., 10., 20., 30., 60., 90.,
224];
225
226impl GrpcMetrics {
227    pub fn new(registry: &Registry) -> Self {
228        Self {
229            inflight_grpc: register_int_gauge_vec_with_registry!(
230                "inflight_grpc",
231                "Total in-flight GRPC requests per route",
232                &["path"],
233                registry,
234            )
235            .unwrap(),
236            grpc_requests: register_int_counter_vec_with_registry!(
237                "grpc_requests",
238                "Total GRPC requests per route",
239                &["path", "status"],
240                registry,
241            )
242            .unwrap(),
243            grpc_request_latency: register_histogram_vec_with_registry!(
244                "grpc_request_latency",
245                "Latency of GRPC requests per route",
246                &["path"],
247                LATENCY_SEC_BUCKETS.to_vec(),
248                registry,
249            )
250            .unwrap(),
251        }
252    }
253}
254
255impl MetricsCallbackProvider for GrpcMetrics {
256    fn on_request(&self, _path: String) {}
257
258    fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
259        self.grpc_requests
260            .with_label_values(&[path.as_str(), format!("{grpc_status_code:?}").as_str()])
261            .inc();
262        self.grpc_request_latency
263            .with_label_values(&[path.as_str()])
264            .observe(latency.as_secs_f64());
265    }
266
267    fn on_start(&self, path: &str) {
268        self.inflight_grpc.with_label_values(&[path]).inc();
269    }
270
271    fn on_drop(&self, path: &str) {
272        self.inflight_grpc.with_label_values(&[path]).dec();
273    }
274}
275
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};

    use iota_metrics::start_prometheus_server;
    use prometheus::{IntCounter, Registry};

    /// Exposition text expected for `counter_1` of the `consensus` registry
    /// while its value is 0.
    const CONSENSUS_COUNTER_1_AT_0: &str = "# HELP consensus_counter_1 a sample counter 1\n# TYPE consensus_counter_1 counter\nconsensus_counter_1 0";
    /// Exposition text expected for `counter_2` of the `iota` registry while
    /// its value is 0.
    const IOTA_COUNTER_2_AT_0: &str =
        "# HELP iota_counter_2 a sample counter 2\n# TYPE iota_counter_2 counter\niota_counter_2 0";
    /// Exposition text expected for `counter_2` of the `iota` registry after
    /// one increment.
    const IOTA_COUNTER_2_AT_1: &str =
        "# HELP iota_counter_2 a sample counter 2\n# TYPE iota_counter_2 counter\niota_counter_2 1";

    /// Asks the OS for a currently unused loopback port.
    ///
    /// The probing listener is closed before returning, so there is a small
    /// window in which another process could grab the port; this is still far
    /// more robust than a hard-coded port, which fails whenever that port is
    /// busy or the test runs concurrently with itself.
    fn unused_local_port() -> u16 {
        TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
            .expect("unable to bind an ephemeral loopback port")
            .local_addr()
            .expect("bound listener has no local address")
            .port()
    }

    /// Checks that the metrics endpoint serves the metrics of every registry
    /// added to the registry service and stops serving those of a removed one.
    #[tokio::test]
    pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        let port: u16 = unused_local_port();
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

        let registry_service = start_prometheus_server(socket);

        // give the freshly started server a chance to run
        tokio::task::yield_now().await;

        // now add a few registries to the service along side with metrics
        let registry_1 = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        // request the endpoint
        let result = get_metrics(port).await;

        assert!(result.contains(IOTA_COUNTER_2_AT_0));
        assert!(result.contains(CONSENSUS_COUNTER_1_AT_0));

        // Now remove registry 1
        assert!(registry_service.remove(registry_1_id));

        // AND increase metric 2
        counter_2.inc();

        // Now pull again metrics
        // request the endpoint
        let result = get_metrics(port).await;

        // Registry 1 metrics should not be present anymore
        assert!(!result.contains(CONSENSUS_COUNTER_1_AT_0));

        // Registry 2 metric should have increased by 1
        assert!(result.contains(IOTA_COUNTER_2_AT_1));
    }

    /// Fetches the body of `/metrics` from the server listening on the given
    /// loopback `port`.
    async fn get_metrics(port: u16) -> String {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("http://127.0.0.1:{}/metrics", port))
            .send()
            .await
            .unwrap();
        response.text().await.unwrap()
    }
}