iota_network_stack/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use tonic::{
8    Code, Status,
9    codegen::http::{HeaderValue, Request, Response, header::HeaderName},
10};
11use tower_http::{
12    classify::GrpcFailureClass,
13    trace::{OnFailure, OnRequest, OnResponse},
14};
15use tracing::Span;
16
/// Name of the response header that `MetricsHandler`'s `OnResponse`
/// implementation reads to obtain the endpoint path it reports to the
/// metrics provider.
// NOTE(review): presumably another layer of this crate copies the request
// path into this header before the response is observed — confirm.
pub(crate) static GRPC_ENDPOINT_PATH_HEADER: HeaderName = HeaderName::from_static("grpc-path-req");
18
19/// The trait to be implemented when want to be notified about
20/// a new request and related metrics around it. When a request
21/// is performed (up to the point that a response is created) the
22/// on_response method is called with the corresponding metrics
23/// details. The on_request method will be called when the request
24/// is received, but not further processing has happened at this
25/// point.
26pub trait MetricsCallbackProvider: Send + Sync + Clone + 'static {
27    /// Method will be called when a request has been received.
28    /// `path`: the endpoint uri path
29    fn on_request(&self, path: String);
30
31    /// Method to be called from the server when a request is performed.
32    /// `path`: the endpoint uri path
33    /// `latency`: the time when the request was received and when the response
34    /// was created `status`: the http status code of the response
35    /// `grpc_status_code`: the grpc status code (see <https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc>)
36    fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code);
37
38    /// Called when a gRPC request fails at the transport/middleware level
39    /// (e.g. service panic, connection drop, timeout). gRPC application
40    /// errors (non-OK status codes) are NOT reported here — they are
41    /// already captured by [`on_response`](Self::on_response).
42    /// The method path is not available at this layer (tower-http's
43    /// `on_failure` callback does not receive the response object).
44    fn on_error(&self, _latency: Duration, _grpc_status_code: Code) {}
45
46    /// Called when request call is started
47    fn on_start(&self, _path: &str) {}
48
49    /// Called when request call is dropped.
50    /// It is guaranteed that for each on_start there will be corresponding
51    /// on_drop
52    fn on_drop(&self, _path: &str) {}
53}
54
55#[derive(Clone, Default)]
56pub struct DefaultMetricsCallbackProvider {}
57impl MetricsCallbackProvider for DefaultMetricsCallbackProvider {
58    fn on_request(&self, _path: String) {}
59
60    fn on_response(
61        &self,
62        _path: String,
63        _latency: Duration,
64        _status: u16,
65        _grpc_status_code: Code,
66    ) {
67    }
68}
69
70#[derive(Clone)]
71pub(crate) struct MetricsHandler<M: MetricsCallbackProvider> {
72    metrics_provider: M,
73}
74
75impl<M: MetricsCallbackProvider> MetricsHandler<M> {
76    pub(crate) fn new(metrics_provider: M) -> Self {
77        Self { metrics_provider }
78    }
79}
80
81impl<B, M: MetricsCallbackProvider> OnResponse<B> for MetricsHandler<M> {
82    fn on_response(self, response: &Response<B>, latency: Duration, _span: &Span) {
83        let grpc_status = Status::from_header_map(response.headers());
84        let grpc_status_code = grpc_status.map_or(Code::Ok, |s| s.code());
85
86        let path: HeaderValue = response
87            .headers()
88            .get(&GRPC_ENDPOINT_PATH_HEADER)
89            .unwrap()
90            .clone();
91
92        self.metrics_provider.on_response(
93            path.to_str().unwrap().to_string(),
94            latency,
95            response.status().as_u16(),
96            grpc_status_code,
97        );
98    }
99}
100
101impl<B, M: MetricsCallbackProvider> OnRequest<B> for MetricsHandler<M> {
102    fn on_request(&mut self, request: &Request<B>, _span: &Span) {
103        self.metrics_provider
104            .on_request(request.uri().path().to_string());
105    }
106}
107
108impl<M: MetricsCallbackProvider> OnFailure<GrpcFailureClass> for MetricsHandler<M> {
109    fn on_failure(
110        &mut self,
111        failure_classification: GrpcFailureClass,
112        latency: Duration,
113        _span: &Span,
114    ) {
115        // Only count transport/middleware errors (GrpcFailureClass::Error).
116        // GrpcFailureClass::Code variants are gRPC application errors that
117        // on_response already records with full method-path context.
118        if let GrpcFailureClass::Error(_) = failure_classification {
119            self.metrics_provider.on_error(latency, Code::Internal);
120        }
121    }
122}