1use std::{collections::HashSet, time::Duration};
6
7use iota_grpc_server::metrics::{LATENCY_SEC_BUCKETS, SPAM_LABEL, grpc_code_to_str};
8use iota_network::{api::VALIDATOR_METHOD_PATHS, tonic::Code};
9use iota_network_stack::metrics::MetricsCallbackProvider;
10use prometheus_filtered::{
11 HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
12 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
13 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
14};
15
16pub struct IotaNodeMetrics {
17 pub current_protocol_version: IntGauge,
18 pub binary_max_protocol_version: IntGauge,
19 pub configured_max_protocol_version: IntGauge,
20}
21
22impl IotaNodeMetrics {
23 pub fn new(registry: &Registry) -> Self {
24 Self {
25 current_protocol_version: register_int_gauge_with_registry!(
26 "iota_current_protocol_version",
27 "Current protocol version in this epoch",
28 registry,
29 )
30 .unwrap(),
31 binary_max_protocol_version: register_int_gauge_with_registry!(
32 "iota_binary_max_protocol_version",
33 "Max protocol version supported by this binary",
34 registry,
35 )
36 .unwrap(),
37 configured_max_protocol_version: register_int_gauge_with_registry!(
38 "iota_configured_max_protocol_version",
39 "Max protocol version configured in the node config",
40 registry,
41 )
42 .unwrap(),
43 }
44 }
45}
46
47#[derive(Clone)]
48pub struct GrpcMetrics {
49 inflight_requests: IntGaugeVec,
50 num_requests: IntCounterVec,
51 num_errors: IntCounterVec,
55 request_latency: HistogramVec,
56 known_methods: HashSet<&'static str>,
59}
60
61impl GrpcMetrics {
62 pub fn new(registry: &Registry) -> Self {
63 Self {
64 inflight_requests: register_int_gauge_vec_with_registry!(
65 "authority_grpc_inflight_requests",
66 "Total in-flight authority gRPC requests per method",
67 &["method"],
68 registry,
69 )
70 .unwrap(),
71 num_requests: register_int_counter_vec_with_registry!(
72 "authority_grpc_requests",
73 "Total authority gRPC requests per method and status code",
74 &["method", "status"],
75 registry,
76 )
77 .unwrap(),
78 num_errors: register_int_counter_vec_with_registry!(
79 "authority_grpc_errors",
80 "Total authority gRPC transport/middleware failures by status code (service panics, connection drops, timeouts)",
81 &["status"],
82 registry,
83 )
84 .unwrap(),
85 request_latency: register_histogram_vec_with_registry!(
86 "authority_grpc_request_latency",
87 "Latency of authority gRPC requests per method in seconds",
88 &["method"],
89 LATENCY_SEC_BUCKETS.to_vec(),
90 registry,
91 )
92 .unwrap(),
93 known_methods: VALIDATOR_METHOD_PATHS.iter().copied().collect(),
94 }
95 }
96
97 fn sanitize_path<'a>(&self, path: &'a str) -> &'a str {
99 if self.known_methods.contains(path) {
100 path
101 } else {
102 SPAM_LABEL
103 }
104 }
105}
106
107impl MetricsCallbackProvider for GrpcMetrics {
108 fn on_request(&self, _path: String) {}
109
110 fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
111 let method = self.sanitize_path(&path);
112 self.num_requests
113 .with_label_values(&[method, grpc_code_to_str(grpc_status_code)])
114 .inc();
115 self.request_latency
116 .with_label_values(&[method])
117 .observe(latency.as_secs_f64());
118 }
119
120 fn on_error(&self, _latency: Duration, grpc_status_code: Code) {
121 self.num_errors
122 .with_label_values(&[grpc_code_to_str(grpc_status_code)])
123 .inc();
124 }
125
126 fn on_start(&self, path: &str) {
127 let method = self.sanitize_path(path);
128 self.inflight_requests.with_label_values(&[method]).inc();
129 }
130
131 fn on_drop(&self, path: &str) {
132 let method = self.sanitize_path(path);
133 self.inflight_requests.with_label_values(&[method]).dec();
134 }
135}
136
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};

    use iota_metrics::start_prometheus_server;
    use prometheus_filtered::{IntCounter, Registry};

    #[tokio::test]
    pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        // An OS-assigned free port avoids clashing with other local services or
        // concurrently running tests, which a hard-coded port would risk.
        let port = available_local_port();
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

        let registry_service = start_prometheus_server(socket);

        tokio::task::yield_now().await;

        let registry_1 = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        // Both registries are exposed, each with its prefix applied.
        let result = get_metrics(port).await;
        let counter_2_at_zero = counter_exposition("iota_counter_2", "a sample counter 2", 0);
        let counter_1_at_zero =
            counter_exposition("consensus_counter_1", "a sample counter 1", 0);
        assert!(result.contains(&counter_2_at_zero));
        assert!(result.contains(&counter_1_at_zero));

        assert!(registry_service.remove(registry_1_id));

        counter_2.inc();

        // Once registry 1 is removed, none of its metrics may appear, and the
        // remaining counter reflects its increment.
        let result = get_metrics(port).await;
        assert!(!result.contains("consensus_counter_1"));
        let counter_2_at_one = counter_exposition("iota_counter_2", "a sample counter 2", 1);
        assert!(result.contains(&counter_2_at_one));
    }

    /// Asks the OS for a currently unused localhost TCP port.
    ///
    /// The probe listener is dropped before returning, so another process could
    /// in principle claim the port in between. That is still far less
    /// collision-prone than a fixed port number.
    fn available_local_port() -> u16 {
        TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
            .and_then(|listener| listener.local_addr())
            .map(|addr| addr.port())
            .expect("failed to obtain a free local port")
    }

    /// Renders a counter the way the `/metrics` endpoint emits it: the HELP
    /// line, the TYPE line, then the sample line.
    fn counter_exposition(name: &str, help: &str, value: u64) -> String {
        format!("# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}")
    }

    async fn get_metrics(port: u16) -> String {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("http://127.0.0.1:{port}/metrics"))
            .send()
            .await
            .unwrap();
        response.text().await.unwrap()
    }
}