iota_proxy/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    net::TcpListener,
7    sync::{Arc, RwLock},
8};
9
10use axum::{Router, extract::Extension, http::StatusCode, routing::get};
11use iota_metrics::RegistryService;
12use prometheus::{Registry, TextEncoder};
13use tower::ServiceBuilder;
14use tower_http::{
15    LatencyUnit,
16    trace::{DefaultOnResponse, TraceLayer},
17};
18use tracing::Level;
19
/// Path of the route that serves the gathered Prometheus metrics (handled by
/// `metrics`).
const METRICS_ROUTE: &str = "/metrics";
/// Path of the route that serves the pod health check (handled by
/// `pod_health`).
const POD_HEALTH_ROUTE: &str = "/pod_health";
22
/// Shared handle to the [`HealthCheck`] state.
///
/// All reads and writes of the health values go through the [`RwLock`], and
/// the [`Arc`] lets several request handlers hold the same state.
type HealthCheckMetrics = Arc<RwLock<HealthCheck>>;

/// Values we believe are interesting for deciding whether this pod should be
/// considered healthy.
///
/// Do not access the fields of a bare `HealthCheck`; always share it through
/// [`HealthCheckMetrics`] (an `Arc<RwLock<_>>`) so that concurrent accesses are
/// synchronized.
#[derive(Debug, Default)]
struct HealthCheck {
    /// Value of the `consumer_operations_submitted` counter, e.g.
    /// `consumer_operations_submitted{...}`.
    consumer_operations_submitted: f64,
}

impl HealthCheck {
    /// Creates a health check with every tracked value set to zero.
    fn new() -> Self {
        Self::default()
    }
}
41
42// Creates a new http server that has as a sole purpose to expose
43// and endpoint that prometheus agent can use to poll for the metrics.
44// A RegistryService is returned that can be used to get access in prometheus
45// Registries.
46pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {
47    let registry = Registry::new();
48
49    let registry_service = RegistryService::new(registry);
50
51    let pod_health_data = Arc::new(RwLock::new(HealthCheck::new()));
52
53    let app = Router::new()
54        .route(METRICS_ROUTE, get(metrics))
55        .route(POD_HEALTH_ROUTE, get(pod_health))
56        .layer(Extension(registry_service.clone()))
57        .layer(Extension(pod_health_data.clone()))
58        .layer(
59            ServiceBuilder::new().layer(
60                TraceLayer::new_for_http().on_response(
61                    DefaultOnResponse::new()
62                        .level(Level::INFO)
63                        .latency_unit(LatencyUnit::Seconds),
64                ),
65            ),
66        );
67
68    tokio::spawn(async move {
69        listener.set_nonblocking(true).unwrap();
70        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
71        axum::serve(listener, app).await.unwrap();
72    });
73
74    registry_service
75}
76
77// DO NOT remove this handler, it is not compatible with the
78// iota_metrics::metric equivalent
79async fn metrics(
80    Extension(registry_service): Extension<RegistryService>,
81    Extension(pod_health): Extension<HealthCheckMetrics>,
82) -> (StatusCode, String) {
83    let mut metric_families = registry_service.gather_all();
84    metric_families.extend(prometheus::gather());
85
86    if let Some(consumer_operations_submitted) = metric_families
87        .iter()
88        .filter_map(|v| {
89            if v.name() == "consumer_operations_submitted" {
90                // Expecting one metric, so return the first one, as it is the only one
91                v.get_metric().first().map(|m| m.get_counter().value())
92            } else {
93                None
94            }
95        })
96        .next()
97    {
98        pod_health
99            .write()
100            .expect("unable to write to pod health metrics")
101            .consumer_operations_submitted = consumer_operations_submitted;
102    };
103    match TextEncoder.encode_to_string(&metric_families) {
104        Ok(metrics) => (StatusCode::OK, metrics),
105        Err(error) => (
106            StatusCode::INTERNAL_SERVER_ERROR,
107            format!("unable to encode metrics: {error}"),
108        ),
109    }
110}
111
112/// pod_health is called by k8s to know if this service is correctly processing
113/// data
114async fn pod_health(Extension(pod_health): Extension<HealthCheckMetrics>) -> (StatusCode, String) {
115    let consumer_operations_submitted = pod_health
116        .read()
117        .expect("unable to read pod health metrics")
118        .consumer_operations_submitted;
119
120    if consumer_operations_submitted > 0.0 {
121        (StatusCode::OK, consumer_operations_submitted.to_string())
122    } else {
123        (
124            StatusCode::SERVICE_UNAVAILABLE,
125            consumer_operations_submitted.to_string(),
126        )
127    }
128}