1use std::{collections::HashSet, net::SocketAddr};
6
7use http_body::Body;
8use iota_json_rpc_api::{
9 CLIENT_SDK_TYPE_HEADER, CLIENT_TARGET_API_VERSION_HEADER, TRANSIENT_ERROR_CODE,
10};
11use jsonrpsee::{MethodKind, server::HttpRequest, types::Params};
12use prometheus::{
13 HistogramVec, IntCounterVec, IntGaugeVec, register_histogram_vec_with_registry,
14 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
15};
16use tokio::time::Instant;
17
18use crate::logger::{Logger, TransportProtocol};
19
/// Route label under which every method name outside the whitelist is reported.
const SPAM_LABEL: &str = "SPAM";

/// Upper bounds, in seconds, of the buckets used by the per-route latency histogram.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second latencies.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // One second and above.
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
24
25#[derive(Debug, Clone)]
26pub struct Metrics {
27 requests_by_route: IntCounterVec,
29 inflight_requests_by_route: IntGaugeVec,
32 req_latency_by_route: HistogramVec,
34 errors_by_route: IntCounterVec,
36 server_errors_by_route: IntCounterVec,
37 client_errors_by_route: IntCounterVec,
38 transient_errors_by_route: IntCounterVec,
39 client: IntCounterVec,
41 inflight_connection: IntGaugeVec,
43 rpc_request_size: HistogramVec,
45 rpc_response_size: HistogramVec,
47}
48
49#[derive(Clone)]
50pub struct MetricsLogger {
51 metrics: Metrics,
52 method_whitelist: HashSet<String>,
53}
54
55impl MetricsLogger {
56 fn check_spam<'a>(&'a self, method_name: &'a str) -> &'a str {
57 if self.method_whitelist.contains(method_name) {
58 method_name
59 } else {
60 SPAM_LABEL
61 }
62 }
63
64 pub fn new(registry: &prometheus::Registry, method_whitelist: &[&str]) -> Self {
65 let metrics = Metrics {
66 requests_by_route: register_int_counter_vec_with_registry!(
67 "rpc_requests_by_route",
68 "Number of requests by route",
69 &["route"],
70 registry,
71 )
72 .unwrap(),
73 inflight_requests_by_route: register_int_gauge_vec_with_registry!(
74 "inflight_rpc_requests_by_route",
75 "Number of inflight requests by route",
76 &["route"],
77 registry,
78 )
79 .unwrap(),
80 req_latency_by_route: register_histogram_vec_with_registry!(
81 "req_latency_by_route",
82 "Latency of a request by route",
83 &["route"],
84 LATENCY_SEC_BUCKETS.to_vec(),
85 registry,
86 )
87 .unwrap(),
88 client_errors_by_route: register_int_counter_vec_with_registry!(
89 "client_errors_by_route",
90 "Number of client errors by route",
91 &["route"],
92 registry,
93 )
94 .unwrap(),
95 server_errors_by_route: register_int_counter_vec_with_registry!(
96 "server_errors_by_route",
97 "Number of server errors by route",
98 &["route"],
99 registry,
100 )
101 .unwrap(),
102 transient_errors_by_route: register_int_counter_vec_with_registry!(
103 "transient_errors_by_route",
104 "Number of transient errors by route",
105 &["route"],
106 registry,
107 )
108 .unwrap(),
109 errors_by_route: register_int_counter_vec_with_registry!(
110 "errors_by_route",
111 "Number of client and server errors by route",
112 &["route"],
113 registry
114 )
115 .unwrap(),
116 client: register_int_counter_vec_with_registry!(
117 "rpc_client",
118 "Connected RPC client's info",
119 &["client_type", "api_version"],
120 registry,
121 )
122 .unwrap(),
123 inflight_connection: register_int_gauge_vec_with_registry!(
124 "rpc_inflight_connection",
125 "Number of inflight RPC connection by protocol",
126 &["protocol"],
127 registry,
128 )
129 .unwrap(),
130 rpc_request_size: register_histogram_vec_with_registry!(
131 "rpc_request_size",
132 "Request size of rpc requests",
133 &["protocol"],
134 prometheus::exponential_buckets(32.0, 2.0, 19)
135 .unwrap()
136 .to_vec(),
137 registry,
138 )
139 .unwrap(),
140 rpc_response_size: register_histogram_vec_with_registry!(
141 "rpc_response_size",
142 "Response size of rpc requests",
143 &["protocol"],
144 prometheus::exponential_buckets(1024.0, 2.0, 20)
145 .unwrap()
146 .to_vec(),
147 registry,
148 )
149 .unwrap(),
150 };
151
152 Self {
153 metrics,
154 method_whitelist: method_whitelist.iter().map(|s| (*s).into()).collect(),
155 }
156 }
157}
158
159impl Logger for MetricsLogger {
160 type Instant = Instant;
161
162 fn on_connect(&self, _remote_addr: SocketAddr, request: &HttpRequest, t: TransportProtocol) {
163 let client_type = request
164 .headers()
165 .get(CLIENT_SDK_TYPE_HEADER)
166 .and_then(|v| v.to_str().ok())
167 .unwrap_or("Unknown");
168
169 let api_version = request
170 .headers()
171 .get(CLIENT_TARGET_API_VERSION_HEADER)
172 .and_then(|v| v.to_str().ok())
173 .unwrap_or("Unknown");
174 self.metrics
175 .client
176 .with_label_values(&[client_type, api_version])
177 .inc();
178 self.metrics
179 .inflight_connection
180 .with_label_values(&[&t.to_string()])
181 .inc();
182
183 self.metrics
184 .rpc_request_size
185 .with_label_values(&[&t.to_string()])
186 .observe(
187 request
188 .size_hint()
189 .exact()
190 .unwrap_or_else(|| request.size_hint().lower()) as f64,
191 );
192 }
193
194 fn on_request(&self, _transport: TransportProtocol) -> Self::Instant {
195 Instant::now()
196 }
197
198 fn on_call(
199 &self,
200 method_name: &str,
201 _params: Params,
202 _kind: MethodKind,
203 _transport: TransportProtocol,
204 ) {
205 let method_name = self.check_spam(method_name);
206 self.metrics
207 .inflight_requests_by_route
208 .with_label_values(&[method_name])
209 .inc();
210 self.metrics
211 .requests_by_route
212 .with_label_values(&[method_name])
213 .inc();
214 }
215
216 fn on_result(
217 &self,
218 method_name: &str,
219 _success: bool,
220 error_code: Option<i32>,
221 started_at: Self::Instant,
222 _transport: TransportProtocol,
223 ) {
224 let method_name = self.check_spam(method_name);
225 self.metrics
226 .inflight_requests_by_route
227 .with_label_values(&[method_name])
228 .dec();
229 let req_latency_secs = (Instant::now() - started_at).as_secs_f64();
230 self.metrics
231 .req_latency_by_route
232 .with_label_values(&[method_name])
233 .observe(req_latency_secs);
234
235 if let Some(code) = error_code {
236 if code == jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE
237 || code == jsonrpsee::types::error::INTERNAL_ERROR_CODE
238 {
239 self.metrics
240 .server_errors_by_route
241 .with_label_values(&[method_name])
242 .inc();
243 } else if code == TRANSIENT_ERROR_CODE {
244 self.metrics
245 .transient_errors_by_route
246 .with_label_values(&[method_name])
247 .inc();
248 } else {
249 self.metrics
250 .client_errors_by_route
251 .with_label_values(&[method_name])
252 .inc();
253 }
254 self.metrics
255 .errors_by_route
256 .with_label_values(&[method_name])
257 .inc();
258 }
259 }
260
261 fn on_response(&self, result: &str, _started_at: Self::Instant, t: TransportProtocol) {
262 self.metrics
263 .rpc_response_size
264 .with_label_values(&[&t.to_string()])
265 .observe(std::mem::size_of_val(result) as f64)
266 }
267
268 fn on_disconnect(&self, _remote_addr: SocketAddr, t: TransportProtocol) {
269 self.metrics
270 .inflight_connection
271 .with_label_values(&[&t.to_string()])
272 .dec();
273 }
274}