iota_metrics/
metrics_network.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9    HistogramTimer, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
10    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
12};
13use tracing::warn;
14
15#[derive(Clone)]
16pub struct NetworkConnectionMetrics {
17    /// The connection status of known peers. 0 if not connected, 1 if
18    /// connected.
19    pub network_peer_connected: IntGaugeVec,
20    /// The number of connected peers
21    pub network_peers: IntGauge,
22    /// Number of disconnect events per peer.
23    pub network_peer_disconnects: IntCounterVec,
24    /// Receive buffer size of Anemo socket.
25    pub socket_receive_buffer_size: IntGauge,
26    /// Send buffer size of Anemo socket.
27    pub socket_send_buffer_size: IntGauge,
28
29    /// PathStats
30    /// The rtt for a peer connection in ms.
31    pub network_peer_rtt: IntGaugeVec,
32    /// The total number of lost packets for a peer connection.
33    pub network_peer_lost_packets: IntGaugeVec,
34    /// The total number of lost bytes for a peer connection.
35    pub network_peer_lost_bytes: IntGaugeVec,
36    /// The total number of packets sent for a peer connection.
37    pub network_peer_sent_packets: IntGaugeVec,
38    /// The total number of congestion events for a peer connection.
39    pub network_peer_congestion_events: IntGaugeVec,
40    /// The congestion window for a peer connection.
41    pub network_peer_congestion_window: IntGaugeVec,
42
43    /// FrameStats
44    /// The number of max data frames for a peer connection.
45    pub network_peer_max_data: IntGaugeVec,
46    /// The number of closed connections frames for a peer connection.
47    pub network_peer_closed_connections: IntGaugeVec,
48    /// The number of data blocked frames for a peer connection.
49    pub network_peer_data_blocked: IntGaugeVec,
50
51    /// UDPStats
52    /// The total number datagrams observed by the UDP peer connection.
53    pub network_peer_udp_datagrams: IntGaugeVec,
54    /// The total number bytes observed by the UDP peer connection.
55    pub network_peer_udp_bytes: IntGaugeVec,
56    /// The total number transmits observed by the UDP peer connection.
57    pub network_peer_udp_transmits: IntGaugeVec,
58}
59
60impl NetworkConnectionMetrics {
61    pub fn new(node: &'static str, registry: &Registry) -> Self {
62        Self {
63            network_peer_connected: register_int_gauge_vec_with_registry!(
64                format!("{node}_network_peer_connected"),
65                "The connection status of a peer. 0 if not connected, 1 if connected",
66                &["peer_id", "type"],
67                registry
68            )
69            .unwrap(),
70            network_peers: register_int_gauge_with_registry!(
71                format!("{node}_network_peers"),
72                "The number of connected peers.",
73                registry
74            )
75            .unwrap(),
76            network_peer_disconnects: register_int_counter_vec_with_registry!(
77                format!("{node}_network_peer_disconnects"),
78                "Number of disconnect events per peer.",
79                &["peer_id", "reason"],
80                registry
81            )
82            .unwrap(),
83            socket_receive_buffer_size: register_int_gauge_with_registry!(
84                format!("{node}_socket_receive_buffer_size"),
85                "Receive buffer size of Anemo socket.",
86                registry
87            )
88            .unwrap(),
89            socket_send_buffer_size: register_int_gauge_with_registry!(
90                format!("{node}_socket_send_buffer_size"),
91                "Send buffer size of Anemo socket.",
92                registry
93            )
94            .unwrap(),
95
96            // PathStats
97            network_peer_rtt: register_int_gauge_vec_with_registry!(
98                format!("{node}_network_peer_rtt"),
99                "The rtt for a peer connection in ms.",
100                &["peer_id"],
101                registry
102            )
103            .unwrap(),
104            network_peer_lost_packets: register_int_gauge_vec_with_registry!(
105                format!("{node}_network_peer_lost_packets"),
106                "The total number of lost packets for a peer connection.",
107                &["peer_id"],
108                registry
109            )
110            .unwrap(),
111            network_peer_lost_bytes: register_int_gauge_vec_with_registry!(
112                format!("{node}_network_peer_lost_bytes"),
113                "The total number of lost bytes for a peer connection.",
114                &["peer_id"],
115                registry
116            )
117            .unwrap(),
118            network_peer_sent_packets: register_int_gauge_vec_with_registry!(
119                format!("{node}_network_peer_sent_packets"),
120                "The total number of sent packets for a peer connection.",
121                &["peer_id"],
122                registry
123            )
124            .unwrap(),
125            network_peer_congestion_events: register_int_gauge_vec_with_registry!(
126                format!("{node}_network_peer_congestion_events"),
127                "The total number of congestion events for a peer connection.",
128                &["peer_id"],
129                registry
130            )
131            .unwrap(),
132            network_peer_congestion_window: register_int_gauge_vec_with_registry!(
133                format!("{node}_network_peer_congestion_window"),
134                "The congestion window for a peer connection.",
135                &["peer_id"],
136                registry
137            )
138            .unwrap(),
139
140            // FrameStats
141            network_peer_closed_connections: register_int_gauge_vec_with_registry!(
142                format!("{node}_network_peer_closed_connections"),
143                "The number of closed connections for a peer connection.",
144                &["peer_id", "direction"],
145                registry
146            )
147            .unwrap(),
148            network_peer_max_data: register_int_gauge_vec_with_registry!(
149                format!("{node}_network_peer_max_data"),
150                "The number of max data frames for a peer connection.",
151                &["peer_id", "direction"],
152                registry
153            )
154            .unwrap(),
155            network_peer_data_blocked: register_int_gauge_vec_with_registry!(
156                format!("{node}_network_peer_data_blocked"),
157                "The number of data blocked frames for a peer connection.",
158                &["peer_id", "direction"],
159                registry
160            )
161            .unwrap(),
162
163            // UDPStats
164            network_peer_udp_datagrams: register_int_gauge_vec_with_registry!(
165                format!("{node}_network_peer_udp_datagrams"),
166                "The total number datagrams observed by the UDP peer connection.",
167                &["peer_id", "direction"],
168                registry
169            )
170            .unwrap(),
171            network_peer_udp_bytes: register_int_gauge_vec_with_registry!(
172                format!("{node}_network_peer_udp_bytes"),
173                "The total number bytes observed by the UDP peer connection.",
174                &["peer_id", "direction"],
175                registry
176            )
177            .unwrap(),
178            network_peer_udp_transmits: register_int_gauge_vec_with_registry!(
179                format!("{node}_network_peer_udp_transmits"),
180                "The total number transmits observed by the UDP peer connection.",
181                &["peer_id", "direction"],
182                registry
183            )
184            .unwrap(),
185        }
186    }
187}
188
189#[derive(Clone)]
190pub struct NetworkMetrics {
191    /// Counter of requests by route
192    requests: IntCounterVec,
193    /// Request latency by route
194    request_latency: HistogramVec,
195    /// Request size by route
196    request_size: HistogramVec,
197    /// Response size by route
198    response_size: HistogramVec,
199    /// Counter of requests exceeding the "excessive" size limit
200    excessive_size_requests: IntCounterVec,
201    /// Counter of responses exceeding the "excessive" size limit
202    excessive_size_responses: IntCounterVec,
203    /// Gauge of the number of inflight requests at any given time by route
204    inflight_requests: IntGaugeVec,
205    /// Failed requests by route
206    errors: IntCounterVec,
207}
208
// Histogram bucket boundaries, in seconds, for request latency: from 1 ms up
// to 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, // below one second
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0, // one second and above
];
212
// Arbitrarily chosen histogram buckets for message size, in bytes. Each row
// grows by a smaller factor than the previous one (noted at the end of each
// row). This gives finer resolution at high sizes without an excessive number
// of buckets.
const SIZE_BYTE_BUCKETS: &[f64] = &[
    2048., 8192., // *4
    16384., 32768., 65536., 131072., 262144., 524288., 1048576., // *2
    1572864., 2359296., 3538944., // *1.5
    4600627., 5980815., 7775060., 10107578., 13139851., 17081807., 22206349., 28868253., 37528729.,
    48787348., 63423553., // *1.3
];
222
223impl NetworkMetrics {
224    pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
225        let requests = register_int_counter_vec_with_registry!(
226            format!("{node}_{direction}_requests"),
227            "The number of requests made on the network",
228            &["route"],
229            registry
230        )
231        .unwrap();
232
233        let request_latency = register_histogram_vec_with_registry!(
234            format!("{node}_{direction}_request_latency"),
235            "Latency of a request by route",
236            &["route"],
237            LATENCY_SEC_BUCKETS.to_vec(),
238            registry,
239        )
240        .unwrap();
241
242        let request_size = register_histogram_vec_with_registry!(
243            format!("{node}_{direction}_request_size"),
244            "Size of a request by route",
245            &["route"],
246            SIZE_BYTE_BUCKETS.to_vec(),
247            registry,
248        )
249        .unwrap();
250
251        let response_size = register_histogram_vec_with_registry!(
252            format!("{node}_{direction}_response_size"),
253            "Size of a response by route",
254            &["route"],
255            SIZE_BYTE_BUCKETS.to_vec(),
256            registry,
257        )
258        .unwrap();
259
260        let excessive_size_requests = register_int_counter_vec_with_registry!(
261            format!("{node}_{direction}_excessive_size_requests"),
262            "The number of excessively large request messages sent",
263            &["route"],
264            registry
265        )
266        .unwrap();
267
268        let excessive_size_responses = register_int_counter_vec_with_registry!(
269            format!("{node}_{direction}_excessive_size_responses"),
270            "The number of excessively large response messages seen",
271            &["route"],
272            registry
273        )
274        .unwrap();
275
276        let inflight_requests = register_int_gauge_vec_with_registry!(
277            format!("{node}_{direction}_inflight_requests"),
278            "The number of inflight network requests",
279            &["route"],
280            registry
281        )
282        .unwrap();
283
284        let errors = register_int_counter_vec_with_registry!(
285            format!("{node}_{direction}_request_errors"),
286            "Number of errors by route",
287            &["route", "status"],
288            registry,
289        )
290        .unwrap();
291
292        Self {
293            requests,
294            request_latency,
295            request_size,
296            response_size,
297            excessive_size_requests,
298            excessive_size_responses,
299            inflight_requests,
300            errors,
301        }
302    }
303}
304
305#[derive(Clone)]
306pub struct MetricsMakeCallbackHandler {
307    metrics: Arc<NetworkMetrics>,
308    /// Size in bytes above which a request or response message is considered
309    /// excessively large
310    excessive_message_size: usize,
311}
312
313impl MetricsMakeCallbackHandler {
314    pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
315        Self {
316            metrics,
317            excessive_message_size,
318        }
319    }
320}
321
322impl MakeCallbackHandler for MetricsMakeCallbackHandler {
323    type Handler = MetricsResponseHandler;
324
325    fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
326        let route = request.route().to_owned();
327
328        self.metrics.requests.with_label_values(&[&route]).inc();
329        self.metrics
330            .inflight_requests
331            .with_label_values(&[&route])
332            .inc();
333        let body_len = request.body().len();
334        self.metrics
335            .request_size
336            .with_label_values(&[&route])
337            .observe(body_len as f64);
338        if body_len > self.excessive_message_size {
339            warn!(
340                "Saw excessively large request with size {body_len} for {route} with peer {:?}",
341                request.peer_id()
342            );
343            self.metrics
344                .excessive_size_requests
345                .with_label_values(&[&route])
346                .inc();
347        }
348
349        let timer = self
350            .metrics
351            .request_latency
352            .with_label_values(&[&route])
353            .start_timer();
354
355        MetricsResponseHandler {
356            metrics: self.metrics.clone(),
357            timer,
358            route,
359            excessive_message_size: self.excessive_message_size,
360        }
361    }
362}
363
364pub struct MetricsResponseHandler {
365    metrics: Arc<NetworkMetrics>,
366    // The timer is held on to and "observed" once dropped
367    #[expect(unused)]
368    timer: HistogramTimer,
369    route: String,
370    excessive_message_size: usize,
371}
372
373impl ResponseHandler for MetricsResponseHandler {
374    fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
375        let body_len = response.body().len();
376        self.metrics
377            .response_size
378            .with_label_values(&[&self.route])
379            .observe(body_len as f64);
380        if body_len > self.excessive_message_size {
381            warn!(
382                "Saw excessively large response with size {body_len} for {} with peer {:?}",
383                self.route,
384                response.peer_id()
385            );
386            self.metrics
387                .excessive_size_responses
388                .with_label_values(&[&self.route])
389                .inc();
390        }
391
392        if !response.status().is_success() {
393            let status = response.status().to_u16().to_string();
394            self.metrics
395                .errors
396                .with_label_values(&[&self.route, &status])
397                .inc();
398        }
399    }
400
401    fn on_error<E>(self, _error: &E) {
402        self.metrics
403            .errors
404            .with_label_values(&[self.route.as_str(), "unknown"])
405            .inc();
406    }
407}
408
409impl Drop for MetricsResponseHandler {
410    fn drop(&mut self) {
411        self.metrics
412            .inflight_requests
413            .with_label_values(&[&self.route])
414            .dec();
415    }
416}