consensus_core/network/
metrics_layer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! Tower layer adapters that allow specifying callbacks for request and
//! response handling, and that can be implemented for different networking
//! stacks.
7
8use std::sync::Arc;
9
10use prometheus::HistogramTimer;
11
12use super::metrics::NetworkRouteMetrics;
13
/// A request whose route and size can be reported to the metrics layer.
pub(crate) trait SizedRequest {
    /// Route of the request; used as the `route` label value on the
    /// per-route metrics.
    fn route(&self) -> String;

    /// Size of the request message in bytes. A size of zero is not recorded
    /// in the request size histogram.
    fn size(&self) -> usize;
}
18
/// A response whose size and error classification can be reported to the
/// metrics layer.
pub(crate) trait SizedResponse {
    /// Error label for this response, or `None` when no error should be
    /// counted for it.
    fn error_type(&self) -> Option<String>;

    /// Size of the response message in bytes. A size of zero is not recorded
    /// in the response size histogram.
    fn size(&self) -> usize;
}
23
24#[derive(Clone)]
25pub(crate) struct MetricsCallbackMaker {
26    metrics: Arc<NetworkRouteMetrics>,
27    /// Size in bytes above which a request or response message is considered
28    /// excessively large
29    excessive_message_size: usize,
30}
31
32impl MetricsCallbackMaker {
33    pub(crate) fn new(metrics: Arc<NetworkRouteMetrics>, excessive_message_size: usize) -> Self {
34        Self {
35            metrics,
36            excessive_message_size,
37        }
38    }
39
40    // Update request metrics. And create a callback that should be called on
41    // response.
42    pub(crate) fn handle_request(&self, request: &dyn SizedRequest) -> MetricsResponseCallback {
43        let route = request.route();
44
45        self.metrics.requests.with_label_values(&[&route]).inc();
46        self.metrics
47            .inflight_requests
48            .with_label_values(&[&route])
49            .inc();
50        let request_size = request.size();
51        if request_size > 0 {
52            self.metrics
53                .request_size
54                .with_label_values(&[&route])
55                .observe(request_size as f64);
56        }
57        if request_size > self.excessive_message_size {
58            self.metrics
59                .excessive_size_requests
60                .with_label_values(&[&route])
61                .inc();
62        }
63
64        let timer = self
65            .metrics
66            .request_latency
67            .with_label_values(&[&route])
68            .start_timer();
69
70        MetricsResponseCallback {
71            metrics: self.metrics.clone(),
72            timer,
73            route,
74            excessive_message_size: self.excessive_message_size,
75        }
76    }
77}
78
/// Per-request state returned by `MetricsCallbackMaker::handle_request`.
///
/// The value is consumed by `on_response` or `on_error`. Whenever it is
/// dropped, its `Drop` impl decrements the in-flight gauge for `route`.
pub(crate) struct MetricsResponseCallback {
    /// Metrics shared with the `MetricsCallbackMaker` that created this value.
    metrics: Arc<NetworkRouteMetrics>,
    // The timer is held on to and "observed" once dropped, so it is never
    // read explicitly; hence the `unused` expectation below.
    #[expect(unused)]
    timer: HistogramTimer,
    /// Route of the originating request; used as the `route` label value.
    route: String,
    /// Threshold in bytes above which the response is counted as excessively
    /// large.
    excessive_message_size: usize,
}
87
88impl MetricsResponseCallback {
89    // Update response metrics.
90    pub(crate) fn on_response(self, response: &dyn SizedResponse) {
91        let response_size = response.size();
92        if response_size > 0 {
93            self.metrics
94                .response_size
95                .with_label_values(&[&self.route])
96                .observe(response_size as f64);
97        }
98        if response_size > self.excessive_message_size {
99            self.metrics
100                .excessive_size_responses
101                .with_label_values(&[&self.route])
102                .inc();
103        }
104
105        if let Some(err) = response.error_type() {
106            self.metrics
107                .errors
108                .with_label_values(&[&self.route, &err])
109                .inc();
110        }
111    }
112
113    pub(crate) fn on_error<E>(self, _error: &E) {
114        self.metrics
115            .errors
116            .with_label_values(&[self.route.as_str(), "unknown"])
117            .inc();
118    }
119}
120
121impl Drop for MetricsResponseCallback {
122    fn drop(&mut self) {
123        self.metrics
124            .inflight_requests
125            .with_label_values(&[&self.route])
126            .dec();
127    }
128}