1use std::{
6 future::Future,
7 net::SocketAddr,
8 path::Path,
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12 time::Instant,
13};
14
15use axum::{
16 Router,
17 extract::{Extension, Request},
18 http::StatusCode,
19 middleware::Next,
20 response::Response,
21 routing::get,
22};
23use dashmap::DashMap;
24use once_cell::sync::OnceCell;
25use parking_lot::Mutex;
26use prometheus::{
27 Histogram, IntGaugeVec, Registry, TextEncoder,
28 core::{AtomicI64, GenericGauge},
29 register_histogram_with_registry, register_int_gauge_vec_with_registry,
30};
31pub use scopeguard;
32use simple_server_timing_header::Timer;
33use tap::TapFallible;
34use tracing::{Span, warn};
35use uuid::Uuid;
36
37mod guards;
38pub mod hardware_metrics;
39pub mod histogram;
40pub mod metered_channel;
41pub mod metrics_network;
42pub mod monitored_mpsc;
43pub mod thread_stall_monitor;
44pub use guards::*;
45
/// Transaction-type label value `"single_writer"`.
pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";

/// Transaction-type label value `"shared_object"`.
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";
48
/// Histogram bucket boundaries, in seconds, spanning 5 ms up to 1 s.
pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.02, 0.03, 0.05, 0.075, // below 100 ms
    0.1, 0.2, 0.3, 0.5, 0.7, 1.0,
];
52
/// Histogram bucket boundaries, in seconds, spanning 5 ms up to 60 s with
/// fewer steps than [`LATENCY_SEC_BUCKETS`].
pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, // below 100 ms
    0.1, 0.2, 0.3, 0.5, 0.7, // below 1 s
    1.0, 2.0, 3.0, 5.0, 10.0, 20.0, 30.0, 60.0,
];
56
/// Fine-grained histogram bucket boundaries, in seconds, spanning 1 ms up to
/// 90 s.
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, // below 100 ms
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, // below 1 s
    1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, // below 2 s
    2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 7.0, 8.0, 9.0, // below 10 s
    10.0, 15.0, 20.0, 25.0, 30.0, 60.0, 90.0,
];
62
/// Histogram bucket boundaries for item counts, from 2 up to 10 000.
pub const COUNT_BUCKETS: &[f64] = &[
    2.0, 5.0, 10.0, 20.0, 50.0, 100.0, // up to one hundred
    200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0,
];
66
/// Histogram bucket boundaries for sizes in bytes, from 1 KiB up to 64 MiB.
pub const BYTES_BUCKETS: &[f64] = &[
    1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 524288.0, // below 1 MiB
    1048576.0, 2097152.0, 4194304.0, 8388608.0, 16777216.0, 33554432.0, 67108864.0,
];
71
72#[derive(Debug)]
73pub struct Metrics {
74 pub tasks: IntGaugeVec,
75 pub futures: IntGaugeVec,
76 pub channel_inflight: IntGaugeVec,
77 pub channel_sent: IntGaugeVec,
78 pub channel_received: IntGaugeVec,
79 pub future_active_duration_ns: IntGaugeVec,
80 pub scope_iterations: IntGaugeVec,
81 pub scope_duration_ns: IntGaugeVec,
82 pub scope_entrance: IntGaugeVec,
83 pub thread_stall_duration_sec: Histogram,
84}
85
86impl Metrics {
87 fn new(registry: &Registry) -> Self {
95 Self {
96 tasks: register_int_gauge_vec_with_registry!(
97 "monitored_tasks",
98 "Number of running tasks per callsite.",
99 &["callsite"],
100 registry,
101 )
102 .unwrap(),
103 futures: register_int_gauge_vec_with_registry!(
104 "monitored_futures",
105 "Number of pending futures per callsite.",
106 &["callsite"],
107 registry,
108 )
109 .unwrap(),
110 channel_inflight: register_int_gauge_vec_with_registry!(
111 "monitored_channel_inflight",
112 "Inflight items in channels.",
113 &["name"],
114 registry,
115 )
116 .unwrap(),
117 channel_sent: register_int_gauge_vec_with_registry!(
118 "monitored_channel_sent",
119 "Sent items in channels.",
120 &["name"],
121 registry,
122 )
123 .unwrap(),
124 channel_received: register_int_gauge_vec_with_registry!(
125 "monitored_channel_received",
126 "Received items in channels.",
127 &["name"],
128 registry,
129 )
130 .unwrap(),
131 future_active_duration_ns: register_int_gauge_vec_with_registry!(
132 "monitored_future_active_duration_ns",
133 "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
134 &["name"],
135 registry,
136 )
137 .unwrap(),
138 scope_entrance: register_int_gauge_vec_with_registry!(
139 "monitored_scope_entrance",
140 "Number of entrance in the scope.",
141 &["name"],
142 registry,
143 )
144 .unwrap(),
145 scope_iterations: register_int_gauge_vec_with_registry!(
146 "monitored_scope_iterations",
147 "Total number of times where the monitored scope runs",
148 &["name"],
149 registry,
150 )
151 .unwrap(),
152 scope_duration_ns: register_int_gauge_vec_with_registry!(
153 "monitored_scope_duration_ns",
154 "Total duration in nanosecs where the monitored scope is running",
155 &["name"],
156 registry,
157 )
158 .unwrap(),
159 thread_stall_duration_sec: register_histogram_with_registry!(
160 "thread_stall_duration_sec",
161 "Duration of thread stalls in seconds.",
162 registry,
163 )
164 .unwrap(),
165 }
166 }
167}
168
169static METRICS: OnceCell<Metrics> = OnceCell::new();
170
171pub fn init_metrics(registry: &Registry) {
177 let _ = METRICS
178 .set(Metrics::new(registry))
179 .tap_err(|_| warn!("init_metrics registry overwritten"));
181}
182
183pub fn get_metrics() -> Option<&'static Metrics> {
185 METRICS.get()
186}
187
188tokio::task_local! {
189 static SERVER_TIMING: Arc<Mutex<Timer>>;
190}
191
192pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
197 let timer = Arc::new(Mutex::new(Timer::new()));
198
199 let mut ret = None;
200 SERVER_TIMING
201 .scope(timer, async {
202 ret = Some(fut.await);
203 })
204 .await;
205
206 ret.unwrap()
207}
208
209pub async fn server_timing_middleware(request: Request, next: Next) -> Response {
210 with_new_server_timing(async move {
211 let mut response = next.run(request).await;
212 add_server_timing("finish_request");
213
214 if let Ok(header_value) = get_server_timing()
215 .expect("server timing not set")
216 .lock()
217 .header_value()
218 .try_into()
219 {
220 response
221 .headers_mut()
222 .insert(Timer::header_key(), header_value);
223 }
224 response
225 })
226 .await
227}
228
229pub async fn with_server_timing<T>(
232 timer: Arc<Mutex<Timer>>,
233 fut: impl Future<Output = T> + Send + 'static,
234) -> T {
235 let mut ret = None;
236 SERVER_TIMING
237 .scope(timer, async {
238 ret = Some(fut.await);
239 })
240 .await;
241
242 ret.unwrap()
243}
244
245pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
248 SERVER_TIMING.try_with(|timer| timer.clone()).ok()
249}
250
251pub fn add_server_timing(name: &str) {
255 let res = SERVER_TIMING.try_with(|timer| {
256 timer.lock().add(name);
257 });
258
259 if res.is_err() {
260 tracing::error!("Server timing context not found");
261 }
262}
263
/// Wraps a future in an `async move` block that tracks it in a gauge of the
/// global [`Metrics`](crate::Metrics) for as long as the block is alive.
///
/// * `monitored_future!(fut)` tracks `fut` in the `futures` gauge, labelled
///   with `file:line` of the call site, without logging.
/// * `monitored_future!(metric, fut, name, LEVEL, logging_enabled)` tracks
///   `fut` in the gauge field `metric`. The label is `file:name`, or
///   `file:line` when `name` is empty. `name` ends up inside `concat!`, so it
///   has to be a literal. When `logging_enabled` is true, a `tracing` event
///   at `LEVEL` is emitted when the block starts and when it is dropped.
///
/// The gauge is incremented when the block is first polled and decremented
/// when it is dropped. Nothing is tracked if metrics are not initialized.
#[macro_export]
macro_rules! monitored_future {
    // Recurse through `$crate::` so the macro also resolves when the caller
    // invokes it by path without having imported it.
    ($fut: expr) => {{ $crate::monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            // Decrements the gauge again when the block is dropped, whether
            // the future completed or was cancelled.
            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}
310
/// Spawns `$fut` on the tokio runtime. If the spawning task has a
/// server-timing context, the spawned task runs under that same context.
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($fut: expr) => {
        match $crate::get_server_timing() {
            // Carry the caller's timer into the new task.
            Some(timing) => {
                tokio::task::spawn(async move { $crate::with_server_timing(timing, $fut).await })
            }
            None => tokio::task::spawn($fut),
        }
    };
}
321
/// Spawns `$fut` as a tokio task tracked in the `tasks` gauge under its call
/// site, forwarding the caller's server-timing context if there is one.
/// Start and completion are not logged.
#[macro_export]
macro_rules! spawn_monitored_task {
    ($fut: expr) => {
        $crate::forward_server_timing_and_spawn!(
            $crate::monitored_future!(tasks, $fut, "", INFO, false)
        )
    };
}
330
/// Spawns `$fut` as a tokio task tracked in the `tasks` gauge, with `tracing`
/// events on start and completion, forwarding the caller's server-timing
/// context if there is one.
///
/// * `spawn_logged_monitored_task!(fut)` labels by call site, logs at `INFO`.
/// * `spawn_logged_monitored_task!(fut, name)` labels by `name`, logs at
///   `INFO`.
/// * `spawn_logged_monitored_task!(fut, name, LEVEL)` labels by `name`, logs
///   at `LEVEL`.
#[macro_export]
macro_rules! spawn_logged_monitored_task {
    // The shorter forms fill in the defaults and defer to the full form.
    ($fut: expr) => {
        $crate::spawn_logged_monitored_task!($fut, "", INFO)
    };

    ($fut: expr, $name: expr) => {
        $crate::spawn_logged_monitored_task!($fut, $name, INFO)
    };

    ($fut: expr, $name: expr, $logging_level: ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks,
            $fut,
            $name,
            $logging_level,
            true
        ))
    };
}
355
356pub struct MonitoredScopeGuard {
357 metrics: &'static Metrics,
358 name: &'static str,
359 timer: Instant,
360}
361
362impl Drop for MonitoredScopeGuard {
363 fn drop(&mut self) {
364 self.metrics
365 .scope_duration_ns
366 .with_label_values(&[self.name])
367 .add(self.timer.elapsed().as_nanos() as i64);
368 self.metrics
369 .scope_entrance
370 .with_label_values(&[self.name])
371 .dec();
372 }
373}
374
375pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
386 let metrics = get_metrics();
387 if let Some(m) = metrics {
388 m.scope_iterations.with_label_values(&[name]).inc();
389 m.scope_entrance.with_label_values(&[name]).inc();
390 Some(MonitoredScopeGuard {
391 metrics: m,
392 name,
393 timer: Instant::now(),
394 })
395 } else {
396 None
397 }
398}
399
400pub trait MonitoredFutureExt: Future + Sized {
405 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
409}
410
411impl<F: Future> MonitoredFutureExt for F {
412 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
413 MonitoredScopeFuture {
414 f: Box::pin(self),
415 active_duration_metric: get_metrics()
416 .map(|m| m.future_active_duration_ns.with_label_values(&[name])),
417 _scope: monitored_scope(name),
418 }
419 }
420}
421
422pub struct MonitoredScopeFuture<F: Sized> {
427 f: Pin<Box<F>>,
428 active_duration_metric: Option<GenericGauge<AtomicI64>>,
429 _scope: Option<MonitoredScopeGuard>,
430}
431
432impl<F: Future> Future for MonitoredScopeFuture<F> {
433 type Output = F::Output;
434
435 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436 let active_timer = Instant::now();
437 let ret = self.f.as_mut().poll(cx);
438 if let Some(m) = &self.active_duration_metric {
439 m.add(active_timer.elapsed().as_nanos() as i64);
440 }
441 ret
442 }
443}
444
/// Future wrapper that remembers whether the wrapped future ran to
/// completion.
pub struct CancelMonitor<F: Sized> {
    /// Set once the wrapped future has returned `Poll::Ready`.
    finished: bool,
    /// The wrapped future.
    inner: Pin<Box<F>>,
}

impl<F> CancelMonitor<F>
where
    F: Future,
{
    /// Wraps `inner`. The wrapper starts out as not finished.
    pub fn new(inner: F) -> Self {
        let inner = Box::pin(inner);
        Self {
            finished: false,
            inner,
        }
    }

    /// Returns `true` once the wrapped future has completed.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}
472
473impl<F> Future for CancelMonitor<F>
474where
475 F: Future,
476{
477 type Output = F::Output;
478
479 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
484 match self.inner.as_mut().poll(cx) {
485 Poll::Ready(output) => {
486 self.finished = true;
487 Poll::Ready(output)
488 }
489 Poll::Pending => Poll::Pending,
490 }
491 }
492}
493
494impl<F: Sized> Drop for CancelMonitor<F> {
495 fn drop(&mut self) {
500 if !self.finished {
501 Span::current().record("cancelled", true);
502 }
503 }
504}
505
506pub trait MonitorCancellation {
511 fn monitor_cancellation(self) -> CancelMonitor<Self>
512 where
513 Self: Sized + Future;
514}
515
516impl<T> MonitorCancellation for T
517where
518 T: Future,
519{
520 fn monitor_cancellation(self) -> CancelMonitor<Self> {
521 CancelMonitor::new(self)
522 }
523}
524
525pub type RegistryID = Uuid;
526
527#[derive(Clone)]
531pub struct RegistryService {
532 default_registry: Registry,
534 registries_by_id: Arc<DashMap<Uuid, Registry>>,
535}
536
537impl RegistryService {
538 pub fn new(default_registry: Registry) -> Self {
541 Self {
542 default_registry,
543 registries_by_id: Arc::new(DashMap::new()),
544 }
545 }
546
547 pub fn default_registry(&self) -> Registry {
550 self.default_registry.clone()
551 }
552
553 pub fn add(&self, registry: Registry) -> RegistryID {
559 let registry_id = Uuid::new_v4();
560 if self
561 .registries_by_id
562 .insert(registry_id, registry)
563 .is_some()
564 {
565 panic!("Other Registry already detected for the same id {registry_id}");
566 }
567
568 registry_id
569 }
570
571 pub fn remove(&self, registry_id: RegistryID) -> bool {
574 self.registries_by_id.remove(®istry_id).is_some()
575 }
576
577 pub fn get_all(&self) -> Vec<Registry> {
579 let mut registries: Vec<Registry> = self
580 .registries_by_id
581 .iter()
582 .map(|r| r.value().clone())
583 .collect();
584 registries.push(self.default_registry.clone());
585
586 registries
587 }
588
589 pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
591 self.get_all().iter().flat_map(|r| r.gather()).collect()
592 }
593}
594
595pub fn uptime_metric(
604 process: &str,
605 version: &'static str,
606 chain_identifier: &str,
607) -> Box<dyn prometheus::core::Collector> {
608 let opts = prometheus::opts!("uptime", "uptime of the node service in seconds")
609 .variable_label("process")
610 .variable_label("version")
611 .variable_label("chain_identifier")
612 .variable_label("os_version")
613 .variable_label("is_docker");
614
615 let start_time = std::time::Instant::now();
616 let uptime = move || start_time.elapsed().as_secs();
617 let metric = prometheus_closure_metric::ClosureMetric::new(
618 opts,
619 prometheus_closure_metric::ValueType::Counter,
620 uptime,
621 &[
622 process,
623 version,
624 chain_identifier,
625 &sysinfo::System::long_os_version()
626 .unwrap_or_else(|| "os_version_unavailable".to_string()),
627 &is_running_in_docker().to_string(),
628 ],
629 )
630 .unwrap();
631
632 Box::new(metric)
633}
634
/// Returns `true` when the marker file `/.dockerenv` exists.
pub fn is_running_in_docker() -> bool {
    let docker_marker = Path::new("/.dockerenv");
    docker_marker.exists()
}
640
/// HTTP path under which the metrics endpoint is served.
pub const METRICS_ROUTE: &str = "/metrics";
642
643pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
648 let registry = Registry::new();
649
650 let registry_service = RegistryService::new(registry);
651
652 if cfg!(msim) {
653 warn!("not starting prometheus server in simulator");
656 return registry_service;
657 }
658
659 let app = Router::new()
660 .route(METRICS_ROUTE, get(metrics))
661 .layer(Extension(registry_service.clone()));
662
663 tokio::spawn(async move {
664 let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
665 axum::serve(listener, app.into_make_service())
666 .await
667 .unwrap();
668 });
669
670 registry_service
671}
672
673pub async fn metrics(
681 Extension(registry_service): Extension<RegistryService>,
682) -> (StatusCode, String) {
683 let metrics_families = registry_service.gather_all();
684 match TextEncoder.encode_to_string(&metrics_families) {
685 Ok(metrics) => (StatusCode::OK, metrics),
686 Err(error) => (
687 StatusCode::INTERNAL_SERVER_ERROR,
688 format!("unable to encode metrics: {error}"),
689 ),
690 }
691}
692
#[cfg(test)]
mod tests {
    use prometheus::{IntCounter, Registry};

    use crate::RegistryService;

    /// Creates a registry whose metric names are prefixed with `prefix`.
    fn prefixed_registry(prefix: &str) -> Registry {
        Registry::new_custom(Some(prefix.to_string()), None).unwrap()
    }

    /// Registers a new counter with the given name and help text.
    fn register_counter(registry: &Registry, name: &str, help: &str) {
        let counter = IntCounter::new(name, help).unwrap();
        registry.register(Box::new(counter)).unwrap();
    }

    /// Gathers all families from `service`, sorts them by name, and checks
    /// that their `(name, help)` pairs are exactly `expected`.
    fn assert_gathered(service: &RegistryService, expected: &[(&str, &str)]) {
        let mut families = service.gather_all();
        families.sort_by(|a, b| a.name().cmp(b.name()));

        let actual: Vec<(&str, &str)> = families.iter().map(|f| (f.name(), f.help())).collect();
        assert_eq!(actual.as_slice(), expected);
    }

    #[test]
    fn registry_service() {
        // The default registry gets its counter after the service was built
        // from a clone of it.
        let default_registry = prefixed_registry("default");
        let registry_service = RegistryService::new(default_registry.clone());

        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // First additional registry.
        let registry_1 = prefixed_registry("iota");
        register_counter(&registry_1, "counter_1", "counter_1_desc");
        let registry_1_id = registry_service.add(registry_1);

        assert_gathered(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
            ],
        );

        // Second additional registry sharing the same prefix.
        let registry_2 = prefixed_registry("iota");
        register_counter(&registry_2, "counter_2", "counter_2_desc");
        let _registry_2_id = registry_service.add(registry_2);

        assert_gathered(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );

        // Removing the first registry leaves the default and the second one.
        assert!(registry_service.remove(registry_1_id));

        assert_gathered(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );
    }
}