iota_faucet/
metrics_layer.rs1use std::{
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use futures::Future;
12use http::{Request, StatusCode};
13use prometheus::{HistogramTimer, Registry};
14use tower::{BoxError, Layer, Service, ServiceExt, load_shed::error::Overloaded};
15use tracing::{error, info, warn};
16
17use crate::metrics::RequestMetrics;
18
19#[derive(Clone)]
22pub struct RequestMetricsLayer {
23 metrics: Arc<RequestMetrics>,
24}
25
26#[derive(Clone)]
27pub struct RequestMetricsService<Inner> {
28 inner: Inner,
29 metrics: Arc<RequestMetrics>,
30}
31
32pub struct RequestMetricsFuture<Res> {
33 future: Pin<Box<dyn Future<Output = Result<Res, BoxError>> + Send>>,
34}
35
36struct MetricsGuard {
37 timer: Option<HistogramTimer>,
38 metrics: Arc<RequestMetrics>,
39 path: String,
40}
41
42impl RequestMetricsLayer {
43 pub fn new(registry: &Registry) -> Self {
44 Self {
45 metrics: Arc::new(RequestMetrics::new(registry)),
46 }
47 }
48}
49
50impl<Inner> Layer<Inner> for RequestMetricsLayer {
51 type Service = RequestMetricsService<Inner>;
52 fn layer(&self, inner: Inner) -> Self::Service {
53 RequestMetricsService {
54 inner,
55 metrics: self.metrics.clone(),
56 }
57 }
58}
59
60impl<Inner, Body> Service<Request<Body>> for RequestMetricsService<Inner>
61where
62 Inner: Service<Request<Body>, Response = http::Response<Body>, Error = BoxError>
63 + Clone
64 + Send
65 + 'static,
66 Inner::Future: Send,
67 Body: Send + 'static,
68{
69 type Response = Inner::Response;
70 type Error = BoxError;
71 type Future = RequestMetricsFuture<Self::Response>;
72
73 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74 self.inner.poll_ready(ctx)
75 }
76
77 fn call(&mut self, req: Request<Body>) -> Self::Future {
78 let path = req.uri().path().to_string();
79 let metrics = MetricsGuard::new(self.metrics.clone(), &path);
80 let inner = self.inner.clone();
81
82 let future = Box::pin(async move {
83 let resp = inner.oneshot(req).await;
84 match &resp {
85 Ok(resp) if !resp.status().is_success() => {
86 metrics.failed(None, Some(resp.status()))
87 }
88 Ok(_) => metrics.succeeded(),
89 Err(err) => {
90 if err.is::<Overloaded>() {
91 metrics.shed();
92 } else {
93 metrics.failed(Some(err), None);
94 }
95 }
96 }
97
98 resp
99 });
100
101 RequestMetricsFuture { future }
102 }
103}
104
105impl<Res> Future for RequestMetricsFuture<Res> {
106 type Output = Result<Res, BoxError>;
107 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
108 Future::poll(self.future.as_mut(), ctx)
109 }
110}
111
112impl MetricsGuard {
113 fn new(metrics: Arc<RequestMetrics>, path: &str) -> Self {
114 metrics
115 .total_requests_received
116 .with_label_values(&[path])
117 .inc();
118 metrics
119 .current_requests_in_flight
120 .with_label_values(&[path])
121 .inc();
122 MetricsGuard {
123 timer: Some(
124 metrics
125 .process_latency
126 .with_label_values(&[path])
127 .start_timer(),
128 ),
129 metrics,
130 path: path.to_string(),
131 }
132 }
133
134 fn succeeded(mut self) {
135 if let Some(timer) = self.timer.take() {
136 let elapsed = timer.stop_and_record();
137 self.metrics
138 .total_requests_succeeded
139 .with_label_values(&[&self.path])
140 .inc();
141 info!(
142 "Request succeeded for path {} in {:.2}s",
143 self.path, elapsed
144 );
145 }
146 }
147
148 fn failed(mut self, error: Option<&BoxError>, status: Option<StatusCode>) {
149 if let Some(timer) = self.timer.take() {
150 let elapsed = timer.stop_and_record();
151 self.metrics
152 .total_requests_failed
153 .with_label_values(&[&self.path])
154 .inc();
155
156 if let Some(err) = error {
157 error!(
158 "Request failed for path {} in {:.2}s, error {:?}",
159 self.path, elapsed, err
160 );
161 } else if let Some(status) = status {
162 error!(
163 "Request failed for path {} in {:.2}s with status: {}",
164 self.path, elapsed, status
165 );
166 } else {
167 warn!("Request failed for path {} in {:.2}s", self.path, elapsed);
168 }
169 }
170 }
171
172 fn shed(mut self) {
173 if let Some(timer) = self.timer.take() {
174 let elapsed = timer.stop_and_record();
175 self.metrics
176 .total_requests_shed
177 .with_label_values(&[&self.path])
178 .inc();
179 info!("Request shed for path {} in {:.2}s", self.path, elapsed);
180 }
181 }
182}
183
184impl Drop for MetricsGuard {
185 fn drop(&mut self) {
186 self.metrics
187 .current_requests_in_flight
188 .with_label_values(&[&self.path])
189 .dec();
190
191 if let Some(timer) = self.timer.take() {
194 let elapsed = timer.stop_and_record();
195 self.metrics
196 .total_requests_disconnected
197 .with_label_values(&[&self.path])
198 .inc();
199 info!(
200 "Request disconnected for path {} in {:.2}s",
201 self.path, elapsed
202 );
203 }
204 }
205}