iota_json_rpc/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, net::SocketAddr};
6
7use http_body::Body;
8use iota_json_rpc_api::{
9    CLIENT_SDK_TYPE_HEADER, CLIENT_TARGET_API_VERSION_HEADER, TRANSIENT_ERROR_CODE,
10};
11use jsonrpsee::{MethodKind, server::HttpRequest, types::Params};
12use prometheus::{
13    HistogramVec, IntCounterVec, IntGaugeVec, register_histogram_vec_with_registry,
14    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
15};
16use tokio::time::Instant;
17
18use crate::logger::{Logger, TransportProtocol};
19
20const SPAM_LABEL: &str = "SPAM";
21const LATENCY_SEC_BUCKETS: &[f64] = &[
22    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2.5, 5., 10., 20., 30., 60., 90.,
23];
24
/// Prometheus collectors written to by `MetricsLogger`.
///
/// Per-route metrics carry a single `route` label, whose value is either a
/// whitelisted method name or `SPAM_LABEL`. Connection and payload-size
/// metrics carry a `protocol` label (the transport protocol).
#[derive(Debug, Clone)]
pub struct Metrics {
    /// Counter of calls, one time series per `route` label value; incremented
    /// once per call.
    requests_by_route: IntCounterVec,
    /// Gauge of calls currently in progress, per `route`: incremented when a
    /// call starts and decremented when its result is logged.
    inflight_requests_by_route: IntGaugeVec,
    /// Histogram of call latency in seconds, per `route`.
    req_latency_by_route: HistogramVec,
    /// Counter of all failed calls per `route`. It is incremented for every
    /// error code, so it also includes transient errors, not only the
    /// client and server errors.
    errors_by_route: IntCounterVec,
    /// Failed calls whose error code is jsonrpsee's
    /// `CALL_EXECUTION_FAILED_CODE` or `INTERNAL_ERROR_CODE`, per `route`.
    server_errors_by_route: IntCounterVec,
    /// Failed calls whose error code is neither a server error code nor
    /// `TRANSIENT_ERROR_CODE`, per `route`.
    client_errors_by_route: IntCounterVec,
    /// Failed calls whose error code is `TRANSIENT_ERROR_CODE`, per `route`.
    transient_errors_by_route: IntCounterVec,
    /// Counter of connections, labelled by the client SDK type and target API
    /// version request headers (`"Unknown"` when a header is absent or
    /// unreadable).
    client: IntCounterVec,
    /// Gauge of open connections per `protocol`: incremented on connect,
    /// decremented on disconnect.
    inflight_connection: IntGaugeVec,
    /// Histogram of request body sizes in bytes (from the body size hint's
    /// lower bound), per `protocol`.
    rpc_request_size: HistogramVec,
    /// Histogram of serialized response sizes in bytes, per `protocol`.
    rpc_response_size: HistogramVec,
}
48
49#[derive(Clone)]
50pub struct MetricsLogger {
51    metrics: Metrics,
52    method_whitelist: HashSet<String>,
53}
54
55impl MetricsLogger {
56    fn check_spam<'a>(&'a self, method_name: &'a str) -> &'a str {
57        if self.method_whitelist.contains(method_name) {
58            method_name
59        } else {
60            SPAM_LABEL
61        }
62    }
63
64    pub fn new(registry: &prometheus::Registry, method_whitelist: &[&str]) -> Self {
65        let metrics = Metrics {
66            requests_by_route: register_int_counter_vec_with_registry!(
67                "rpc_requests_by_route",
68                "Number of requests by route",
69                &["route"],
70                registry,
71            )
72            .unwrap(),
73            inflight_requests_by_route: register_int_gauge_vec_with_registry!(
74                "inflight_rpc_requests_by_route",
75                "Number of inflight requests by route",
76                &["route"],
77                registry,
78            )
79            .unwrap(),
80            req_latency_by_route: register_histogram_vec_with_registry!(
81                "req_latency_by_route",
82                "Latency of a request by route",
83                &["route"],
84                LATENCY_SEC_BUCKETS.to_vec(),
85                registry,
86            )
87            .unwrap(),
88            client_errors_by_route: register_int_counter_vec_with_registry!(
89                "client_errors_by_route",
90                "Number of client errors by route",
91                &["route"],
92                registry,
93            )
94            .unwrap(),
95            server_errors_by_route: register_int_counter_vec_with_registry!(
96                "server_errors_by_route",
97                "Number of server errors by route",
98                &["route"],
99                registry,
100            )
101            .unwrap(),
102            transient_errors_by_route: register_int_counter_vec_with_registry!(
103                "transient_errors_by_route",
104                "Number of transient errors by route",
105                &["route"],
106                registry,
107            )
108            .unwrap(),
109            errors_by_route: register_int_counter_vec_with_registry!(
110                "errors_by_route",
111                "Number of client and server errors by route",
112                &["route"],
113                registry
114            )
115            .unwrap(),
116            client: register_int_counter_vec_with_registry!(
117                "rpc_client",
118                "Connected RPC client's info",
119                &["client_type", "api_version"],
120                registry,
121            )
122            .unwrap(),
123            inflight_connection: register_int_gauge_vec_with_registry!(
124                "rpc_inflight_connection",
125                "Number of inflight RPC connection by protocol",
126                &["protocol"],
127                registry,
128            )
129            .unwrap(),
130            rpc_request_size: register_histogram_vec_with_registry!(
131                "rpc_request_size",
132                "Request size of rpc requests",
133                &["protocol"],
134                prometheus::exponential_buckets(32.0, 2.0, 19)
135                    .unwrap()
136                    .to_vec(),
137                registry,
138            )
139            .unwrap(),
140            rpc_response_size: register_histogram_vec_with_registry!(
141                "rpc_response_size",
142                "Response size of rpc requests",
143                &["protocol"],
144                prometheus::exponential_buckets(1024.0, 2.0, 20)
145                    .unwrap()
146                    .to_vec(),
147                registry,
148            )
149            .unwrap(),
150        };
151
152        Self {
153            metrics,
154            method_whitelist: method_whitelist.iter().map(|s| (*s).into()).collect(),
155        }
156    }
157}
158
159impl Logger for MetricsLogger {
160    type Instant = Instant;
161
162    fn on_connect(&self, _remote_addr: SocketAddr, request: &HttpRequest, t: TransportProtocol) {
163        let client_type = request
164            .headers()
165            .get(CLIENT_SDK_TYPE_HEADER)
166            .and_then(|v| v.to_str().ok())
167            .unwrap_or("Unknown");
168
169        let api_version = request
170            .headers()
171            .get(CLIENT_TARGET_API_VERSION_HEADER)
172            .and_then(|v| v.to_str().ok())
173            .unwrap_or("Unknown");
174        self.metrics
175            .client
176            .with_label_values(&[client_type, api_version])
177            .inc();
178        self.metrics
179            .inflight_connection
180            .with_label_values(&[&t.to_string()])
181            .inc();
182
183        self.metrics
184            .rpc_request_size
185            .with_label_values(&[&t.to_string()])
186            .observe(
187                request
188                    .size_hint()
189                    .exact()
190                    .unwrap_or_else(|| request.size_hint().lower()) as f64,
191            );
192    }
193
194    fn on_request(&self, _transport: TransportProtocol) -> Self::Instant {
195        Instant::now()
196    }
197
198    fn on_call(
199        &self,
200        method_name: &str,
201        _params: Params,
202        _kind: MethodKind,
203        _transport: TransportProtocol,
204    ) {
205        let method_name = self.check_spam(method_name);
206        self.metrics
207            .inflight_requests_by_route
208            .with_label_values(&[method_name])
209            .inc();
210        self.metrics
211            .requests_by_route
212            .with_label_values(&[method_name])
213            .inc();
214    }
215
216    fn on_result(
217        &self,
218        method_name: &str,
219        _success: bool,
220        error_code: Option<i32>,
221        started_at: Self::Instant,
222        _transport: TransportProtocol,
223    ) {
224        let method_name = self.check_spam(method_name);
225        self.metrics
226            .inflight_requests_by_route
227            .with_label_values(&[method_name])
228            .dec();
229        let req_latency_secs = (Instant::now() - started_at).as_secs_f64();
230        self.metrics
231            .req_latency_by_route
232            .with_label_values(&[method_name])
233            .observe(req_latency_secs);
234
235        if let Some(code) = error_code {
236            if code == jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE
237                || code == jsonrpsee::types::error::INTERNAL_ERROR_CODE
238            {
239                self.metrics
240                    .server_errors_by_route
241                    .with_label_values(&[method_name])
242                    .inc();
243            } else if code == TRANSIENT_ERROR_CODE {
244                self.metrics
245                    .transient_errors_by_route
246                    .with_label_values(&[method_name])
247                    .inc();
248            } else {
249                self.metrics
250                    .client_errors_by_route
251                    .with_label_values(&[method_name])
252                    .inc();
253            }
254            self.metrics
255                .errors_by_route
256                .with_label_values(&[method_name])
257                .inc();
258        }
259    }
260
261    fn on_response(&self, result: &str, _started_at: Self::Instant, t: TransportProtocol) {
262        self.metrics
263            .rpc_response_size
264            .with_label_values(&[&t.to_string()])
265            .observe(std::mem::size_of_val(result) as f64)
266    }
267
268    fn on_disconnect(&self, _remote_addr: SocketAddr, t: TransportProtocol) {
269        self.metrics
270            .inflight_connection
271            .with_label_values(&[&t.to_string()])
272            .dec();
273    }
274}