iota_faucet/
metrics_layer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11use futures::Future;
12use http::{Request, StatusCode};
13use prometheus::{HistogramTimer, Registry};
14use tower::{BoxError, Layer, Service, ServiceExt, load_shed::error::Overloaded};
15use tracing::{error, info, warn};
16
17use crate::metrics::RequestMetrics;
18
19/// Tower Layer for tracking metrics in Prometheus related to number,
20/// success-rate and latency of requests running through service.
21#[derive(Clone)]
22pub struct RequestMetricsLayer {
23    metrics: Arc<RequestMetrics>,
24}
25
26#[derive(Clone)]
27pub struct RequestMetricsService<Inner> {
28    inner: Inner,
29    metrics: Arc<RequestMetrics>,
30}
31
32pub struct RequestMetricsFuture<Res> {
33    future: Pin<Box<dyn Future<Output = Result<Res, BoxError>> + Send>>,
34}
35
36struct MetricsGuard {
37    timer: Option<HistogramTimer>,
38    metrics: Arc<RequestMetrics>,
39    path: String,
40}
41
42impl RequestMetricsLayer {
43    pub fn new(registry: &Registry) -> Self {
44        Self {
45            metrics: Arc::new(RequestMetrics::new(registry)),
46        }
47    }
48}
49
50impl<Inner> Layer<Inner> for RequestMetricsLayer {
51    type Service = RequestMetricsService<Inner>;
52    fn layer(&self, inner: Inner) -> Self::Service {
53        RequestMetricsService {
54            inner,
55            metrics: self.metrics.clone(),
56        }
57    }
58}
59
60impl<Inner, Body> Service<Request<Body>> for RequestMetricsService<Inner>
61where
62    Inner: Service<Request<Body>, Response = http::Response<Body>, Error = BoxError>
63        + Clone
64        + Send
65        + 'static,
66    Inner::Future: Send,
67    Body: Send + 'static,
68{
69    type Response = Inner::Response;
70    type Error = BoxError;
71    type Future = RequestMetricsFuture<Self::Response>;
72
73    fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74        self.inner.poll_ready(ctx)
75    }
76
77    fn call(&mut self, req: Request<Body>) -> Self::Future {
78        let path = req.uri().path().to_string();
79        let metrics = MetricsGuard::new(self.metrics.clone(), &path);
80        let inner = self.inner.clone();
81
82        let future = Box::pin(async move {
83            let resp = inner.oneshot(req).await;
84            match &resp {
85                Ok(resp) if !resp.status().is_success() => {
86                    metrics.failed(None, Some(resp.status()))
87                }
88                Ok(_) => metrics.succeeded(),
89                Err(err) => {
90                    if err.is::<Overloaded>() {
91                        metrics.shed();
92                    } else {
93                        metrics.failed(Some(err), None);
94                    }
95                }
96            }
97
98            resp
99        });
100
101        RequestMetricsFuture { future }
102    }
103}
104
105impl<Res> Future for RequestMetricsFuture<Res> {
106    type Output = Result<Res, BoxError>;
107    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
108        Future::poll(self.future.as_mut(), ctx)
109    }
110}
111
112impl MetricsGuard {
113    fn new(metrics: Arc<RequestMetrics>, path: &str) -> Self {
114        metrics
115            .total_requests_received
116            .with_label_values(&[path])
117            .inc();
118        metrics
119            .current_requests_in_flight
120            .with_label_values(&[path])
121            .inc();
122        MetricsGuard {
123            timer: Some(
124                metrics
125                    .process_latency
126                    .with_label_values(&[path])
127                    .start_timer(),
128            ),
129            metrics,
130            path: path.to_string(),
131        }
132    }
133
134    fn succeeded(mut self) {
135        if let Some(timer) = self.timer.take() {
136            let elapsed = timer.stop_and_record();
137            self.metrics
138                .total_requests_succeeded
139                .with_label_values(&[&self.path])
140                .inc();
141            info!(
142                "Request succeeded for path {} in {:.2}s",
143                self.path, elapsed
144            );
145        }
146    }
147
148    fn failed(mut self, error: Option<&BoxError>, status: Option<StatusCode>) {
149        if let Some(timer) = self.timer.take() {
150            let elapsed = timer.stop_and_record();
151            self.metrics
152                .total_requests_failed
153                .with_label_values(&[&self.path])
154                .inc();
155
156            if let Some(err) = error {
157                error!(
158                    "Request failed for path {} in {:.2}s, error {:?}",
159                    self.path, elapsed, err
160                );
161            } else if let Some(status) = status {
162                error!(
163                    "Request failed for path {} in {:.2}s with status: {}",
164                    self.path, elapsed, status
165                );
166            } else {
167                warn!("Request failed for path {} in {:.2}s", self.path, elapsed);
168            }
169        }
170    }
171
172    fn shed(mut self) {
173        if let Some(timer) = self.timer.take() {
174            let elapsed = timer.stop_and_record();
175            self.metrics
176                .total_requests_shed
177                .with_label_values(&[&self.path])
178                .inc();
179            info!("Request shed for path {} in {:.2}s", self.path, elapsed);
180        }
181    }
182}
183
184impl Drop for MetricsGuard {
185    fn drop(&mut self) {
186        self.metrics
187            .current_requests_in_flight
188            .with_label_values(&[&self.path])
189            .dec();
190
191        // Request was still in flight when the guard was dropped, implying the client
192        // disconnected.
193        if let Some(timer) = self.timer.take() {
194            let elapsed = timer.stop_and_record();
195            self.metrics
196                .total_requests_disconnected
197                .with_label_values(&[&self.path])
198                .inc();
199            info!(
200                "Request disconnected for path {} in {:.2}s",
201                self.path, elapsed
202            );
203        }
204    }
205}