1use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use axum::http::header;
8use iota_metrics::RegistryService;
9use iota_network::tonic::Code;
10use iota_network_stack::metrics::MetricsCallbackProvider;
11use prometheus::{
12 Encoder, HistogramVec, IntCounterVec, IntGaugeVec, PROTOBUF_FORMAT, Registry,
13 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
14 register_int_gauge_vec_with_registry,
15};
16use tracing::error;
17
/// Per-request timeout applied to every metrics push HTTP request.
const METRICS_PUSH_TIMEOUT: Duration = Duration::from_secs(45);
19
20pub struct MetricsPushClient {
21 certificate: std::sync::Arc<iota_tls::SelfSignedCertificate>,
22 client: reqwest::Client,
23}
24
25impl MetricsPushClient {
26 pub fn new(network_key: iota_types::crypto::NetworkKeyPair) -> Self {
27 use fastcrypto::traits::KeyPair;
28 let certificate = std::sync::Arc::new(iota_tls::SelfSignedCertificate::new(
29 network_key.private(),
30 iota_tls::IOTA_VALIDATOR_SERVER_NAME,
31 ));
32 let identity = certificate.reqwest_identity();
33 let client = reqwest::Client::builder()
34 .identity(identity)
35 .build()
36 .unwrap();
37
38 Self {
39 certificate,
40 client,
41 }
42 }
43
44 pub fn certificate(&self) -> &iota_tls::SelfSignedCertificate {
45 &self.certificate
46 }
47
48 pub fn client(&self) -> &reqwest::Client {
49 &self.client
50 }
51}
52
53pub fn start_metrics_push_task(config: &iota_config::NodeConfig, registry: RegistryService) {
56 use fastcrypto::traits::KeyPair;
57 use iota_config::node::MetricsConfig;
58
59 const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60);
60
61 let (interval, url) = match &config.metrics {
62 Some(MetricsConfig {
63 push_interval_seconds,
64 push_url: Some(url),
65 }) => {
66 let interval = push_interval_seconds
67 .map(Duration::from_secs)
68 .unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL);
69 let url = reqwest::Url::parse(url).expect("unable to parse metrics push url");
70 (interval, url)
71 }
72 _ => return,
73 };
74
75 let config_copy = config.clone();
78 let mut client = MetricsPushClient::new(config_copy.network_key_pair().copy());
79
80 async fn push_metrics(
81 client: &MetricsPushClient,
82 url: &reqwest::Url,
83 registry: &RegistryService,
84 ) -> Result<(), anyhow::Error> {
85 let now = SystemTime::now()
88 .duration_since(UNIX_EPOCH)
89 .unwrap()
90 .as_millis() as i64;
91
92 let mut metric_families = registry.gather_all();
93 for mf in metric_families.iter_mut() {
94 for m in mf.mut_metric() {
95 m.set_timestamp_ms(now);
96 }
97 }
98
99 let mut buf: Vec<u8> = vec![];
100 let encoder = prometheus::ProtobufEncoder::new();
101 encoder.encode(&metric_families, &mut buf)?;
102
103 let mut s = snap::raw::Encoder::new();
104 let compressed = s.compress_vec(&buf).map_err(|err| {
105 error!("unable to snappy encode; {err}");
106 err
107 })?;
108
109 let response = client
110 .client()
111 .post(url.to_owned())
112 .header(reqwest::header::CONTENT_ENCODING, "snappy")
113 .header(header::CONTENT_TYPE, PROTOBUF_FORMAT)
114 .body(compressed)
115 .timeout(METRICS_PUSH_TIMEOUT)
116 .send()
117 .await?;
118
119 if !response.status().is_success() {
120 let status = response.status();
121 let body = match response.text().await {
122 Ok(body) => body,
123 Err(error) => format!("couldn't decode response body; {error}"),
124 };
125 return Err(anyhow::anyhow!(
126 "metrics push failed: [{}]:{}",
127 status,
128 body
129 ));
130 }
131
132 tracing::debug!("successfully pushed metrics to {url}");
133
134 Ok(())
135 }
136
137 tokio::spawn(async move {
138 tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service");
139
140 let mut interval = tokio::time::interval(interval);
141 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
142
143 let mut errors = 0;
144 loop {
145 interval.tick().await;
146
147 if let Err(error) = push_metrics(&client, &url, ®istry).await {
148 errors += 1;
149 if errors >= 10 {
150 tracing::error!("unable to push metrics: {error}; new client will be created");
152 } else {
153 tracing::warn!("unable to push metrics: {error}; new client will be created");
154 }
155 client = MetricsPushClient::new(config_copy.network_key_pair().copy());
157 } else {
158 errors = 0;
159 }
160 }
161 });
162}
163
164pub struct IotaNodeMetrics {
165 pub jwk_requests: IntCounterVec,
166 pub jwk_request_errors: IntCounterVec,
167
168 pub total_jwks: IntCounterVec,
169 pub invalid_jwks: IntCounterVec,
170 pub unique_jwks: IntCounterVec,
171}
172
173impl IotaNodeMetrics {
174 pub fn new(registry: &Registry) -> Self {
175 Self {
176 jwk_requests: register_int_counter_vec_with_registry!(
177 "jwk_requests",
178 "Total number of JWK requests",
179 &["provider"],
180 registry,
181 )
182 .unwrap(),
183 jwk_request_errors: register_int_counter_vec_with_registry!(
184 "jwk_request_errors",
185 "Total number of JWK request errors",
186 &["provider"],
187 registry,
188 )
189 .unwrap(),
190 total_jwks: register_int_counter_vec_with_registry!(
191 "total_jwks",
192 "Total number of JWKs",
193 &["provider"],
194 registry,
195 )
196 .unwrap(),
197 invalid_jwks: register_int_counter_vec_with_registry!(
198 "invalid_jwks",
199 "Total number of invalid JWKs",
200 &["provider"],
201 registry,
202 )
203 .unwrap(),
204 unique_jwks: register_int_counter_vec_with_registry!(
205 "unique_jwks",
206 "Total number of unique JWKs",
207 &["provider"],
208 registry,
209 )
210 .unwrap(),
211 }
212 }
213}
214
215#[derive(Clone)]
216pub struct GrpcMetrics {
217 inflight_grpc: IntGaugeVec,
218 grpc_requests: IntCounterVec,
219 grpc_request_latency: HistogramVec,
220}
221
/// Bucket boundaries, in seconds, of the gRPC request latency histogram.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, // below one second
    1., 2.5, 5., 10., 20., 30., 60., 90., // one second and above
];
225
226impl GrpcMetrics {
227 pub fn new(registry: &Registry) -> Self {
228 Self {
229 inflight_grpc: register_int_gauge_vec_with_registry!(
230 "inflight_grpc",
231 "Total in-flight GRPC requests per route",
232 &["path"],
233 registry,
234 )
235 .unwrap(),
236 grpc_requests: register_int_counter_vec_with_registry!(
237 "grpc_requests",
238 "Total GRPC requests per route",
239 &["path", "status"],
240 registry,
241 )
242 .unwrap(),
243 grpc_request_latency: register_histogram_vec_with_registry!(
244 "grpc_request_latency",
245 "Latency of GRPC requests per route",
246 &["path"],
247 LATENCY_SEC_BUCKETS.to_vec(),
248 registry,
249 )
250 .unwrap(),
251 }
252 }
253}
254
255impl MetricsCallbackProvider for GrpcMetrics {
256 fn on_request(&self, _path: String) {}
257
258 fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
259 self.grpc_requests
260 .with_label_values(&[path.as_str(), format!("{grpc_status_code:?}").as_str()])
261 .inc();
262 self.grpc_request_latency
263 .with_label_values(&[path.as_str()])
264 .observe(latency.as_secs_f64());
265 }
266
267 fn on_start(&self, path: &str) {
268 self.inflight_grpc.with_label_values(&[path]).inc();
269 }
270
271 fn on_drop(&self, path: &str) {
272 self.inflight_grpc.with_label_values(&[path]).dec();
273 }
274}
275
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use iota_metrics::start_prometheus_server;
    use prometheus::{IntCounter, Registry};

    /// Exposition text of `consensus_counter_1` while its value is 0.
    const CONSENSUS_COUNTER_1_AT_ZERO: &str = "# HELP consensus_counter_1 a sample counter 1\n# TYPE consensus_counter_1 counter\nconsensus_counter_1 0";
    /// Exposition text of `iota_counter_2` while its value is 0.
    const IOTA_COUNTER_2_AT_ZERO: &str =
        "# HELP iota_counter_2 a sample counter 2\n# TYPE iota_counter_2 counter\niota_counter_2 0";
    /// Exposition text of `iota_counter_2` after a single increment.
    const IOTA_COUNTER_2_AT_ONE: &str =
        "# HELP iota_counter_2 a sample counter 2\n# TYPE iota_counter_2 counter\niota_counter_2 1";

    /// Fetches the body served at `/metrics` on the local `port`.
    async fn get_metrics(port: u16) -> String {
        let request = reqwest::Client::new().get(format!("http://127.0.0.1:{}/metrics", port));
        let response = request.send().await.unwrap();
        response.text().await.unwrap()
    }

    /// Registries added to the service show up on the metrics endpoint with
    /// their prefix, and disappear again once removed.
    #[tokio::test]
    pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        let port: u16 = 8081;
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        let registry_service = start_prometheus_server(socket);

        tokio::task::yield_now().await;

        // First registry: prefixed with "consensus", holding one counter.
        let registry_1 = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        // Second registry: prefixed with "iota"; a handle to its counter is
        // kept so the counter can be incremented later.
        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        // Both registries are served while both are registered.
        let scraped = get_metrics(port).await;
        assert!(scraped.contains(IOTA_COUNTER_2_AT_ZERO));
        assert!(scraped.contains(CONSENSUS_COUNTER_1_AT_ZERO));

        // Drop the first registry and bump the counter of the second.
        assert!(registry_service.remove(registry_1_id));
        counter_2.inc();

        // Only the second registry remains, showing the incremented value.
        let scraped = get_metrics(port).await;
        assert!(!scraped.contains(CONSENSUS_COUNTER_1_AT_ZERO));
        assert!(scraped.contains(IOTA_COUNTER_2_AT_ONE));
    }
}