1use std::sync::Arc;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9 HistogramTimer, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
10 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
12};
13use tracing::warn;
14
15#[derive(Clone)]
16pub struct NetworkConnectionMetrics {
17 pub network_peer_connected: IntGaugeVec,
20 pub network_peers: IntGauge,
22 pub network_peer_disconnects: IntCounterVec,
24 pub socket_receive_buffer_size: IntGauge,
26 pub socket_send_buffer_size: IntGauge,
28
29 pub network_peer_rtt: IntGaugeVec,
32 pub network_peer_lost_packets: IntGaugeVec,
34 pub network_peer_lost_bytes: IntGaugeVec,
36 pub network_peer_sent_packets: IntGaugeVec,
38 pub network_peer_congestion_events: IntGaugeVec,
40 pub network_peer_congestion_window: IntGaugeVec,
42
43 pub network_peer_max_data: IntGaugeVec,
46 pub network_peer_closed_connections: IntGaugeVec,
48 pub network_peer_data_blocked: IntGaugeVec,
50
51 pub network_peer_udp_datagrams: IntGaugeVec,
54 pub network_peer_udp_bytes: IntGaugeVec,
56 pub network_peer_udp_transmits: IntGaugeVec,
58}
59
60impl NetworkConnectionMetrics {
61 pub fn new(node: &'static str, registry: &Registry) -> Self {
62 Self {
63 network_peer_connected: register_int_gauge_vec_with_registry!(
64 format!("{node}_network_peer_connected"),
65 "The connection status of a peer. 0 if not connected, 1 if connected",
66 &["peer_id", "type"],
67 registry
68 )
69 .unwrap(),
70 network_peers: register_int_gauge_with_registry!(
71 format!("{node}_network_peers"),
72 "The number of connected peers.",
73 registry
74 )
75 .unwrap(),
76 network_peer_disconnects: register_int_counter_vec_with_registry!(
77 format!("{node}_network_peer_disconnects"),
78 "Number of disconnect events per peer.",
79 &["peer_id", "reason"],
80 registry
81 )
82 .unwrap(),
83 socket_receive_buffer_size: register_int_gauge_with_registry!(
84 format!("{node}_socket_receive_buffer_size"),
85 "Receive buffer size of Anemo socket.",
86 registry
87 )
88 .unwrap(),
89 socket_send_buffer_size: register_int_gauge_with_registry!(
90 format!("{node}_socket_send_buffer_size"),
91 "Send buffer size of Anemo socket.",
92 registry
93 )
94 .unwrap(),
95
96 network_peer_rtt: register_int_gauge_vec_with_registry!(
98 format!("{node}_network_peer_rtt"),
99 "The rtt for a peer connection in ms.",
100 &["peer_id"],
101 registry
102 )
103 .unwrap(),
104 network_peer_lost_packets: register_int_gauge_vec_with_registry!(
105 format!("{node}_network_peer_lost_packets"),
106 "The total number of lost packets for a peer connection.",
107 &["peer_id"],
108 registry
109 )
110 .unwrap(),
111 network_peer_lost_bytes: register_int_gauge_vec_with_registry!(
112 format!("{node}_network_peer_lost_bytes"),
113 "The total number of lost bytes for a peer connection.",
114 &["peer_id"],
115 registry
116 )
117 .unwrap(),
118 network_peer_sent_packets: register_int_gauge_vec_with_registry!(
119 format!("{node}_network_peer_sent_packets"),
120 "The total number of sent packets for a peer connection.",
121 &["peer_id"],
122 registry
123 )
124 .unwrap(),
125 network_peer_congestion_events: register_int_gauge_vec_with_registry!(
126 format!("{node}_network_peer_congestion_events"),
127 "The total number of congestion events for a peer connection.",
128 &["peer_id"],
129 registry
130 )
131 .unwrap(),
132 network_peer_congestion_window: register_int_gauge_vec_with_registry!(
133 format!("{node}_network_peer_congestion_window"),
134 "The congestion window for a peer connection.",
135 &["peer_id"],
136 registry
137 )
138 .unwrap(),
139
140 network_peer_closed_connections: register_int_gauge_vec_with_registry!(
142 format!("{node}_network_peer_closed_connections"),
143 "The number of closed connections for a peer connection.",
144 &["peer_id", "direction"],
145 registry
146 )
147 .unwrap(),
148 network_peer_max_data: register_int_gauge_vec_with_registry!(
149 format!("{node}_network_peer_max_data"),
150 "The number of max data frames for a peer connection.",
151 &["peer_id", "direction"],
152 registry
153 )
154 .unwrap(),
155 network_peer_data_blocked: register_int_gauge_vec_with_registry!(
156 format!("{node}_network_peer_data_blocked"),
157 "The number of data blocked frames for a peer connection.",
158 &["peer_id", "direction"],
159 registry
160 )
161 .unwrap(),
162
163 network_peer_udp_datagrams: register_int_gauge_vec_with_registry!(
165 format!("{node}_network_peer_udp_datagrams"),
166 "The total number datagrams observed by the UDP peer connection.",
167 &["peer_id", "direction"],
168 registry
169 )
170 .unwrap(),
171 network_peer_udp_bytes: register_int_gauge_vec_with_registry!(
172 format!("{node}_network_peer_udp_bytes"),
173 "The total number bytes observed by the UDP peer connection.",
174 &["peer_id", "direction"],
175 registry
176 )
177 .unwrap(),
178 network_peer_udp_transmits: register_int_gauge_vec_with_registry!(
179 format!("{node}_network_peer_udp_transmits"),
180 "The total number transmits observed by the UDP peer connection.",
181 &["peer_id", "direction"],
182 registry
183 )
184 .unwrap(),
185 }
186 }
187}
188
189#[derive(Clone)]
190pub struct NetworkMetrics {
191 requests: IntCounterVec,
193 request_latency: HistogramVec,
195 request_size: HistogramVec,
197 response_size: HistogramVec,
199 excessive_size_requests: IntCounterVec,
201 excessive_size_responses: IntCounterVec,
203 inflight_requests: IntGaugeVec,
205 errors: IntCounterVec,
207}
208
/// Histogram bucket upper bounds, in seconds, for request latency
/// (from 1 ms up to 90 s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // Seconds.
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
212
/// Histogram bucket upper bounds, in bytes, for request and response body sizes.
///
/// The bounds grow geometrically: ×4 from 2 KiB to 8 KiB, ×2 up to 1 MiB,
/// ×1.5 up to 3.375 MiB, then roughly ×1.3 (rounded to whole bytes) up to
/// about 60 MiB.
const SIZE_BYTE_BUCKETS: &[f64] = &[
    // ×4
    2048., 8192.,
    // ×2
    16384., 32768., 65536., 131072., 262144., 524288., 1048576.,
    // ×1.5 — 1572864 × 1.5 = 2359296 (previously mistyped as 2359256).
    1572864., 2359296., 3538944.,
    // ≈×1.3
    4600627., 5980815., 7775060., 10107578., 13139851., 17081807., 22206349., 28868253.,
    37528729., 48787348., 63423553.,
];
222
223impl NetworkMetrics {
224 pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
225 let requests = register_int_counter_vec_with_registry!(
226 format!("{node}_{direction}_requests"),
227 "The number of requests made on the network",
228 &["route"],
229 registry
230 )
231 .unwrap();
232
233 let request_latency = register_histogram_vec_with_registry!(
234 format!("{node}_{direction}_request_latency"),
235 "Latency of a request by route",
236 &["route"],
237 LATENCY_SEC_BUCKETS.to_vec(),
238 registry,
239 )
240 .unwrap();
241
242 let request_size = register_histogram_vec_with_registry!(
243 format!("{node}_{direction}_request_size"),
244 "Size of a request by route",
245 &["route"],
246 SIZE_BYTE_BUCKETS.to_vec(),
247 registry,
248 )
249 .unwrap();
250
251 let response_size = register_histogram_vec_with_registry!(
252 format!("{node}_{direction}_response_size"),
253 "Size of a response by route",
254 &["route"],
255 SIZE_BYTE_BUCKETS.to_vec(),
256 registry,
257 )
258 .unwrap();
259
260 let excessive_size_requests = register_int_counter_vec_with_registry!(
261 format!("{node}_{direction}_excessive_size_requests"),
262 "The number of excessively large request messages sent",
263 &["route"],
264 registry
265 )
266 .unwrap();
267
268 let excessive_size_responses = register_int_counter_vec_with_registry!(
269 format!("{node}_{direction}_excessive_size_responses"),
270 "The number of excessively large response messages seen",
271 &["route"],
272 registry
273 )
274 .unwrap();
275
276 let inflight_requests = register_int_gauge_vec_with_registry!(
277 format!("{node}_{direction}_inflight_requests"),
278 "The number of inflight network requests",
279 &["route"],
280 registry
281 )
282 .unwrap();
283
284 let errors = register_int_counter_vec_with_registry!(
285 format!("{node}_{direction}_request_errors"),
286 "Number of errors by route",
287 &["route", "status"],
288 registry,
289 )
290 .unwrap();
291
292 Self {
293 requests,
294 request_latency,
295 request_size,
296 response_size,
297 excessive_size_requests,
298 excessive_size_responses,
299 inflight_requests,
300 errors,
301 }
302 }
303}
304
305#[derive(Clone)]
306pub struct MetricsMakeCallbackHandler {
307 metrics: Arc<NetworkMetrics>,
308 excessive_message_size: usize,
311}
312
313impl MetricsMakeCallbackHandler {
314 pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
315 Self {
316 metrics,
317 excessive_message_size,
318 }
319 }
320}
321
322impl MakeCallbackHandler for MetricsMakeCallbackHandler {
323 type Handler = MetricsResponseHandler;
324
325 fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
326 let route = request.route().to_owned();
327
328 self.metrics.requests.with_label_values(&[&route]).inc();
329 self.metrics
330 .inflight_requests
331 .with_label_values(&[&route])
332 .inc();
333 let body_len = request.body().len();
334 self.metrics
335 .request_size
336 .with_label_values(&[&route])
337 .observe(body_len as f64);
338 if body_len > self.excessive_message_size {
339 warn!(
340 "Saw excessively large request with size {body_len} for {route} with peer {:?}",
341 request.peer_id()
342 );
343 self.metrics
344 .excessive_size_requests
345 .with_label_values(&[&route])
346 .inc();
347 }
348
349 let timer = self
350 .metrics
351 .request_latency
352 .with_label_values(&[&route])
353 .start_timer();
354
355 MetricsResponseHandler {
356 metrics: self.metrics.clone(),
357 timer,
358 route,
359 excessive_message_size: self.excessive_message_size,
360 }
361 }
362}
363
364pub struct MetricsResponseHandler {
365 metrics: Arc<NetworkMetrics>,
366 #[expect(unused)]
368 timer: HistogramTimer,
369 route: String,
370 excessive_message_size: usize,
371}
372
373impl ResponseHandler for MetricsResponseHandler {
374 fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
375 let body_len = response.body().len();
376 self.metrics
377 .response_size
378 .with_label_values(&[&self.route])
379 .observe(body_len as f64);
380 if body_len > self.excessive_message_size {
381 warn!(
382 "Saw excessively large response with size {body_len} for {} with peer {:?}",
383 self.route,
384 response.peer_id()
385 );
386 self.metrics
387 .excessive_size_responses
388 .with_label_values(&[&self.route])
389 .inc();
390 }
391
392 if !response.status().is_success() {
393 let status = response.status().to_u16().to_string();
394 self.metrics
395 .errors
396 .with_label_values(&[&self.route, &status])
397 .inc();
398 }
399 }
400
401 fn on_error<E>(self, _error: &E) {
402 self.metrics
403 .errors
404 .with_label_values(&[self.route.as_str(), "unknown"])
405 .inc();
406 }
407}
408
409impl Drop for MetricsResponseHandler {
410 fn drop(&mut self) {
411 self.metrics
412 .inflight_requests
413 .with_label_values(&[&self.route])
414 .dec();
415 }
416}