1use std::{
6 future::Future,
7 net::SocketAddr,
8 path::Path,
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12 time::Instant,
13};
14
15use axum::{
16 Router,
17 extract::{Extension, Request},
18 http::StatusCode,
19 middleware::Next,
20 response::Response,
21 routing::get,
22};
23use dashmap::DashMap;
24use once_cell::sync::OnceCell;
25use parking_lot::Mutex;
26use prometheus_filtered::{
27 Histogram, IntCounterVec, IntGaugeVec, Registry, TextEncoder,
28 core::{AtomicI64, GenericGauge},
29 register_histogram_with_registry, register_int_counter_vec_with_registry,
30 register_int_gauge_vec_with_registry,
31};
32pub use scopeguard;
33use simple_server_timing_header::Timer;
34use tap::TapFallible;
35use tracing::{Span, warn};
36use uuid::Uuid;
37
38mod guards;
39pub mod hardware_metrics;
40pub mod histogram;
41pub mod metered_channel;
42pub mod metrics_network;
43pub mod monitored_mpsc;
44pub mod thread_stall_monitor;
45pub use guards::*;
46
/// Label value for the single-writer transaction type.
/// NOTE(review): the metrics this label is attached to are not visible here — confirm at call sites.
pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
/// Label value for the shared-object transaction type.
/// NOTE(review): the metrics this label is attached to are not visible here — confirm at call sites.
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";

/// Histogram buckets for sub-second latencies, from 5 ms up to 1 s
/// (values are in seconds, per the `_SEC` suffix).
pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.02, 0.03, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.,
];

/// Coarse latency histogram buckets in seconds, from 5 ms up to 60 s.
pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1., 2., 3., 5., 10., 20., 30., 60.,
];

/// Fine-grained latency histogram buckets in seconds, from 1 ms up to 90 s.
/// Resolution is 50 ms between 0.1 s and 0.5 s and 100 ms between 0.5 s and 2 s.
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6,
    0.7, 0.8, 0.9, 1., 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2., 2.5, 3., 3.5, 4., 4.5, 5.,
    6., 7., 8., 9., 10., 15., 20., 25., 30., 60., 90.,
];

/// Histogram buckets for item counts, 2 to 10 000 in a 1-2-5 progression.
pub const COUNT_BUCKETS: &[f64] = &[
    2., 5., 10., 20., 50., 100., 200., 500., 1000., 2000., 5000., 10000.,
];

/// Histogram buckets for sizes in bytes, from 1 KiB (1024) up to 64 MiB (67108864).
pub const BYTES_BUCKETS: &[f64] = &[
    1024., 4096., 16384., 65536., 262144., 524288., 1048576., 2097152., 4194304., 8388608.,
    16777216., 33554432., 67108864.,
];
72
73#[derive(Debug)]
74pub struct Metrics {
75 pub tasks: IntGaugeVec,
76 pub futures: IntGaugeVec,
77 pub channel_inflight: IntGaugeVec,
78 pub channel_sent: IntGaugeVec,
79 pub channel_received: IntGaugeVec,
80 pub future_active_duration_ns: IntGaugeVec,
81 pub scope_iterations: IntGaugeVec,
82 pub scope_duration_ns: IntGaugeVec,
83 pub scope_entrance: IntGaugeVec,
84 pub thread_stall_duration_sec: Histogram,
85 pub system_invariant_violations: IntCounterVec,
86}
87
88impl Metrics {
89 fn new(registry: &Registry) -> Self {
97 Self {
98 tasks: register_int_gauge_vec_with_registry!(
99 "monitored_tasks",
100 "Number of running tasks per callsite.",
101 &["callsite"],
102 registry,
103 )
104 .unwrap(),
105 futures: register_int_gauge_vec_with_registry!(
106 "monitored_futures",
107 "Number of pending futures per callsite.",
108 &["callsite"],
109 registry,
110 )
111 .unwrap(),
112 channel_inflight: register_int_gauge_vec_with_registry!(
113 "monitored_channel_inflight",
114 "Inflight items in channels.",
115 &["name"],
116 registry,
117 )
118 .unwrap(),
119 channel_sent: register_int_gauge_vec_with_registry!(
120 "monitored_channel_sent",
121 "Sent items in channels.",
122 &["name"],
123 registry,
124 )
125 .unwrap(),
126 channel_received: register_int_gauge_vec_with_registry!(
127 "monitored_channel_received",
128 "Received items in channels.",
129 &["name"],
130 registry,
131 )
132 .unwrap(),
133 future_active_duration_ns: register_int_gauge_vec_with_registry!(
134 "monitored_future_active_duration_ns",
135 "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
136 &["name"],
137 registry,
138 )
139 .unwrap(),
140 scope_entrance: register_int_gauge_vec_with_registry!(
141 "monitored_scope_entrance",
142 "Number of entrance in the scope.",
143 &["name"],
144 registry,
145 )
146 .unwrap(),
147 scope_iterations: register_int_gauge_vec_with_registry!(
148 "monitored_scope_iterations",
149 "Total number of times where the monitored scope runs",
150 &["name"],
151 registry,
152 )
153 .unwrap(),
154 scope_duration_ns: register_int_gauge_vec_with_registry!(
155 "monitored_scope_duration_ns",
156 "Total duration in nanosecs where the monitored scope is running",
157 &["name"],
158 registry,
159 )
160 .unwrap(),
161 thread_stall_duration_sec: register_histogram_with_registry!(
162 "thread_stall_duration_sec",
163 "Duration of thread stalls in seconds.",
164 registry,
165 )
166 .unwrap(),
167 system_invariant_violations: register_int_counter_vec_with_registry!(
168 "system_invariant_violations",
169 "Number of system invariant violations",
170 &["name"],
171 registry,
172 ).unwrap(),
173 }
174 }
175}
176
177static METRICS: OnceCell<Metrics> = OnceCell::new();
178
179pub fn init_metrics(registry: &Registry) {
185 let _ = METRICS
186 .set(Metrics::new(registry))
187 .tap_err(|_| warn!("init_metrics registry overwritten"));
189}
190
191pub fn get_metrics() -> Option<&'static Metrics> {
193 METRICS.get()
194}
195
196tokio::task_local! {
197 static SERVER_TIMING: Arc<Mutex<Timer>>;
198}
199
200pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
205 let timer = Arc::new(Mutex::new(Timer::new()));
206
207 let mut ret = None;
208 SERVER_TIMING
209 .scope(timer, async {
210 ret = Some(fut.await);
211 })
212 .await;
213
214 ret.unwrap()
215}
216
217pub fn server_timing_header_key() -> &'static str {
219 Timer::header_key()
220}
221
222pub fn finish_and_set_server_timing_header(headers: &mut http::HeaderMap) {
225 let Some(timer) = get_server_timing() else {
226 return;
227 };
228 let header_value = {
229 let mut timer = timer.lock();
230 timer.add("finish_request");
231 timer.header_value()
232 };
233 if let Ok(value) = http::HeaderValue::try_from(header_value) {
234 headers.insert(server_timing_header_key(), value);
235 }
236}
237
238pub async fn server_timing_middleware(request: Request, next: Next) -> Response {
239 with_new_server_timing(async move {
240 let mut response = next.run(request).await;
241 finish_and_set_server_timing_header(response.headers_mut());
242 response
243 })
244 .await
245}
246
247pub async fn with_server_timing<T>(
250 timer: Arc<Mutex<Timer>>,
251 fut: impl Future<Output = T> + Send + 'static,
252) -> T {
253 let mut ret = None;
254 SERVER_TIMING
255 .scope(timer, async {
256 ret = Some(fut.await);
257 })
258 .await;
259
260 ret.unwrap()
261}
262
263pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
266 SERVER_TIMING.try_with(|timer| timer.clone()).ok()
267}
268
269pub fn add_server_timing(name: &str) {
273 let res = SERVER_TIMING.try_with(|timer| {
274 timer.lock().add(name);
275 });
276
277 if res.is_err() {
278 tracing::error!("Server timing context not found");
279 }
280}
281
/// Wraps `$fut` in an `async move` block that tracks it in the `$metric` gauge
/// vector of [`Metrics`] (e.g. `futures` or `tasks`). It can optionally log when
/// the future starts and finishes.
///
/// The gauge for this call site is incremented when the wrapper is first polled.
/// It is decremented when the wrapped future completes or the wrapper is dropped.
/// The label is `"<file>:<line>"` of the invocation, or `"<file>:<name>"` when
/// `$name` is a non-empty string literal (`concat!` requires a literal). If
/// [`init_metrics`] has not been called, nothing is counted.
///
/// The single-argument form counts under `futures` with logging disabled.
/// The expansion refers to `tracing` directly, so calling crates must depend on it.
#[macro_export]
macro_rules! monitored_future {
    // `$crate::` makes the recursive call resolve even when this macro is
    // invoked by path (`some_crate::monitored_future!(..)`) without being
    // imported by name at the call site.
    ($fut: expr) => {{ $crate::monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            // Increment on first poll. The guard decrements again when the
            // wrapped future completes or the wrapper is dropped.
            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            // Declared after `_metrics_guard`, so it is dropped first: the
            // completion message is logged before the gauge is decremented.
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}
328
/// Spawns `$fut` on Tokio, carrying the caller's server-timing context (if any)
/// into the spawned task so it records into the same timer.
///
/// Expands to `tokio::task::spawn`, so the calling crate must depend on `tokio`.
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($fut: expr) => {
        match $crate::get_server_timing() {
            Some(timing) => {
                tokio::task::spawn(async move { $crate::with_server_timing(timing, $fut).await })
            }
            None => tokio::task::spawn($fut),
        }
    };
}
339
/// Spawns `$fut` as a task counted in the `tasks` gauge (see
/// `monitored_future!`), forwarding any server-timing context. Nothing is logged.
#[macro_export]
macro_rules! spawn_monitored_task {
    ($fut: expr) => {
        $crate::forward_server_timing_and_spawn!(
            $crate::monitored_future!(tasks, $fut, "", INFO, false)
        )
    };
}

/// Like `spawn_monitored_task!`, but also logs when the task starts and
/// completes.
///
/// Optional arguments:
/// - a string-literal `$name`, used in the label instead of the line number;
/// - a `tracing::Level` variant name (e.g. `DEBUG`). It defaults to `INFO`.
#[macro_export]
macro_rules! spawn_logged_monitored_task {
    // The shorter forms fill in defaults and delegate to the full form.
    ($fut: expr) => {
        $crate::spawn_logged_monitored_task!($fut, "", INFO)
    };

    ($fut: expr, $name: expr) => {
        $crate::spawn_logged_monitored_task!($fut, $name, INFO)
    };

    ($fut: expr, $name: expr, $logging_level: ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks,
            $fut,
            $name,
            $logging_level,
            true
        ))
    };
}
373
374pub struct MonitoredScopeGuard {
375 metrics: &'static Metrics,
376 name: &'static str,
377 timer: Instant,
378}
379
380impl Drop for MonitoredScopeGuard {
381 fn drop(&mut self) {
382 self.metrics
383 .scope_duration_ns
384 .with_label_values(&[self.name])
385 .add(self.timer.elapsed().as_nanos() as i64);
386 self.metrics
387 .scope_entrance
388 .with_label_values(&[self.name])
389 .dec();
390 }
391}
392
393pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
404 let metrics = get_metrics();
405 if let Some(m) = metrics {
406 m.scope_iterations.with_label_values(&[name]).inc();
407 m.scope_entrance.with_label_values(&[name]).inc();
408 Some(MonitoredScopeGuard {
409 metrics: m,
410 name,
411 timer: Instant::now(),
412 })
413 } else {
414 None
415 }
416}
417
418pub trait MonitoredFutureExt: Future + Sized {
423 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
427}
428
429impl<F: Future> MonitoredFutureExt for F {
430 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
431 MonitoredScopeFuture {
432 f: Box::pin(self),
433 active_duration_metric: get_metrics()
434 .map(|m| m.future_active_duration_ns.with_label_values(&[name])),
435 _scope: monitored_scope(name),
436 }
437 }
438}
439
440pub struct MonitoredScopeFuture<F: Sized> {
445 f: Pin<Box<F>>,
446 active_duration_metric: Option<GenericGauge<AtomicI64>>,
447 _scope: Option<MonitoredScopeGuard>,
448}
449
450impl<F: Future> Future for MonitoredScopeFuture<F> {
451 type Output = F::Output;
452
453 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
454 let active_timer = Instant::now();
455 let ret = self.f.as_mut().poll(cx);
456 if let Some(m) = &self.active_duration_metric {
457 m.add(active_timer.elapsed().as_nanos() as i64);
458 }
459 ret
460 }
461}
462
463pub struct CancelMonitor<F: Sized> {
468 finished: bool,
469 inner: Pin<Box<F>>,
470}
471
472impl<F> CancelMonitor<F>
473where
474 F: Future,
475{
476 pub fn new(inner: F) -> Self {
479 Self {
480 finished: false,
481 inner: Box::pin(inner),
482 }
483 }
484
485 pub fn is_finished(&self) -> bool {
487 self.finished
488 }
489}
490
491impl<F> Future for CancelMonitor<F>
492where
493 F: Future,
494{
495 type Output = F::Output;
496
497 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502 match self.inner.as_mut().poll(cx) {
503 Poll::Ready(output) => {
504 self.finished = true;
505 Poll::Ready(output)
506 }
507 Poll::Pending => Poll::Pending,
508 }
509 }
510}
511
512impl<F: Sized> Drop for CancelMonitor<F> {
513 fn drop(&mut self) {
518 if !self.finished {
519 Span::current().record("cancelled", true);
520 }
521 }
522}
523
524pub trait MonitorCancellation {
529 fn monitor_cancellation(self) -> CancelMonitor<Self>
530 where
531 Self: Sized + Future;
532}
533
534impl<T> MonitorCancellation for T
535where
536 T: Future,
537{
538 fn monitor_cancellation(self) -> CancelMonitor<Self> {
539 CancelMonitor::new(self)
540 }
541}
542
543pub type RegistryID = Uuid;
544
545#[derive(Clone)]
549pub struct RegistryService {
550 default_registry: Registry,
552 registries_by_id: Arc<DashMap<Uuid, Registry>>,
553}
554
555impl RegistryService {
556 pub fn new(default_registry: Registry) -> Self {
559 Self {
560 default_registry,
561 registries_by_id: Arc::new(DashMap::new()),
562 }
563 }
564
565 pub fn default_registry(&self) -> Registry {
568 self.default_registry.clone()
569 }
570
571 pub fn add(&self, registry: Registry) -> RegistryID {
577 let registry_id = Uuid::new_v4();
578 if self
579 .registries_by_id
580 .insert(registry_id, registry)
581 .is_some()
582 {
583 panic!("Other Registry already detected for the same id {registry_id}");
584 }
585
586 registry_id
587 }
588
589 pub fn remove(&self, registry_id: RegistryID) -> bool {
592 self.registries_by_id.remove(®istry_id).is_some()
593 }
594
595 pub fn get_all(&self) -> Vec<Registry> {
597 let mut registries: Vec<Registry> = self
598 .registries_by_id
599 .iter()
600 .map(|r| r.value().clone())
601 .collect();
602 registries.push(self.default_registry.clone());
603
604 registries
605 }
606
607 pub fn gather_all(&self) -> Vec<prometheus_filtered::proto::MetricFamily> {
609 self.get_all().iter().flat_map(|r| r.gather()).collect()
610 }
611}
612
613pub fn uptime_metric(
622 process: &str,
623 version: &'static str,
624 chain_identifier: &str,
625) -> Box<dyn prometheus_filtered::core::Collector> {
626 let opts = prometheus_filtered::opts!("uptime", "uptime of the node service in seconds")
627 .variable_label("process")
628 .variable_label("version")
629 .variable_label("chain_identifier")
630 .variable_label("os_version")
631 .variable_label("is_docker");
632
633 let start_time = std::time::Instant::now();
634 let uptime = move || start_time.elapsed().as_secs();
635 let metric = prometheus_closure_metric::ClosureMetric::new(
636 opts,
637 prometheus_closure_metric::ValueType::Counter,
638 uptime,
639 &[
640 process,
641 version,
642 chain_identifier,
643 &sysinfo::System::long_os_version()
644 .unwrap_or_else(|| "os_version_unavailable".to_string()),
645 &is_running_in_docker().to_string(),
646 ],
647 )
648 .unwrap();
649
650 Box::new(metric)
651}
652
/// Heuristic Docker detection: `true` when the metadata of `/.dockerenv` can be read.
///
/// Any error (missing file, permission denied, broken link) yields `false`.
pub fn is_running_in_docker() -> bool {
    Path::new("/.dockerenv").metadata().is_ok()
}
658
659pub const METRICS_ROUTE: &str = "/metrics";
660
661pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
666 let registry = Registry::new();
667
668 let registry_service = RegistryService::new(registry);
669
670 if cfg!(msim) {
671 warn!("not starting prometheus server in simulator");
674 return registry_service;
675 }
676
677 let app = Router::new()
678 .route(METRICS_ROUTE, get(metrics))
679 .layer(Extension(registry_service.clone()));
680
681 tokio::spawn(async move {
682 let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
683 axum::serve(listener, app.into_make_service())
684 .await
685 .unwrap();
686 });
687
688 registry_service
689}
690
691pub async fn metrics(
699 Extension(registry_service): Extension<RegistryService>,
700) -> (StatusCode, String) {
701 let metrics_families = registry_service.gather_all();
702 match TextEncoder.encode_to_string(&metrics_families) {
703 Ok(metrics) => (StatusCode::OK, metrics),
704 Err(error) => (
705 StatusCode::INTERNAL_SERVER_ERROR,
706 format!("unable to encode metrics: {error}"),
707 ),
708 }
709}
710
#[cfg(test)]
mod tests {
    use prometheus_filtered::{IntCounter, Registry};

    use crate::RegistryService;

    /// Gathers every family from `service` and sorts them by name. Then asserts
    /// that their `(name, help)` pairs equal `expected`, in order.
    fn assert_families(service: &RegistryService, expected: &[(&str, &str)]) {
        let mut families = service.gather_all();
        families.sort_by(|a, b| a.name().cmp(b.name()));

        assert_eq!(families.len(), expected.len());
        for (family, (name, help)) in families.iter().zip(expected) {
            assert_eq!(family.name(), *name);
            assert_eq!(family.help(), *help);
        }
    }

    /// Builds a registry with the given name prefix that holds a single counter.
    fn prefixed_registry_with_counter(prefix: &str, name: &str, help: &str) -> Registry {
        let registry = Registry::new_custom(Some(prefix.to_string()), None).unwrap();
        registry
            .register(Box::new(IntCounter::new(name, help).unwrap()))
            .unwrap();
        registry
    }

    #[test]
    fn registry_service() {
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();
        let registry_service = RegistryService::new(default_registry.clone());

        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // One extra registry: its family is reported next to the default one.
        let registry_1_id = registry_service.add(prefixed_registry_with_counter(
            "iota",
            "counter_1",
            "counter_1_desc",
        ));
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
            ],
        );

        // A second registry with the same prefix but a different metric name.
        let _registry_2_id = registry_service.add(prefixed_registry_with_counter(
            "iota",
            "counter_2",
            "counter_2_desc",
        ));
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );

        // Removing the first extra registry drops only its family.
        assert!(registry_service.remove(registry_1_id));
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );
    }
}