1use std::{
6 net::TcpListener,
7 sync::{Arc, RwLock},
8};
9
10use axum::{Router, extract::Extension, http::StatusCode, routing::get};
11use iota_metrics::RegistryService;
12use prometheus::{Registry, TextEncoder};
13use tower::ServiceBuilder;
14use tower_http::{
15 LatencyUnit,
16 trace::{DefaultOnResponse, TraceLayer},
17};
18use tracing::Level;
19
/// Path of the pod health probe; its status code reflects whether the
/// `consumer_operations_submitted` counter has been observed above zero.
const POD_HEALTH_ROUTE: &str = "/pod_health";
/// Path serving every gathered metric in the Prometheus text format.
const METRICS_ROUTE: &str = "/metrics";

/// Lock-protected handle to the health snapshot shared between the HTTP handlers.
type HealthCheckMetrics = Arc<RwLock<HealthCheck>>;

/// Metric values that the pod health probe bases its answer on.
#[derive(Debug)]
struct HealthCheck {
    /// Value of the `consumer_operations_submitted` counter as last copied in
    /// by the metrics handler; starts at zero.
    consumer_operations_submitted: f64,
}

impl HealthCheck {
    /// Returns a snapshot in which no consumer operations have been recorded yet.
    fn new() -> Self {
        let consumer_operations_submitted = f64::default();
        HealthCheck {
            consumer_operations_submitted,
        }
    }
}
41
42pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {
47 let registry = Registry::new();
48
49 let registry_service = RegistryService::new(registry);
50
51 let pod_health_data = Arc::new(RwLock::new(HealthCheck::new()));
52
53 let app = Router::new()
54 .route(METRICS_ROUTE, get(metrics))
55 .route(POD_HEALTH_ROUTE, get(pod_health))
56 .layer(Extension(registry_service.clone()))
57 .layer(Extension(pod_health_data.clone()))
58 .layer(
59 ServiceBuilder::new().layer(
60 TraceLayer::new_for_http().on_response(
61 DefaultOnResponse::new()
62 .level(Level::INFO)
63 .latency_unit(LatencyUnit::Seconds),
64 ),
65 ),
66 );
67
68 tokio::spawn(async move {
69 listener.set_nonblocking(true).unwrap();
70 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
71 axum::serve(listener, app).await.unwrap();
72 });
73
74 registry_service
75}
76
77async fn metrics(
80 Extension(registry_service): Extension<RegistryService>,
81 Extension(pod_health): Extension<HealthCheckMetrics>,
82) -> (StatusCode, String) {
83 let mut metric_families = registry_service.gather_all();
84 metric_families.extend(prometheus::gather());
85
86 if let Some(consumer_operations_submitted) = metric_families
87 .iter()
88 .filter_map(|v| {
89 if v.name() == "consumer_operations_submitted" {
90 v.get_metric().first().map(|m| m.get_counter().value())
92 } else {
93 None
94 }
95 })
96 .next()
97 {
98 pod_health
99 .write()
100 .expect("unable to write to pod health metrics")
101 .consumer_operations_submitted = consumer_operations_submitted;
102 };
103 match TextEncoder.encode_to_string(&metric_families) {
104 Ok(metrics) => (StatusCode::OK, metrics),
105 Err(error) => (
106 StatusCode::INTERNAL_SERVER_ERROR,
107 format!("unable to encode metrics: {error}"),
108 ),
109 }
110}
111
112async fn pod_health(Extension(pod_health): Extension<HealthCheckMetrics>) -> (StatusCode, String) {
115 let consumer_operations_submitted = pod_health
116 .read()
117 .expect("unable to read pod health metrics")
118 .consumer_operations_submitted;
119
120 if consumer_operations_submitted > 0.0 {
121 (StatusCode::OK, consumer_operations_submitted.to_string())
122 } else {
123 (
124 StatusCode::SERVICE_UNAVAILABLE,
125 consumer_operations_submitted.to_string(),
126 )
127 }
128}