iota_json_rpc/
logger.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, time::Duration};
6
7use anyhow::anyhow;
8use futures::Future;
9use iota_json_rpc_api::error_object_from_rpc;
10use jsonrpsee::{
11    MethodKind,
12    core::{ClientError as RpcError, RpcResult},
13    server::HttpRequest,
14    types::Params,
15};
16use tracing::{Instrument, Span, error, info};
17
18use crate::error::RpcInterimResult;
19
20/// The transport protocol used to send or receive a call or request.
21#[derive(Debug, Copy, Clone)]
22pub enum TransportProtocol {
23    /// HTTP transport.
24    Http,
25    /// WebSocket transport.
26    WebSocket,
27}
28
29impl std::fmt::Display for TransportProtocol {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        let s = match self {
32            Self::Http => "http",
33            Self::WebSocket => "websocket",
34        };
35
36        write!(f, "{s}")
37    }
38}
39
40/// Defines a logger specifically for WebSocket connections with callbacks
41/// during the RPC request life-cycle. The primary use case for this is to
42/// collect timings for a larger metrics collection solution.
43pub trait Logger: Send + Sync + Clone + 'static {
44    /// Intended to carry timestamp of a request, for example
45    /// `std::time::Instant`. How the trait measures time, if at all, is
46    /// entirely up to the implementation.
47    type Instant: std::fmt::Debug + Send + Sync + Copy;
48
49    /// Called when a new client connects
50    fn on_connect(&self, _remote_addr: SocketAddr, _request: &HttpRequest, _t: TransportProtocol);
51
52    /// Called when a new JSON-RPC request comes to the server.
53    fn on_request(&self, transport: TransportProtocol) -> Self::Instant;
54
55    /// Called on each JSON-RPC method call, batch requests will trigger
56    /// `on_call` multiple times.
57    fn on_call(
58        &self,
59        method_name: &str,
60        params: Params,
61        kind: MethodKind,
62        transport: TransportProtocol,
63    );
64
65    /// Called on each JSON-RPC method completion, batch requests will trigger
66    /// `on_result` multiple times.
67    fn on_result(
68        &self,
69        method_name: &str,
70        success: bool,
71        error_code: Option<i32>,
72        started_at: Self::Instant,
73        transport: TransportProtocol,
74    );
75
76    /// Called once the JSON-RPC request is finished and response is sent to the
77    /// output buffer.
78    fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol);
79
80    /// Called when a client disconnects
81    fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol);
82}
83
84impl Logger for () {
85    type Instant = ();
86
87    fn on_connect(&self, _: SocketAddr, _: &HttpRequest, _p: TransportProtocol) -> Self::Instant {}
88
89    fn on_request(&self, _p: TransportProtocol) -> Self::Instant {}
90
91    fn on_call(&self, _: &str, _: Params, _: MethodKind, _p: TransportProtocol) {}
92
93    fn on_result(&self, _: &str, _: bool, _: Option<i32>, _: Self::Instant, _p: TransportProtocol) {
94    }
95
96    fn on_response(&self, _: &str, _: Self::Instant, _p: TransportProtocol) {}
97
98    fn on_disconnect(&self, _: SocketAddr, _p: TransportProtocol) {}
99}
100
101impl<A, B> Logger for (A, B)
102where
103    A: Logger,
104    B: Logger,
105{
106    type Instant = (A::Instant, B::Instant);
107
108    fn on_connect(
109        &self,
110        remote_addr: std::net::SocketAddr,
111        request: &HttpRequest,
112        transport: TransportProtocol,
113    ) {
114        self.0.on_connect(remote_addr, request, transport);
115        self.1.on_connect(remote_addr, request, transport);
116    }
117
118    fn on_request(&self, transport: TransportProtocol) -> Self::Instant {
119        (self.0.on_request(transport), self.1.on_request(transport))
120    }
121
122    fn on_call(
123        &self,
124        method_name: &str,
125        params: Params,
126        kind: MethodKind,
127        transport: TransportProtocol,
128    ) {
129        self.0.on_call(method_name, params.clone(), kind, transport);
130        self.1.on_call(method_name, params, kind, transport);
131    }
132
133    fn on_result(
134        &self,
135        method_name: &str,
136        success: bool,
137        error_code: Option<i32>,
138        started_at: Self::Instant,
139        transport: TransportProtocol,
140    ) {
141        self.0
142            .on_result(method_name, success, error_code, started_at.0, transport);
143        self.1
144            .on_result(method_name, success, error_code, started_at.1, transport);
145    }
146
147    fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol) {
148        self.0.on_response(result, started_at.0, transport);
149        self.1.on_response(result, started_at.1, transport);
150    }
151
152    fn on_disconnect(&self, remote_addr: SocketAddr, transport: TransportProtocol) {
153        self.0.on_disconnect(remote_addr, transport);
154        self.1.on_disconnect(remote_addr, transport);
155    }
156}
157
158pub(crate) trait FutureWithTracing<O>: Future<Output = RpcInterimResult<O>> {
159    fn trace(self) -> impl Future<Output = RpcResult<O>>
160    where
161        Self: Sized,
162    {
163        self.trace_timeout(Duration::from_secs(1))
164    }
165
166    fn trace_timeout(self, timeout: Duration) -> impl Future<Output = RpcResult<O>>
167    where
168        Self: Sized,
169    {
170        async move {
171            let start = std::time::Instant::now();
172            let interim_result: RpcInterimResult<_> = self.await;
173            let elapsed = start.elapsed();
174            let result = interim_result.map_err(|e| {
175                let anyhow_error = anyhow!("{:?}", e);
176
177                let rpc_error: RpcError = e.into();
178                if !matches!(rpc_error, RpcError::Call(_)) {
179                    error!(error=?anyhow_error);
180                }
181                error_object_from_rpc(rpc_error)
182            });
183
184            if elapsed > timeout {
185                info!(?elapsed, "RPC took longer than threshold to complete.");
186            }
187            result
188        }
189        .instrument(Span::current())
190    }
191}
192impl<F: Future<Output = RpcInterimResult<O>>, O> FutureWithTracing<O> for F {}