use std::{
    future::Future,
    net::SocketAddr,
    path::Path,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Instant,
};

use axum::{
    Router,
    extract::{Extension, Request},
    http::StatusCode,
    middleware::Next,
    response::Response,
    routing::get,
};
use dashmap::DashMap;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use prometheus::{
    Histogram, IntCounterVec, IntGaugeVec, Registry, TextEncoder,
    core::{AtomicI64, GenericGauge},
    register_histogram_with_registry, register_int_counter_vec_with_registry,
    register_int_gauge_vec_with_registry,
};
pub use scopeguard;
use simple_server_timing_header::Timer;
use tap::TapFallible;
use tracing::{Span, warn};
use uuid::Uuid;

mod guards;
pub mod hardware_metrics;
pub mod histogram;
pub mod metered_channel;
pub mod metrics_network;
pub mod monitored_mpsc;
pub mod thread_stall_monitor;
pub use guards::*;

pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";

pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.02, 0.03, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.,
];

pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1., 2., 3., 5., 10., 20., 30., 60.,
];

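/// Fine-grained latency buckets, from 1 ms up to 90 s.
///
/// A minimal sketch of how these bucket slices can be passed to a histogram
/// registration; the metric name and the `registry` binding are illustrative,
/// not part of this crate:
///
/// ```ignore
/// let histogram = register_histogram_with_registry!(
///     "example_request_latency_sec",
///     "Latency of example requests in seconds.",
///     LATENCY_SEC_BUCKETS.to_vec(),
///     registry,
/// )
/// .unwrap();
/// histogram.observe(0.042);
/// ```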
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6,
    0.7, 0.8, 0.9, 1., 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2., 2.5, 3., 3.5, 4., 4.5, 5.,
    6., 7., 8., 9., 10., 15., 20., 25., 30., 60., 90.,
];

pub const COUNT_BUCKETS: &[f64] = &[
    2., 5., 10., 20., 50., 100., 200., 500., 1000., 2000., 5000., 10000.,
];

pub const BYTES_BUCKETS: &[f64] = &[
    1024., 4096., 16384., 65536., 262144., 524288., 1048576., 2097152., 4194304., 8388608.,
    16777216., 33554432., 67108864.,
];

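/// Gauges, counters and histograms shared by the monitoring macros and helpers
/// in this crate. A single instance is registered once via [`init_metrics`]
/// and looked up through [`get_metrics`].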
#[derive(Debug)]
pub struct Metrics {
    pub tasks: IntGaugeVec,
    pub futures: IntGaugeVec,
    pub channel_inflight: IntGaugeVec,
    pub channel_sent: IntGaugeVec,
    pub channel_received: IntGaugeVec,
    pub future_active_duration_ns: IntGaugeVec,
    pub scope_iterations: IntGaugeVec,
    pub scope_duration_ns: IntGaugeVec,
    pub scope_entrance: IntGaugeVec,
    pub thread_stall_duration_sec: Histogram,
    pub system_invariant_violations: IntCounterVec,
}

impl Metrics {
    fn new(registry: &Registry) -> Self {
        Self {
            tasks: register_int_gauge_vec_with_registry!(
                "monitored_tasks",
                "Number of running tasks per callsite.",
                &["callsite"],
                registry,
            )
            .unwrap(),
            futures: register_int_gauge_vec_with_registry!(
                "monitored_futures",
                "Number of pending futures per callsite.",
                &["callsite"],
                registry,
            )
            .unwrap(),
            channel_inflight: register_int_gauge_vec_with_registry!(
                "monitored_channel_inflight",
                "Inflight items in channels.",
                &["name"],
                registry,
            )
            .unwrap(),
            channel_sent: register_int_gauge_vec_with_registry!(
                "monitored_channel_sent",
                "Sent items in channels.",
                &["name"],
                registry,
            )
            .unwrap(),
            channel_received: register_int_gauge_vec_with_registry!(
                "monitored_channel_received",
                "Received items in channels.",
                &["name"],
                registry,
            )
            .unwrap(),
            future_active_duration_ns: register_int_gauge_vec_with_registry!(
                "monitored_future_active_duration_ns",
                "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
                &["name"],
                registry,
            )
            .unwrap(),
            scope_entrance: register_int_gauge_vec_with_registry!(
                "monitored_scope_entrance",
                "Number of entrance in the scope.",
                &["name"],
                registry,
            )
            .unwrap(),
            scope_iterations: register_int_gauge_vec_with_registry!(
                "monitored_scope_iterations",
                "Total number of times where the monitored scope runs",
                &["name"],
                registry,
            )
            .unwrap(),
            scope_duration_ns: register_int_gauge_vec_with_registry!(
                "monitored_scope_duration_ns",
                "Total duration in nanosecs where the monitored scope is running",
                &["name"],
                registry,
            )
            .unwrap(),
            thread_stall_duration_sec: register_histogram_with_registry!(
                "thread_stall_duration_sec",
                "Duration of thread stalls in seconds.",
                registry,
            )
            .unwrap(),
            system_invariant_violations: register_int_counter_vec_with_registry!(
                "system_invariant_violations",
                "Number of system invariant violations",
                &["name"],
                registry,
            )
            .unwrap(),
        }
    }
}

static METRICS: OnceCell<Metrics> = OnceCell::new();

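/// Registers the crate-wide [`Metrics`] against `registry`. Call this once at
/// startup, before using the monitoring macros; later calls leave the first
/// registration in place and only log a warning.
///
/// A minimal sketch, assuming a process-wide Prometheus `Registry`:
///
/// ```ignore
/// let registry = prometheus::Registry::new();
/// init_metrics(&registry);
/// assert!(get_metrics().is_some());
/// ```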
pub fn init_metrics(registry: &Registry) {
    let _ = METRICS
        .set(Metrics::new(registry))
        .tap_err(|_| warn!("init_metrics registry overwritten"));
}

pub fn get_metrics() -> Option<&'static Metrics> {
    METRICS.get()
}

tokio::task_local! {
    static SERVER_TIMING: Arc<Mutex<Timer>>;
}

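/// Runs `fut` with a fresh [`Timer`] installed in the task-local
/// `SERVER_TIMING` slot, so that nested calls to [`add_server_timing`] are
/// recorded against it.
///
/// A usage sketch; the async body is illustrative:
///
/// ```ignore
/// let result = with_new_server_timing(async {
///     add_server_timing("did_work");
///     42
/// })
/// .await;
/// assert_eq!(result, 42);
/// ```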
pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
    let timer = Arc::new(Mutex::new(Timer::new()));

    let mut ret = None;
    SERVER_TIMING
        .scope(timer, async {
            ret = Some(fut.await);
        })
        .await;

    ret.unwrap()
}

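/// Axum middleware that wraps each request in a fresh server-timing context
/// and attaches the resulting `Server-Timing` header to the response.
///
/// A wiring sketch for an axum `Router`; the route and handler are
/// illustrative:
///
/// ```ignore
/// let app = Router::new()
///     .route("/work", get(|| async { "ok" }))
///     .layer(axum::middleware::from_fn(server_timing_middleware));
/// ```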
pub async fn server_timing_middleware(request: Request, next: Next) -> Response {
    with_new_server_timing(async move {
        let mut response = next.run(request).await;
        add_server_timing("finish_request");

        if let Ok(header_value) = get_server_timing()
            .expect("server timing not set")
            .lock()
            .header_value()
            .try_into()
        {
            response
                .headers_mut()
                .insert(Timer::header_key(), header_value);
        }
        response
    })
    .await
}

pub async fn with_server_timing<T>(
    timer: Arc<Mutex<Timer>>,
    fut: impl Future<Output = T> + Send + 'static,
) -> T {
    let mut ret = None;
    SERVER_TIMING
        .scope(timer, async {
            ret = Some(fut.await);
        })
        .await;

    ret.unwrap()
}

pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
    SERVER_TIMING.try_with(|timer| timer.clone()).ok()
}

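/// Records a named timing checkpoint on the current task's server-timing
/// context, if one is installed; otherwise an error is logged and the call is
/// a no-op.
///
/// Sketch:
///
/// ```ignore
/// with_new_server_timing(async {
///     add_server_timing("loaded_state");
/// })
/// .await;
/// ```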
pub fn add_server_timing(name: &str) {
    let res = SERVER_TIMING.try_with(|timer| {
        timer.lock().add(name);
    });

    if res.is_err() {
        tracing::error!("Server timing context not found");
    }
}

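/// Wraps a future so that a per-callsite gauge is incremented while it is
/// pending and decremented when it completes or is dropped, with optional
/// start/completion logging.
///
/// The single-argument form tracks the `futures` gauge without logging; a
/// sketch:
///
/// ```ignore
/// let fut = monitored_future!(async { 1 + 1 });
/// assert_eq!(fut.await, 2);
/// ```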
#[macro_export]
macro_rules! monitored_future {
    ($fut: expr) => {{ monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}

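/// Spawns `$fut` on the Tokio runtime, forwarding the caller's server-timing
/// context into the spawned task when one is present; otherwise the future is
/// spawned as-is.
///
/// Sketch:
///
/// ```ignore
/// let handle = forward_server_timing_and_spawn!(async { 2 + 2 });
/// ```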
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($fut: expr) => {
        if let Some(timing) = $crate::get_server_timing() {
            tokio::task::spawn(async move { $crate::with_server_timing(timing, $fut).await })
        } else {
            tokio::task::spawn($fut)
        }
    };
}

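/// Spawns a Tokio task whose lifetime is tracked by the `monitored_tasks`
/// gauge (labelled by callsite) and which inherits the caller's server-timing
/// context.
///
/// Sketch:
///
/// ```ignore
/// let handle = spawn_monitored_task!(async move {
///     // long-running background work
/// });
/// handle.await.unwrap();
/// ```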
#[macro_export]
macro_rules! spawn_monitored_task {
    ($fut: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $fut, "", INFO, false
        ))
    };
}

#[macro_export]
macro_rules! spawn_logged_monitored_task {
    ($fut: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $fut, "", INFO, true
        ))
    };

    ($fut: expr, $name: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $fut, $name, INFO, true
        ))
    };

    ($fut: expr, $name: expr, $logging_level: ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks,
            $fut,
            $name,
            $logging_level,
            true
        ))
    };
}

pub struct MonitoredScopeGuard {
    metrics: &'static Metrics,
    name: &'static str,
    timer: Instant,
}

impl Drop for MonitoredScopeGuard {
    fn drop(&mut self) {
        self.metrics
            .scope_duration_ns
            .with_label_values(&[self.name])
            .add(self.timer.elapsed().as_nanos() as i64);
        self.metrics
            .scope_entrance
            .with_label_values(&[self.name])
            .dec();
    }
}

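/// Increments the iteration and entrance gauges for `name` and returns a guard
/// that, when dropped, adds the elapsed time to `monitored_scope_duration_ns`
/// and decrements the entrance gauge. Returns `None` if [`init_metrics`] has
/// not been called yet.
///
/// Sketch:
///
/// ```ignore
/// {
///     let _scope = monitored_scope("process_batch");
///     // ... work measured while `_scope` is alive ...
/// }
/// ```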
pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
    let metrics = get_metrics();
    if let Some(m) = metrics {
        m.scope_iterations.with_label_values(&[name]).inc();
        m.scope_entrance.with_label_values(&[name]).inc();
        Some(MonitoredScopeGuard {
            metrics: m,
            name,
            timer: Instant::now(),
        })
    } else {
        None
    }
}

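/// Extension trait that runs a future inside a monitored scope and, in
/// addition, accumulates the time spent actively polling it into
/// `monitored_future_active_duration_ns`.
///
/// Sketch:
///
/// ```ignore
/// let value = async { 1 + 1 }.in_monitored_scope("example_scope").await;
/// ```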
pub trait MonitoredFutureExt: Future + Sized {
    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
}

impl<F: Future> MonitoredFutureExt for F {
    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
        MonitoredScopeFuture {
            f: Box::pin(self),
            active_duration_metric: get_metrics()
                .map(|m| m.future_active_duration_ns.with_label_values(&[name])),
            _scope: monitored_scope(name),
        }
    }
}

pub struct MonitoredScopeFuture<F: Sized> {
    f: Pin<Box<F>>,
    active_duration_metric: Option<GenericGauge<AtomicI64>>,
    _scope: Option<MonitoredScopeGuard>,
}

impl<F: Future> Future for MonitoredScopeFuture<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let active_timer = Instant::now();
        let ret = self.f.as_mut().poll(cx);
        if let Some(m) = &self.active_duration_metric {
            m.add(active_timer.elapsed().as_nanos() as i64);
        }
        ret
    }
}

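/// Future wrapper that tracks whether the inner future ran to completion. If
/// it is dropped before completing, the current tracing span's `cancelled`
/// field is set to `true` (the span must declare that field for the value to
/// be recorded).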
pub struct CancelMonitor<F: Sized> {
    finished: bool,
    inner: Pin<Box<F>>,
}

impl<F> CancelMonitor<F>
where
    F: Future,
{
    pub fn new(inner: F) -> Self {
        Self {
            finished: false,
            inner: Box::pin(inner),
        }
    }

    pub fn is_finished(&self) -> bool {
        self.finished
    }
}

impl<F> Future for CancelMonitor<F>
where
    F: Future,
{
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.as_mut().poll(cx) {
            Poll::Ready(output) => {
                self.finished = true;
                Poll::Ready(output)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<F: Sized> Drop for CancelMonitor<F> {
    fn drop(&mut self) {
        if !self.finished {
            Span::current().record("cancelled", true);
        }
    }
}

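/// Blanket extension trait for wrapping any future in a [`CancelMonitor`].
///
/// A sketch, assuming the surrounding span declares a `cancelled` field; the
/// function and its body are illustrative:
///
/// ```ignore
/// #[tracing::instrument(fields(cancelled = tracing::field::Empty))]
/// async fn handle_request() {
///     async { /* work */ }.monitor_cancellation().await;
/// }
/// ```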
pub trait MonitorCancellation {
    fn monitor_cancellation(self) -> CancelMonitor<Self>
    where
        Self: Sized + Future;
}

impl<T> MonitorCancellation for T
where
    T: Future,
{
    fn monitor_cancellation(self) -> CancelMonitor<Self> {
        CancelMonitor::new(self)
    }
}

pub type RegistryID = Uuid;

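/// Holds a default Prometheus registry plus dynamically added registries, so
/// that components created at runtime can expose their metrics through the
/// same endpoint.
///
/// Sketch:
///
/// ```ignore
/// let service = RegistryService::new(Registry::new());
/// let extra = Registry::new_custom(Some("component".to_string()), None).unwrap();
/// let id = service.add(extra);
/// let families = service.gather_all(); // default + added registries
/// assert!(service.remove(id));
/// ```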
#[derive(Clone)]
pub struct RegistryService {
    default_registry: Registry,
    registries_by_id: Arc<DashMap<Uuid, Registry>>,
}

impl RegistryService {
    pub fn new(default_registry: Registry) -> Self {
        Self {
            default_registry,
            registries_by_id: Arc::new(DashMap::new()),
        }
    }

    pub fn default_registry(&self) -> Registry {
        self.default_registry.clone()
    }

    pub fn add(&self, registry: Registry) -> RegistryID {
        let registry_id = Uuid::new_v4();
        if self
            .registries_by_id
            .insert(registry_id, registry)
            .is_some()
        {
            panic!("Other Registry already detected for the same id {registry_id}");
        }

        registry_id
    }

    pub fn remove(&self, registry_id: RegistryID) -> bool {
        self.registries_by_id.remove(&registry_id).is_some()
    }

    pub fn get_all(&self) -> Vec<Registry> {
        let mut registries: Vec<Registry> = self
            .registries_by_id
            .iter()
            .map(|r| r.value().clone())
            .collect();
        registries.push(self.default_registry.clone());

        registries
    }

    pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
        self.get_all().iter().flat_map(|r| r.gather()).collect()
    }
}

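/// Builds an uptime counter (seconds since process start) labelled with the
/// process name, version, chain identifier, OS version and a Docker flag.
///
/// A registration sketch; the label values are illustrative:
///
/// ```ignore
/// let registry = Registry::new();
/// registry
///     .register(uptime_metric("validator", "1.0.0", "example-chain-id"))
///     .unwrap();
/// ```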
pub fn uptime_metric(
    process: &str,
    version: &'static str,
    chain_identifier: &str,
) -> Box<dyn prometheus::core::Collector> {
    let opts = prometheus::opts!("uptime", "uptime of the node service in seconds")
        .variable_label("process")
        .variable_label("version")
        .variable_label("chain_identifier")
        .variable_label("os_version")
        .variable_label("is_docker");

    let start_time = std::time::Instant::now();
    let uptime = move || start_time.elapsed().as_secs();
    let metric = prometheus_closure_metric::ClosureMetric::new(
        opts,
        prometheus_closure_metric::ValueType::Counter,
        uptime,
        &[
            process,
            version,
            chain_identifier,
            &sysinfo::System::long_os_version()
                .unwrap_or_else(|| "os_version_unavailable".to_string()),
            &is_running_in_docker().to_string(),
        ],
    )
    .unwrap();

    Box::new(metric)
}

pub fn is_running_in_docker() -> bool {
    Path::new("/.dockerenv").exists()
}

pub const METRICS_ROUTE: &str = "/metrics";

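/// Starts a background axum server on `addr` that serves the metrics of the
/// returned [`RegistryService`] under [`METRICS_ROUTE`]. Under the `msim`
/// simulator configuration the HTTP server is skipped and only the registry
/// service is returned. Must be called from within a Tokio runtime.
///
/// Sketch; the bind address is illustrative:
///
/// ```ignore
/// let registry_service = start_prometheus_server("0.0.0.0:9184".parse().unwrap());
/// let default_registry = registry_service.default_registry();
/// ```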
pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
    let registry = Registry::new();

    let registry_service = RegistryService::new(registry);

    if cfg!(msim) {
        warn!("not starting prometheus server in simulator");
        return registry_service;
    }

    let app = Router::new()
        .route(METRICS_ROUTE, get(metrics))
        .layer(Extension(registry_service.clone()));

    tokio::spawn(async move {
        let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
        axum::serve(listener, app.into_make_service())
            .await
            .unwrap();
    });

    registry_service
}

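/// Axum handler that gathers all registries held by the [`RegistryService`]
/// extension and renders them in the Prometheus text exposition format.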
pub async fn metrics(
    Extension(registry_service): Extension<RegistryService>,
) -> (StatusCode, String) {
    let metrics_families = registry_service.gather_all();
    match TextEncoder.encode_to_string(&metrics_families) {
        Ok(metrics) => (StatusCode::OK, metrics),
        Err(error) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("unable to encode metrics: {error}"),
        ),
    }
}

#[cfg(test)]
mod tests {
    use prometheus::{IntCounter, Registry};

    use crate::RegistryService;

    #[test]
    fn registry_service() {
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();

        let registry_service = RegistryService::new(default_registry.clone());
        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        let registry_1 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        registry_1
            .register(Box::new(
                IntCounter::new("counter_1", "counter_1_desc").unwrap(),
            ))
            .unwrap();

        let registry_1_id = registry_service.add(registry_1);

        let mut metrics = registry_service.gather_all();
        metrics.sort_by(|m1, m2| Ord::cmp(m1.name(), m2.name()));

        assert_eq!(metrics.len(), 2);

        let metric_default = metrics.remove(0);
        assert_eq!(metric_default.name(), "default_counter");
        assert_eq!(metric_default.help(), "counter_desc");

        let metric_1: prometheus::proto::MetricFamily = metrics.remove(0);
        assert_eq!(metric_1.name(), "iota_counter_1");
        assert_eq!(metric_1.help(), "counter_1_desc");

        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        registry_2
            .register(Box::new(
                IntCounter::new("counter_2", "counter_2_desc").unwrap(),
            ))
            .unwrap();
        let _registry_2_id = registry_service.add(registry_2);

        let mut metrics = registry_service.gather_all();
        metrics.sort_by(|m1, m2| Ord::cmp(m1.name(), m2.name()));

        assert_eq!(metrics.len(), 3);

        let metric_default = metrics.remove(0);
        assert_eq!(metric_default.name(), "default_counter");
        assert_eq!(metric_default.help(), "counter_desc");

        let metric_1 = metrics.remove(0);
        assert_eq!(metric_1.name(), "iota_counter_1");
        assert_eq!(metric_1.help(), "counter_1_desc");

        let metric_2 = metrics.remove(0);
        assert_eq!(metric_2.name(), "iota_counter_2");
        assert_eq!(metric_2.help(), "counter_2_desc");

        assert!(registry_service.remove(registry_1_id));

        let mut metrics = registry_service.gather_all();
        metrics.sort_by(|m1, m2| Ord::cmp(m1.name(), m2.name()));

        assert_eq!(metrics.len(), 2);

        let metric_default = metrics.remove(0);
        assert_eq!(metric_default.name(), "default_counter");
        assert_eq!(metric_default.help(), "counter_desc");

        let metric_1 = metrics.remove(0);
        assert_eq!(metric_1.name(), "iota_counter_2");
        assert_eq!(metric_1.help(), "counter_2_desc");
    }
}