1use std::time::Duration;
6
7use iota_common::metrics::{MetricsPushClient, push_metrics};
8use iota_metrics::RegistryService;
9use iota_network::tonic::Code;
10use iota_network_stack::metrics::MetricsCallbackProvider;
11use prometheus::{
12 HistogramVec, IntCounterVec, IntGaugeVec, Registry, register_histogram_vec_with_registry,
13 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
14};
15
16pub fn start_metrics_push_task(config: &iota_config::NodeConfig, registry: RegistryService) {
19 use fastcrypto::traits::KeyPair;
20 use iota_config::node::MetricsConfig;
21
22 const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60);
23
24 let (interval, url) = match &config.metrics {
25 Some(MetricsConfig {
26 push_interval_seconds,
27 push_url: Some(url),
28 }) => {
29 let interval = push_interval_seconds
30 .map(Duration::from_secs)
31 .unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL);
32 let url = reqwest::Url::parse(url).expect("unable to parse metrics push url");
33 (interval, url)
34 }
35 _ => return,
36 };
37
38 let config_copy = config.clone();
41 let mut client = MetricsPushClient::new(config_copy.network_key_pair().copy());
42
43 tokio::spawn(async move {
44 tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service");
45
46 let mut interval = tokio::time::interval(interval);
47 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
48
49 let mut errors = 0;
50 loop {
51 interval.tick().await;
52
53 if let Err(error) = push_metrics(&client, &url, ®istry).await {
54 errors += 1;
55 if errors >= 10 {
56 tracing::error!("unable to push metrics: {error}; new client will be created");
58 } else {
59 tracing::warn!("unable to push metrics: {error}; new client will be created");
60 }
61 client = MetricsPushClient::new(config_copy.network_key_pair().copy());
63 } else {
64 errors = 0;
65 }
66 }
67 });
68}
69
/// Prometheus counters for JWK fetching, created and registered by
/// `IotaNodeMetrics::new`. Every counter carries a single `provider` label.
pub struct IotaNodeMetrics {
    /// Total number of JWK requests (`jwk_requests`).
    pub jwk_requests: IntCounterVec,
    /// Total number of JWK request errors (`jwk_request_errors`).
    pub jwk_request_errors: IntCounterVec,

    /// Total number of JWKs (`total_jwks`).
    pub total_jwks: IntCounterVec,
    /// Total number of invalid JWKs (`invalid_jwks`).
    pub invalid_jwks: IntCounterVec,
    /// Total number of unique JWKs (`unique_jwks`).
    pub unique_jwks: IntCounterVec,
}
78
79impl IotaNodeMetrics {
80 pub fn new(registry: &Registry) -> Self {
81 Self {
82 jwk_requests: register_int_counter_vec_with_registry!(
83 "jwk_requests",
84 "Total number of JWK requests",
85 &["provider"],
86 registry,
87 )
88 .unwrap(),
89 jwk_request_errors: register_int_counter_vec_with_registry!(
90 "jwk_request_errors",
91 "Total number of JWK request errors",
92 &["provider"],
93 registry,
94 )
95 .unwrap(),
96 total_jwks: register_int_counter_vec_with_registry!(
97 "total_jwks",
98 "Total number of JWKs",
99 &["provider"],
100 registry,
101 )
102 .unwrap(),
103 invalid_jwks: register_int_counter_vec_with_registry!(
104 "invalid_jwks",
105 "Total number of invalid JWKs",
106 &["provider"],
107 registry,
108 )
109 .unwrap(),
110 unique_jwks: register_int_counter_vec_with_registry!(
111 "unique_jwks",
112 "Total number of unique JWKs",
113 &["provider"],
114 registry,
115 )
116 .unwrap(),
117 }
118 }
119}
120
/// Per-route gRPC request metrics, updated through its
/// `MetricsCallbackProvider` implementation.
#[derive(Clone)]
pub struct GrpcMetrics {
    /// Requests currently in flight, labelled by `path`.
    inflight_grpc: IntGaugeVec,
    /// Responses, labelled by `path` and the Debug-formatted gRPC status code.
    grpc_requests: IntCounterVec,
    /// Request latency in seconds, labelled by `path`.
    grpc_request_latency: HistogramVec,
}
127
/// Bucket upper bounds, in seconds, for the `grpc_request_latency` histogram.
/// They range from 1 ms to 90 s in strictly increasing order.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
131
132impl GrpcMetrics {
133 pub fn new(registry: &Registry) -> Self {
134 Self {
135 inflight_grpc: register_int_gauge_vec_with_registry!(
136 "inflight_grpc",
137 "Total in-flight GRPC requests per route",
138 &["path"],
139 registry,
140 )
141 .unwrap(),
142 grpc_requests: register_int_counter_vec_with_registry!(
143 "grpc_requests",
144 "Total GRPC requests per route",
145 &["path", "status"],
146 registry,
147 )
148 .unwrap(),
149 grpc_request_latency: register_histogram_vec_with_registry!(
150 "grpc_request_latency",
151 "Latency of GRPC requests per route",
152 &["path"],
153 LATENCY_SEC_BUCKETS.to_vec(),
154 registry,
155 )
156 .unwrap(),
157 }
158 }
159}
160
161impl MetricsCallbackProvider for GrpcMetrics {
162 fn on_request(&self, _path: String) {}
163
164 fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
165 self.grpc_requests
166 .with_label_values(&[path.as_str(), format!("{grpc_status_code:?}").as_str()])
167 .inc();
168 self.grpc_request_latency
169 .with_label_values(&[path.as_str()])
170 .observe(latency.as_secs_f64());
171 }
172
173 fn on_start(&self, path: &str) {
174 self.inflight_grpc.with_label_values(&[path]).inc();
175 }
176
177 fn on_drop(&self, path: &str) {
178 self.inflight_grpc.with_label_values(&[path]).dec();
179 }
180}
181
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};

    use iota_metrics::start_prometheus_server;
    use prometheus::{IntCounter, Registry};

    /// Asks the OS for a currently unused local TCP port.
    ///
    /// The probe listener is dropped before returning, so another process could
    /// in principle grab the port in between. That is still far less likely to
    /// collide than a hard-coded port shared with other tests or local services.
    fn free_local_port() -> u16 {
        TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
            .and_then(|listener| listener.local_addr())
            .expect("unable to reserve a local port")
            .port()
    }

    /// Registries added to the `RegistryService` are exported by the metrics
    /// endpoint, and removed registries stop being exported.
    #[tokio::test]
    pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        let port = free_local_port();
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

        let registry_service = start_prometheus_server(socket);

        // Let the spawned server task run so it can bind before we query it.
        tokio::task::yield_now().await;

        let registry_1 = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        let result = get_metrics(port).await;

        assert!(result.contains(
            "# HELP iota_counter_2 a sample counter 2\n\
             # TYPE iota_counter_2 counter\n\
             iota_counter_2 0"
        ));

        assert!(result.contains(
            "# HELP consensus_counter_1 a sample counter 1\n\
             # TYPE consensus_counter_1 counter\n\
             consensus_counter_1 0"
        ));

        assert!(registry_service.remove(registry_1_id));

        counter_2.inc();

        let result = get_metrics(port).await;

        // The removed registry must no longer be exported...
        assert!(!result.contains(
            "# HELP consensus_counter_1 a sample counter 1\n\
             # TYPE consensus_counter_1 counter\n\
             consensus_counter_1 0"
        ));

        // ...while the remaining one reflects its updated value.
        assert!(result.contains(
            "# HELP iota_counter_2 a sample counter 2\n\
             # TYPE iota_counter_2 counter\n\
             iota_counter_2 1"
        ));
    }

    /// Fetches the Prometheus text exposition from the local `/metrics` endpoint.
    async fn get_metrics(port: u16) -> String {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("http://127.0.0.1:{port}/metrics"))
            .send()
            .await
            .unwrap();
        response.text().await.unwrap()
    }
}