1use std::{
6 future::Future,
7 net::SocketAddr,
8 path::Path,
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12 time::Instant,
13};
14
15use axum::{
16 Router,
17 extract::{Extension, Request},
18 http::StatusCode,
19 middleware::Next,
20 response::Response,
21 routing::get,
22};
23use dashmap::DashMap;
24use once_cell::sync::OnceCell;
25use parking_lot::Mutex;
26use prometheus::{
27 Histogram, IntGaugeVec, Registry, TextEncoder, register_histogram_with_registry,
28 register_int_gauge_vec_with_registry,
29};
30pub use scopeguard;
31use simple_server_timing_header::Timer;
32use tap::TapFallible;
33use tracing::{Span, warn};
34use uuid::Uuid;
35
36mod guards;
37pub mod hardware_metrics;
38pub mod histogram;
39pub mod metered_channel;
40pub mod metrics_network;
41pub mod monitored_mpsc;
42pub mod thread_stall_monitor;
43pub use guards::*;
44
/// Label value used to tag single-writer transactions.
pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
/// Label value used to tag shared-object transactions.
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";

/// Histogram bucket boundaries (in seconds) for latency metrics.
///
/// Fine-grained below one second, 100 ms steps between one and two seconds, and progressively
/// coarser up to 90 seconds.
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6,
    0.7, 0.8, 0.9,
    // One to two seconds, in 100 ms steps.
    1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9,
    // Coarser tail.
    2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 7.0, 8.0, 9.0,
    10.0, 15.0, 20.0, 25.0, 30.0, 60.0, 90.0,
];
53
54#[derive(Debug)]
55pub struct Metrics {
56 pub tasks: IntGaugeVec,
57 pub futures: IntGaugeVec,
58 pub channel_inflight: IntGaugeVec,
59 pub channel_sent: IntGaugeVec,
60 pub channel_received: IntGaugeVec,
61 pub scope_iterations: IntGaugeVec,
62 pub scope_duration_ns: IntGaugeVec,
63 pub scope_entrance: IntGaugeVec,
64 pub thread_stall_duration_sec: Histogram,
65}
66
67impl Metrics {
68 fn new(registry: &Registry) -> Self {
76 Self {
77 tasks: register_int_gauge_vec_with_registry!(
78 "monitored_tasks",
79 "Number of running tasks per callsite.",
80 &["callsite"],
81 registry,
82 )
83 .unwrap(),
84 futures: register_int_gauge_vec_with_registry!(
85 "monitored_futures",
86 "Number of pending futures per callsite.",
87 &["callsite"],
88 registry,
89 )
90 .unwrap(),
91 channel_inflight: register_int_gauge_vec_with_registry!(
92 "monitored_channel_inflight",
93 "Inflight items in channels.",
94 &["name"],
95 registry,
96 )
97 .unwrap(),
98 channel_sent: register_int_gauge_vec_with_registry!(
99 "monitored_channel_sent",
100 "Sent items in channels.",
101 &["name"],
102 registry,
103 )
104 .unwrap(),
105 channel_received: register_int_gauge_vec_with_registry!(
106 "monitored_channel_received",
107 "Received items in channels.",
108 &["name"],
109 registry,
110 )
111 .unwrap(),
112 scope_entrance: register_int_gauge_vec_with_registry!(
113 "monitored_scope_entrance",
114 "Number of entrance in the scope.",
115 &["name"],
116 registry,
117 )
118 .unwrap(),
119 scope_iterations: register_int_gauge_vec_with_registry!(
120 "monitored_scope_iterations",
121 "Total number of times where the monitored scope runs",
122 &["name"],
123 registry,
124 )
125 .unwrap(),
126 scope_duration_ns: register_int_gauge_vec_with_registry!(
127 "monitored_scope_duration_ns",
128 "Total duration in nanosecs where the monitored scope is running",
129 &["name"],
130 registry,
131 )
132 .unwrap(),
133 thread_stall_duration_sec: register_histogram_with_registry!(
134 "thread_stall_duration_sec",
135 "Duration of thread stalls in seconds.",
136 registry,
137 )
138 .unwrap(),
139 }
140 }
141}
142
143static METRICS: OnceCell<Metrics> = OnceCell::new();
144
145pub fn init_metrics(registry: &Registry) {
151 let _ = METRICS
152 .set(Metrics::new(registry))
153 .tap_err(|_| warn!("init_metrics registry overwritten"));
155}
156
157pub fn get_metrics() -> Option<&'static Metrics> {
159 METRICS.get()
160}
161
162tokio::task_local! {
163 static SERVER_TIMING: Arc<Mutex<Timer>>;
164}
165
166pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
171 let timer = Arc::new(Mutex::new(Timer::new()));
172
173 let mut ret = None;
174 SERVER_TIMING
175 .scope(timer, async {
176 ret = Some(fut.await);
177 })
178 .await;
179
180 ret.unwrap()
181}
182
183pub async fn server_timing_middleware(request: Request, next: Next) -> Response {
184 with_new_server_timing(async move {
185 let mut response = next.run(request).await;
186 add_server_timing("finish_request");
187
188 if let Ok(header_value) = get_server_timing()
189 .expect("server timing not set")
190 .lock()
191 .header_value()
192 .try_into()
193 {
194 response
195 .headers_mut()
196 .insert(Timer::header_key(), header_value);
197 }
198 response
199 })
200 .await
201}
202
203pub async fn with_server_timing<T>(
206 timer: Arc<Mutex<Timer>>,
207 fut: impl Future<Output = T> + Send + 'static,
208) -> T {
209 let mut ret = None;
210 SERVER_TIMING
211 .scope(timer, async {
212 ret = Some(fut.await);
213 })
214 .await;
215
216 ret.unwrap()
217}
218
219pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
222 SERVER_TIMING.try_with(|timer| timer.clone()).ok()
223}
224
225pub fn add_server_timing(name: &str) {
229 let res = SERVER_TIMING.try_with(|timer| {
230 timer.lock().add(name);
231 });
232
233 if res.is_err() {
234 tracing::error!("Server timing context not found");
235 }
236}
237
/// Wraps a future so that a per-callsite gauge tracks it while it is alive.
///
/// * `monitored_future!(fut)` tracks `fut` in the `futures` gauge. It is labelled with the
///   invocation's `file:line` and logging is disabled.
/// * `monitored_future!(metric, fut, name, LEVEL, logging_enabled)` tracks `fut` in the `metric`
///   field of `Metrics`.
///   - The label is `file:line` when `name` is `""` and `file:name` otherwise. `name` must be a
///     string literal, because it is passed to `concat!`.
///   - When `logging_enabled` is true, a `tracing` event at level `LEVEL` is emitted when the
///     wrapped future is first polled, and another when it is dropped.
///
/// The gauge is incremented on first poll and decremented by a scope guard when the wrapped
/// future finishes or is dropped, so cancelled futures are accounted for too. If metrics were
/// never initialised (`get_metrics()` returns `None`), no gauge is touched.
///
/// The expansion refers to `tracing::…` directly, so the calling crate must depend on `tracing`.
#[macro_export]
macro_rules! monitored_future {
    // `$crate::` is required: without it the recursive call only resolves in crates that
    // imported this macro by name.
    ($fut: expr) => {{ $crate::monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        // Callsite label, built at compile time by `concat!`.
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            // Increment now; the guard decrements when this async block is dropped, whether it
            // ran to completion or was cancelled part-way.
            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}
284
/// Spawns `fut` on the tokio runtime, carrying the caller's server-timing timer (if any) into
/// the new task.
///
/// Evaluates to the `JoinHandle` returned by `tokio::task::spawn`. The expansion refers to
/// `tokio::…` directly, so the calling crate must depend on `tokio`.
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($fut: expr) => {
        match $crate::get_server_timing() {
            Some(timing) => {
                tokio::task::spawn(async move { $crate::with_server_timing(timing, $fut).await })
            }
            None => tokio::task::spawn($fut),
        }
    };
}
295
/// Spawns `fut` as a tokio task that is counted in the `tasks` gauge.
///
/// The task is labelled with the invocation's `file:line`, inherits the caller's server-timing
/// timer, and produces no log output. Evaluates to the task's `JoinHandle`.
#[macro_export]
macro_rules! spawn_monitored_task {
    ($fut: expr) => {
        $crate::forward_server_timing_and_spawn!(
            $crate::monitored_future!(tasks, $fut, "", INFO, false)
        )
    };
}
304
/// Like `spawn_monitored_task!`, but also logs when the task starts and when it finishes.
///
/// Accepted forms:
/// * `spawn_logged_monitored_task!(fut)`: labelled `file:line`, logged at `INFO`.
/// * `spawn_logged_monitored_task!(fut, "name")`: labelled `file:name`, logged at `INFO`.
/// * `spawn_logged_monitored_task!(fut, "name", LEVEL)`: labelled `file:name`, logged at `LEVEL`.
///
/// `name` must be a string literal. Evaluates to the task's `JoinHandle`.
#[macro_export]
macro_rules! spawn_logged_monitored_task {
    // The shorter forms fill in defaults and defer to the full form.
    ($fut: expr) => {
        $crate::spawn_logged_monitored_task!($fut, "", INFO)
    };

    ($fut: expr, $name: expr) => {
        $crate::spawn_logged_monitored_task!($fut, $name, INFO)
    };

    ($fut: expr, $name: expr, $logging_level: ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks,
            $fut,
            $name,
            $logging_level,
            true
        ))
    };
}
329
330pub struct MonitoredScopeGuard {
331 metrics: &'static Metrics,
332 name: &'static str,
333 timer: Instant,
334}
335
336impl Drop for MonitoredScopeGuard {
337 fn drop(&mut self) {
338 self.metrics
339 .scope_duration_ns
340 .with_label_values(&[self.name])
341 .add(self.timer.elapsed().as_nanos() as i64);
342 self.metrics
343 .scope_entrance
344 .with_label_values(&[self.name])
345 .dec();
346 }
347}
348
349pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
360 let metrics = get_metrics();
361 if let Some(m) = metrics {
362 m.scope_iterations.with_label_values(&[name]).inc();
363 m.scope_entrance.with_label_values(&[name]).inc();
364 Some(MonitoredScopeGuard {
365 metrics: m,
366 name,
367 timer: Instant::now(),
368 })
369 } else {
370 None
371 }
372}
373
374pub trait MonitoredFutureExt: Future + Sized {
379 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
383}
384
385impl<F: Future> MonitoredFutureExt for F {
386 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
387 MonitoredScopeFuture {
388 f: Box::pin(self),
389 _scope: monitored_scope(name),
390 }
391 }
392}
393
394pub struct MonitoredScopeFuture<F: Sized> {
399 f: Pin<Box<F>>,
400 _scope: Option<MonitoredScopeGuard>,
401}
402
403impl<F: Future> Future for MonitoredScopeFuture<F> {
404 type Output = F::Output;
405
406 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
407 self.f.as_mut().poll(cx)
408 }
409}
410
411pub struct CancelMonitor<F: Sized> {
416 finished: bool,
417 inner: Pin<Box<F>>,
418}
419
420impl<F> CancelMonitor<F>
421where
422 F: Future,
423{
424 pub fn new(inner: F) -> Self {
427 Self {
428 finished: false,
429 inner: Box::pin(inner),
430 }
431 }
432
433 pub fn is_finished(&self) -> bool {
435 self.finished
436 }
437}
438
439impl<F> Future for CancelMonitor<F>
440where
441 F: Future,
442{
443 type Output = F::Output;
444
445 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
450 match self.inner.as_mut().poll(cx) {
451 Poll::Ready(output) => {
452 self.finished = true;
453 Poll::Ready(output)
454 }
455 Poll::Pending => Poll::Pending,
456 }
457 }
458}
459
460impl<F: Sized> Drop for CancelMonitor<F> {
461 fn drop(&mut self) {
466 if !self.finished {
467 Span::current().record("cancelled", true);
468 }
469 }
470}
471
472pub trait MonitorCancellation {
477 fn monitor_cancellation(self) -> CancelMonitor<Self>
478 where
479 Self: Sized + Future;
480}
481
482impl<T> MonitorCancellation for T
483where
484 T: Future,
485{
486 fn monitor_cancellation(self) -> CancelMonitor<Self> {
487 CancelMonitor::new(self)
488 }
489}
490
491pub type RegistryID = Uuid;
492
493#[derive(Clone)]
497pub struct RegistryService {
498 default_registry: Registry,
500 registries_by_id: Arc<DashMap<Uuid, Registry>>,
501}
502
503impl RegistryService {
504 pub fn new(default_registry: Registry) -> Self {
507 Self {
508 default_registry,
509 registries_by_id: Arc::new(DashMap::new()),
510 }
511 }
512
513 pub fn default_registry(&self) -> Registry {
516 self.default_registry.clone()
517 }
518
519 pub fn add(&self, registry: Registry) -> RegistryID {
525 let registry_id = Uuid::new_v4();
526 if self
527 .registries_by_id
528 .insert(registry_id, registry)
529 .is_some()
530 {
531 panic!("Other Registry already detected for the same id {registry_id}");
532 }
533
534 registry_id
535 }
536
537 pub fn remove(&self, registry_id: RegistryID) -> bool {
540 self.registries_by_id.remove(®istry_id).is_some()
541 }
542
543 pub fn get_all(&self) -> Vec<Registry> {
545 let mut registries: Vec<Registry> = self
546 .registries_by_id
547 .iter()
548 .map(|r| r.value().clone())
549 .collect();
550 registries.push(self.default_registry.clone());
551
552 registries
553 }
554
555 pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
557 self.get_all().iter().flat_map(|r| r.gather()).collect()
558 }
559}
560
561pub fn uptime_metric(
570 process: &str,
571 version: &'static str,
572 chain_identifier: &str,
573) -> Box<dyn prometheus::core::Collector> {
574 let opts = prometheus::opts!("uptime", "uptime of the node service in seconds")
575 .variable_label("process")
576 .variable_label("version")
577 .variable_label("chain_identifier")
578 .variable_label("os_version")
579 .variable_label("is_docker");
580
581 let start_time = std::time::Instant::now();
582 let uptime = move || start_time.elapsed().as_secs();
583 let metric = prometheus_closure_metric::ClosureMetric::new(
584 opts,
585 prometheus_closure_metric::ValueType::Counter,
586 uptime,
587 &[
588 process,
589 version,
590 chain_identifier,
591 &sysinfo::System::long_os_version()
592 .unwrap_or_else(|| "os_version_unavailable".to_string()),
593 &is_running_in_docker().to_string(),
594 ],
595 )
596 .unwrap();
597
598 Box::new(metric)
599}
600
/// Returns `true` when the Docker marker file `/.dockerenv` exists.
pub fn is_running_in_docker() -> bool {
    let docker_marker = Path::new("/.dockerenv");
    docker_marker.exists()
}
606
607pub const METRICS_ROUTE: &str = "/metrics";
608
609pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
614 let registry = Registry::new();
615
616 let registry_service = RegistryService::new(registry);
617
618 if cfg!(msim) {
619 warn!("not starting prometheus server in simulator");
622 return registry_service;
623 }
624
625 let app = Router::new()
626 .route(METRICS_ROUTE, get(metrics))
627 .layer(Extension(registry_service.clone()));
628
629 tokio::spawn(async move {
630 let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
631 axum::serve(listener, app.into_make_service())
632 .await
633 .unwrap();
634 });
635
636 registry_service
637}
638
639pub async fn metrics(
647 Extension(registry_service): Extension<RegistryService>,
648) -> (StatusCode, String) {
649 let metrics_families = registry_service.gather_all();
650 match TextEncoder.encode_to_string(&metrics_families) {
651 Ok(metrics) => (StatusCode::OK, metrics),
652 Err(error) => (
653 StatusCode::INTERNAL_SERVER_ERROR,
654 format!("unable to encode metrics: {error}"),
655 ),
656 }
657}
658
#[cfg(test)]
mod tests {
    use prometheus::{IntCounter, Registry};

    use crate::RegistryService;

    /// Gathers every metric family of `service` as `(name, help)` pairs, sorted by name so the
    /// assertions do not depend on registry iteration order.
    fn sorted_families(service: &RegistryService) -> Vec<(String, String)> {
        let mut families: Vec<(String, String)> = service
            .gather_all()
            .iter()
            .map(|family| (family.name().to_owned(), family.help().to_owned()))
            .collect();
        families.sort_by(|a, b| a.0.cmp(&b.0));
        families
    }

    /// Builds an expected `(name, help)` pair.
    fn family(name: &str, help: &str) -> (String, String) {
        (name.to_owned(), help.to_owned())
    }

    #[test]
    fn registry_service() {
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();
        let registry_service = RegistryService::new(default_registry.clone());

        // Registered after the service received its clone: the service must still see it.
        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        let registry_1 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        registry_1
            .register(Box::new(
                IntCounter::new("counter_1", "counter_1_desc").unwrap(),
            ))
            .unwrap();
        let registry_1_id = registry_service.add(registry_1);

        assert_eq!(
            sorted_families(&registry_service),
            vec![
                family("default_counter", "counter_desc"),
                family("iota_counter_1", "counter_1_desc"),
            ]
        );

        // A second registry sharing the same prefix is gathered alongside the first.
        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        registry_2
            .register(Box::new(
                IntCounter::new("counter_2", "counter_2_desc").unwrap(),
            ))
            .unwrap();
        let _registry_2_id = registry_service.add(registry_2);

        assert_eq!(
            sorted_families(&registry_service),
            vec![
                family("default_counter", "counter_desc"),
                family("iota_counter_1", "counter_1_desc"),
                family("iota_counter_2", "counter_2_desc"),
            ]
        );

        // Removing the first registry drops only its metrics.
        assert!(registry_service.remove(registry_1_id));

        assert_eq!(
            sorted_families(&registry_service),
            vec![
                family("default_counter", "counter_desc"),
                family("iota_counter_2", "counter_2_desc"),
            ]
        );
    }
}
748}