Skip to main content

iota_node/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, time::Duration};
6
7use iota_grpc_server::metrics::{LATENCY_SEC_BUCKETS, SPAM_LABEL, grpc_code_to_str};
8use iota_network::{api::VALIDATOR_METHOD_PATHS, tonic::Code};
9use iota_network_stack::metrics::MetricsCallbackProvider;
10use prometheus_filtered::{
11    HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
12    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
13    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
14};
15
16pub struct IotaNodeMetrics {
17    pub current_protocol_version: IntGauge,
18    pub binary_max_protocol_version: IntGauge,
19    pub configured_max_protocol_version: IntGauge,
20}
21
22impl IotaNodeMetrics {
23    pub fn new(registry: &Registry) -> Self {
24        Self {
25            current_protocol_version: register_int_gauge_with_registry!(
26                "iota_current_protocol_version",
27                "Current protocol version in this epoch",
28                registry,
29            )
30            .unwrap(),
31            binary_max_protocol_version: register_int_gauge_with_registry!(
32                "iota_binary_max_protocol_version",
33                "Max protocol version supported by this binary",
34                registry,
35            )
36            .unwrap(),
37            configured_max_protocol_version: register_int_gauge_with_registry!(
38                "iota_configured_max_protocol_version",
39                "Max protocol version configured in the node config",
40                registry,
41            )
42            .unwrap(),
43        }
44    }
45}
46
47#[derive(Clone)]
48pub struct GrpcMetrics {
49    inflight_requests: IntGaugeVec,
50    num_requests: IntCounterVec,
51    /// Counts gRPC requests that failed at the transport/middleware level
52    /// (e.g. service panic, connection drop, timeout). gRPC application
53    /// errors are NOT counted here — they are already in `num_requests`.
54    num_errors: IntCounterVec,
55    request_latency: HistogramVec,
56    /// Known gRPC method paths. Paths not in this set are labelled as `"SPAM"`
57    /// to prevent unbounded metric cardinality from arbitrary HTTP traffic.
58    known_methods: HashSet<&'static str>,
59}
60
61impl GrpcMetrics {
62    pub fn new(registry: &Registry) -> Self {
63        Self {
64            inflight_requests: register_int_gauge_vec_with_registry!(
65                "authority_grpc_inflight_requests",
66                "Total in-flight authority gRPC requests per method",
67                &["method"],
68                registry,
69            )
70            .unwrap(),
71            num_requests: register_int_counter_vec_with_registry!(
72                "authority_grpc_requests",
73                "Total authority gRPC requests per method and status code",
74                &["method", "status"],
75                registry,
76            )
77            .unwrap(),
78            num_errors: register_int_counter_vec_with_registry!(
79                "authority_grpc_errors",
80                "Total authority gRPC transport/middleware failures by status code (service panics, connection drops, timeouts)",
81                &["status"],
82                registry,
83            )
84            .unwrap(),
85            request_latency: register_histogram_vec_with_registry!(
86                "authority_grpc_request_latency",
87                "Latency of authority gRPC requests per method in seconds",
88                &["method"],
89                LATENCY_SEC_BUCKETS.to_vec(),
90                registry,
91            )
92            .unwrap(),
93            known_methods: VALIDATOR_METHOD_PATHS.iter().copied().collect(),
94        }
95    }
96
97    /// Returns the path if it is a known gRPC method, or `"SPAM"` otherwise.
98    fn sanitize_path<'a>(&self, path: &'a str) -> &'a str {
99        if self.known_methods.contains(path) {
100            path
101        } else {
102            SPAM_LABEL
103        }
104    }
105}
106
107impl MetricsCallbackProvider for GrpcMetrics {
108    fn on_request(&self, _path: String) {}
109
110    fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
111        let method = self.sanitize_path(&path);
112        self.num_requests
113            .with_label_values(&[method, grpc_code_to_str(grpc_status_code)])
114            .inc();
115        self.request_latency
116            .with_label_values(&[method])
117            .observe(latency.as_secs_f64());
118    }
119
120    fn on_error(&self, _latency: Duration, grpc_status_code: Code) {
121        self.num_errors
122            .with_label_values(&[grpc_code_to_str(grpc_status_code)])
123            .inc();
124    }
125
126    fn on_start(&self, path: &str) {
127        let method = self.sanitize_path(path);
128        self.inflight_requests.with_label_values(&[method]).inc();
129    }
130
131    fn on_drop(&self, path: &str) {
132        let method = self.sanitize_path(path);
133        self.inflight_requests.with_label_values(&[method]).dec();
134    }
135}
136
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};

    use iota_metrics::start_prometheus_server;
    use prometheus_filtered::{IntCounter, Registry};

    /// Checks that the `/metrics` endpoint:
    /// - exposes metrics from every registry added to the service,
    /// - stops exposing a registry once it is removed, and
    /// - reflects counter updates in the registries that remain.
    #[tokio::test]
    async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        // OS-assigned port instead of a hard-coded one, so the test does not
        // collide with other tests or local services bound to a fixed port.
        let port = available_port();
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

        let registry_service = start_prometheus_server(socket);

        // NOTE(review): presumably lets a server task spawned by
        // `start_prometheus_server` bind before the first request — confirm.
        tokio::task::yield_now().await;

        // Add two registries, each with its own prefix and one counter.
        let registry_1 = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        // Keep a handle to `counter_2` so it can be incremented later.
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        let result = get_metrics(port).await;

        assert!(result.contains(&counter_exposition(
            "iota_counter_2",
            "a sample counter 2",
            0
        )));
        assert!(result.contains(&counter_exposition(
            "consensus_counter_1",
            "a sample counter 1",
            0
        )));

        // Remove registry 1, then bump the counter that lives in registry 2.
        assert!(registry_service.remove(registry_1_id));
        counter_2.inc();

        let result = get_metrics(port).await;

        // Nothing from registry 1 may remain. Checking the bare metric name
        // (not just the exact `... 0` sample) also catches a stale series with
        // any value.
        assert!(
            !result.contains("consensus_counter_1"),
            "metrics from a removed registry are still exposed:\n{result}"
        );

        // Registry 2's counter is still exposed and reflects the increment.
        assert!(result.contains(&counter_exposition(
            "iota_counter_2",
            "a sample counter 2",
            1
        )));
    }

    /// Returns a localhost port that was free a moment ago. It binds port 0
    /// and releases it, so a small race window remains before the server binds.
    fn available_port() -> u16 {
        TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
            .expect("binding an ephemeral localhost port should succeed")
            .local_addr()
            .expect("a bound listener has a local address")
            .port()
    }

    /// Renders the Prometheus text-exposition lines (HELP, TYPE, sample) that
    /// the endpoint emits for a single counter.
    fn counter_exposition(name: &str, help: &str, value: u64) -> String {
        format!("# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}")
    }

    /// Fetches the body served at `http://127.0.0.1:{port}/metrics`.
    async fn get_metrics(port: u16) -> String {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("http://127.0.0.1:{port}/metrics"))
            .send()
            .await
            .unwrap();
        response.text().await.unwrap()
    }
}