1use std::{net::SocketAddr, time::Duration};
6
7use anyhow::anyhow;
8use futures::Future;
9use iota_json_rpc_api::error_object_from_rpc;
10use jsonrpsee::{
11 MethodKind,
12 core::{ClientError as RpcError, RpcResult},
13 server::HttpRequest,
14 types::Params,
15};
16use tracing::{Instrument, Span, error, info};
17
18use crate::error::RpcInterimResult;
19
/// Transport over which an RPC connection or request is handled.
///
/// The type is a field-less, `Copy` enum, so it also derives `PartialEq`, `Eq` and `Hash`
/// to allow comparisons and use as a map/set key (for example when grouping per transport).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum TransportProtocol {
    /// Rendered as `"http"` by the `Display` implementation.
    Http,
    /// Rendered as `"websocket"` by the `Display` implementation.
    WebSocket,
}
28
29impl std::fmt::Display for TransportProtocol {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 let s = match self {
32 Self::Http => "http",
33 Self::WebSocket => "websocket",
34 };
35
36 write!(f, "{s}")
37 }
38}
39
40pub trait Logger: Send + Sync + Clone + 'static {
44 type Instant: std::fmt::Debug + Send + Sync + Copy;
48
49 fn on_connect(&self, _remote_addr: SocketAddr, _request: &HttpRequest, _t: TransportProtocol);
51
52 fn on_request(&self, transport: TransportProtocol) -> Self::Instant;
54
55 fn on_call(
58 &self,
59 method_name: &str,
60 params: Params,
61 kind: MethodKind,
62 transport: TransportProtocol,
63 );
64
65 fn on_result(
68 &self,
69 method_name: &str,
70 success: bool,
71 error_code: Option<i32>,
72 started_at: Self::Instant,
73 transport: TransportProtocol,
74 );
75
76 fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol);
79
80 fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol);
82}
83
84impl Logger for () {
85 type Instant = ();
86
87 fn on_connect(&self, _: SocketAddr, _: &HttpRequest, _p: TransportProtocol) -> Self::Instant {}
88
89 fn on_request(&self, _p: TransportProtocol) -> Self::Instant {}
90
91 fn on_call(&self, _: &str, _: Params, _: MethodKind, _p: TransportProtocol) {}
92
93 fn on_result(&self, _: &str, _: bool, _: Option<i32>, _: Self::Instant, _p: TransportProtocol) {
94 }
95
96 fn on_response(&self, _: &str, _: Self::Instant, _p: TransportProtocol) {}
97
98 fn on_disconnect(&self, _: SocketAddr, _p: TransportProtocol) {}
99}
100
101impl<A, B> Logger for (A, B)
102where
103 A: Logger,
104 B: Logger,
105{
106 type Instant = (A::Instant, B::Instant);
107
108 fn on_connect(
109 &self,
110 remote_addr: std::net::SocketAddr,
111 request: &HttpRequest,
112 transport: TransportProtocol,
113 ) {
114 self.0.on_connect(remote_addr, request, transport);
115 self.1.on_connect(remote_addr, request, transport);
116 }
117
118 fn on_request(&self, transport: TransportProtocol) -> Self::Instant {
119 (self.0.on_request(transport), self.1.on_request(transport))
120 }
121
122 fn on_call(
123 &self,
124 method_name: &str,
125 params: Params,
126 kind: MethodKind,
127 transport: TransportProtocol,
128 ) {
129 self.0.on_call(method_name, params.clone(), kind, transport);
130 self.1.on_call(method_name, params, kind, transport);
131 }
132
133 fn on_result(
134 &self,
135 method_name: &str,
136 success: bool,
137 error_code: Option<i32>,
138 started_at: Self::Instant,
139 transport: TransportProtocol,
140 ) {
141 self.0
142 .on_result(method_name, success, error_code, started_at.0, transport);
143 self.1
144 .on_result(method_name, success, error_code, started_at.1, transport);
145 }
146
147 fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol) {
148 self.0.on_response(result, started_at.0, transport);
149 self.1.on_response(result, started_at.1, transport);
150 }
151
152 fn on_disconnect(&self, remote_addr: SocketAddr, transport: TransportProtocol) {
153 self.0.on_disconnect(remote_addr, transport);
154 self.1.on_disconnect(remote_addr, transport);
155 }
156}
157
158pub(crate) trait FutureWithTracing<O>: Future<Output = RpcInterimResult<O>> {
159 fn trace(self) -> impl Future<Output = RpcResult<O>>
160 where
161 Self: Sized,
162 {
163 self.trace_timeout(Duration::from_secs(1))
164 }
165
166 fn trace_timeout(self, timeout: Duration) -> impl Future<Output = RpcResult<O>>
167 where
168 Self: Sized,
169 {
170 async move {
171 let start = std::time::Instant::now();
172 let interim_result: RpcInterimResult<_> = self.await;
173 let elapsed = start.elapsed();
174 let result = interim_result.map_err(|e| {
175 let anyhow_error = anyhow!("{:?}", e);
176
177 let rpc_error: RpcError = e.into();
178 if !matches!(rpc_error, RpcError::Call(_)) {
179 error!(error=?anyhow_error);
180 }
181 error_object_from_rpc(rpc_error)
182 });
183
184 if elapsed > timeout {
185 info!(?elapsed, "RPC took longer than threshold to complete.");
186 }
187 result
188 }
189 .instrument(Span::current())
190 }
191}
192impl<F: Future<Output = RpcInterimResult<O>>, O> FutureWithTracing<O> for F {}