iota_node/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use iota_common::metrics::{MetricsPushClient, push_metrics};
8use iota_metrics::RegistryService;
9use iota_network::tonic::Code;
10use iota_network_stack::metrics::MetricsCallbackProvider;
11use prometheus::{
12    HistogramVec, IntCounterVec, IntGaugeVec, Registry, register_histogram_vec_with_registry,
13    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
14};
15
16/// Starts a task to periodically push metrics to a configured endpoint if a
17/// metrics push endpoint is configured.
18pub fn start_metrics_push_task(config: &iota_config::NodeConfig, registry: RegistryService) {
19    use fastcrypto::traits::KeyPair;
20    use iota_config::node::MetricsConfig;
21
22    const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60);
23
24    let (interval, url) = match &config.metrics {
25        Some(MetricsConfig {
26            push_interval_seconds,
27            push_url: Some(url),
28        }) => {
29            let interval = push_interval_seconds
30                .map(Duration::from_secs)
31                .unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL);
32            let url = reqwest::Url::parse(url).expect("unable to parse metrics push url");
33            (interval, url)
34        }
35        _ => return,
36    };
37
38    // make a copy so we can make a new client later when we hit errors posting
39    // metrics
40    let config_copy = config.clone();
41    let mut client = MetricsPushClient::new(config_copy.network_key_pair().copy());
42
43    tokio::spawn(async move {
44        tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service");
45
46        let mut interval = tokio::time::interval(interval);
47        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
48
49        let mut errors = 0;
50        loop {
51            interval.tick().await;
52
53            if let Err(error) = push_metrics(&client, &url, &registry).await {
54                errors += 1;
55                if errors >= 10 {
56                    // If we hit 10 failures in a row, start logging errors.
57                    tracing::error!("unable to push metrics: {error}; new client will be created");
58                } else {
59                    tracing::warn!("unable to push metrics: {error}; new client will be created");
60                }
61                // aggressively recreate our client connection if we hit an error
62                client = MetricsPushClient::new(config_copy.network_key_pair().copy());
63            } else {
64                errors = 0;
65            }
66        }
67    });
68}
69
70pub struct IotaNodeMetrics {
71    pub jwk_requests: IntCounterVec,
72    pub jwk_request_errors: IntCounterVec,
73
74    pub total_jwks: IntCounterVec,
75    pub invalid_jwks: IntCounterVec,
76    pub unique_jwks: IntCounterVec,
77}
78
79impl IotaNodeMetrics {
80    pub fn new(registry: &Registry) -> Self {
81        Self {
82            jwk_requests: register_int_counter_vec_with_registry!(
83                "jwk_requests",
84                "Total number of JWK requests",
85                &["provider"],
86                registry,
87            )
88            .unwrap(),
89            jwk_request_errors: register_int_counter_vec_with_registry!(
90                "jwk_request_errors",
91                "Total number of JWK request errors",
92                &["provider"],
93                registry,
94            )
95            .unwrap(),
96            total_jwks: register_int_counter_vec_with_registry!(
97                "total_jwks",
98                "Total number of JWKs",
99                &["provider"],
100                registry,
101            )
102            .unwrap(),
103            invalid_jwks: register_int_counter_vec_with_registry!(
104                "invalid_jwks",
105                "Total number of invalid JWKs",
106                &["provider"],
107                registry,
108            )
109            .unwrap(),
110            unique_jwks: register_int_counter_vec_with_registry!(
111                "unique_jwks",
112                "Total number of unique JWKs",
113                &["provider"],
114                registry,
115            )
116            .unwrap(),
117        }
118    }
119}
120
121#[derive(Clone)]
122pub struct GrpcMetrics {
123    inflight_grpc: IntGaugeVec,
124    grpc_requests: IntCounterVec,
125    grpc_request_latency: HistogramVec,
126}
127
128const LATENCY_SEC_BUCKETS: &[f64] = &[
129    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2.5, 5., 10., 20., 30., 60., 90.,
130];
131
132impl GrpcMetrics {
133    pub fn new(registry: &Registry) -> Self {
134        Self {
135            inflight_grpc: register_int_gauge_vec_with_registry!(
136                "inflight_grpc",
137                "Total in-flight GRPC requests per route",
138                &["path"],
139                registry,
140            )
141            .unwrap(),
142            grpc_requests: register_int_counter_vec_with_registry!(
143                "grpc_requests",
144                "Total GRPC requests per route",
145                &["path", "status"],
146                registry,
147            )
148            .unwrap(),
149            grpc_request_latency: register_histogram_vec_with_registry!(
150                "grpc_request_latency",
151                "Latency of GRPC requests per route",
152                &["path"],
153                LATENCY_SEC_BUCKETS.to_vec(),
154                registry,
155            )
156            .unwrap(),
157        }
158    }
159}
160
161impl MetricsCallbackProvider for GrpcMetrics {
162    fn on_request(&self, _path: String) {}
163
164    fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
165        self.grpc_requests
166            .with_label_values(&[path.as_str(), format!("{grpc_status_code:?}").as_str()])
167            .inc();
168        self.grpc_request_latency
169            .with_label_values(&[path.as_str()])
170            .observe(latency.as_secs_f64());
171    }
172
173    fn on_start(&self, path: &str) {
174        self.inflight_grpc.with_label_values(&[path]).inc();
175    }
176
177    fn on_drop(&self, path: &str) {
178        self.inflight_grpc.with_label_values(&[path]).dec();
179    }
180}
181
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};

    use iota_metrics::start_prometheus_server;
    use prometheus::{IntCounter, Registry};

    /// Returns a localhost TCP port that the OS reported as free.
    ///
    /// Binding to port 0 makes the OS pick an unused port. The listener is
    /// dropped before returning so the metrics server can bind that port. This
    /// avoids a hard-coded port, which makes the test fail whenever that port
    /// is already taken.
    fn unused_local_port() -> u16 {
        TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
            .and_then(|listener| listener.local_addr())
            .expect("failed to obtain a free local port")
            .port()
    }

    /// Builds the Prometheus text-exposition lines for a counter.
    fn counter_exposition(name: &str, help: &str, value: u64) -> String {
        format!("# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}")
    }

    #[tokio::test]
    pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        let port = unused_local_port();
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

        let registry_service = start_prometheus_server(socket);

        // Give the server task a chance to start listening.
        tokio::task::yield_now().await;

        // Now add a few registries to the service, each with a metric.
        let registry_1 = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        let counter_1_initial = counter_exposition("consensus_counter_1", "a sample counter 1", 0);

        // Request the endpoint: both registries' metrics must be present.
        let result = get_metrics(port).await;
        assert!(result.contains(&counter_exposition(
            "iota_counter_2",
            "a sample counter 2",
            0
        )));
        assert!(result.contains(&counter_1_initial));

        // Now remove registry 1 and increase metric 2.
        assert!(registry_service.remove(registry_1_id));
        counter_2.inc();

        // Pull the metrics again.
        let result = get_metrics(port).await;

        // Registry 1 metrics should not be present anymore.
        assert!(!result.contains(&counter_1_initial));

        // Registry 2 metric should have increased by 1.
        assert!(result.contains(&counter_exposition(
            "iota_counter_2",
            "a sample counter 2",
            1
        )));
    }

    /// Fetches the `/metrics` page served on localhost at `port`.
    async fn get_metrics(port: u16) -> String {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("http://127.0.0.1:{port}/metrics"))
            .send()
            .await
            .unwrap();
        response.text().await.unwrap()
    }
}
259}