Skip to main content

iota_metrics/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    future::Future,
7    net::SocketAddr,
8    path::Path,
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12    time::Instant,
13};
14
15use axum::{
16    Router,
17    extract::{Extension, Request},
18    http::StatusCode,
19    middleware::Next,
20    response::Response,
21    routing::get,
22};
23use dashmap::DashMap;
24use once_cell::sync::OnceCell;
25use parking_lot::Mutex;
26use prometheus_filtered::{
27    Histogram, IntCounterVec, IntGaugeVec, Registry, TextEncoder,
28    core::{AtomicI64, GenericGauge},
29    register_histogram_with_registry, register_int_counter_vec_with_registry,
30    register_int_gauge_vec_with_registry,
31};
32pub use scopeguard;
33use simple_server_timing_header::Timer;
34use tap::TapFallible;
35use tracing::{Span, warn};
36use uuid::Uuid;
37
38mod guards;
39pub mod hardware_metrics;
40pub mod histogram;
41pub mod metered_channel;
42pub mod metrics_network;
43pub mod monitored_mpsc;
44pub mod thread_stall_monitor;
45pub use guards::*;
46
/// String identifier for the single-writer transaction type.
pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
/// String identifier for the shared-object transaction type.
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";

/// Latency bucket boundaries in seconds, spanning 5 ms up to 1 s.
pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.02, 0.03, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.0,
];

/// Coarse latency bucket boundaries in seconds, spanning 5 ms up to 60 s.
pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0,
    30.0, 60.0,
];

/// Fine-grained latency bucket boundaries in seconds, spanning 1 ms up to 90 s.
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6,
    0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.5, 3.0, 3.5, 4.0,
    4.5, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 25.0, 30.0, 60.0, 90.0,
];

/// Bucket boundaries for item counts, spanning 2 up to 10 000.
pub const COUNT_BUCKETS: &[f64] = &[
    2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0,
];

/// Bucket boundaries for sizes in bytes, spanning 1 KiB up to 64 MiB.
pub const BYTES_BUCKETS: &[f64] = &[
    1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 524288.0, 1048576.0, 2097152.0, 4194304.0,
    8388608.0, 16777216.0, 33554432.0, 67108864.0,
];
72
73#[derive(Debug)]
74pub struct Metrics {
75    pub tasks: IntGaugeVec,
76    pub futures: IntGaugeVec,
77    pub channel_inflight: IntGaugeVec,
78    pub channel_sent: IntGaugeVec,
79    pub channel_received: IntGaugeVec,
80    pub future_active_duration_ns: IntGaugeVec,
81    pub scope_iterations: IntGaugeVec,
82    pub scope_duration_ns: IntGaugeVec,
83    pub scope_entrance: IntGaugeVec,
84    pub thread_stall_duration_sec: Histogram,
85    pub system_invariant_violations: IntCounterVec,
86}
87
88impl Metrics {
89    /// Creates a new instance of the monitoring metrics, registering various
90    /// gauges and histograms with the provided metrics `Registry`. The
91    /// gauges track metrics such as the number of running tasks, pending
92    /// futures, channel items, and scope activities, while the histogram
93    /// measures the duration of thread stalls. Each metric is registered
94    /// with descriptive labels to facilitate performance monitoring and
95    /// analysis.
96    fn new(registry: &Registry) -> Self {
97        Self {
98            tasks: register_int_gauge_vec_with_registry!(
99                "monitored_tasks",
100                "Number of running tasks per callsite.",
101                &["callsite"],
102                registry,
103            )
104            .unwrap(),
105            futures: register_int_gauge_vec_with_registry!(
106                "monitored_futures",
107                "Number of pending futures per callsite.",
108                &["callsite"],
109                registry,
110            )
111            .unwrap(),
112            channel_inflight: register_int_gauge_vec_with_registry!(
113                "monitored_channel_inflight",
114                "Inflight items in channels.",
115                &["name"],
116                registry,
117            )
118            .unwrap(),
119            channel_sent: register_int_gauge_vec_with_registry!(
120                "monitored_channel_sent",
121                "Sent items in channels.",
122                &["name"],
123                registry,
124            )
125            .unwrap(),
126            channel_received: register_int_gauge_vec_with_registry!(
127                "monitored_channel_received",
128                "Received items in channels.",
129                &["name"],
130                registry,
131            )
132            .unwrap(),
133            future_active_duration_ns: register_int_gauge_vec_with_registry!(
134                "monitored_future_active_duration_ns",
135                "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
136                &["name"],
137                registry,
138            )
139            .unwrap(),
140            scope_entrance: register_int_gauge_vec_with_registry!(
141                "monitored_scope_entrance",
142                "Number of entrance in the scope.",
143                &["name"],
144                registry,
145            )
146            .unwrap(),
147            scope_iterations: register_int_gauge_vec_with_registry!(
148                "monitored_scope_iterations",
149                "Total number of times where the monitored scope runs",
150                &["name"],
151                registry,
152            )
153            .unwrap(),
154            scope_duration_ns: register_int_gauge_vec_with_registry!(
155                "monitored_scope_duration_ns",
156                "Total duration in nanosecs where the monitored scope is running",
157                &["name"],
158                registry,
159            )
160            .unwrap(),
161            thread_stall_duration_sec: register_histogram_with_registry!(
162                "thread_stall_duration_sec",
163                "Duration of thread stalls in seconds.",
164                registry,
165            )
166            .unwrap(),
167            system_invariant_violations: register_int_counter_vec_with_registry!(
168                "system_invariant_violations",
169                "Number of system invariant violations",
170                &["name"],
171                registry,
172            ).unwrap(),
173        }
174    }
175}
176
177static METRICS: OnceCell<Metrics> = OnceCell::new();
178
179/// Initializes the global `METRICS` instance by setting it to a new `Metrics`
180/// object registered with the provided `Registry`. If `METRICS` is already set,
181/// a warning is logged indicating that the metrics registry was overwritten.
182/// This function is intended to be called once during initialization to set up
183/// metrics collection.
184pub fn init_metrics(registry: &Registry) {
185    let _ = METRICS
186        .set(Metrics::new(registry))
187        // this happens many times during tests
188        .tap_err(|_| warn!("init_metrics registry overwritten"));
189}
190
191/// Retrieves the global `METRICS` instance if it has been initialized.
192pub fn get_metrics() -> Option<&'static Metrics> {
193    METRICS.get()
194}
195
196tokio::task_local! {
197    static SERVER_TIMING: Arc<Mutex<Timer>>;
198}
199
200/// Create a new task-local ServerTiming context and run the provided future
201/// within it. Should be used at the top-most level of a request handler. Can be
202/// added to an axum router as a layer by using
203/// iota_service::server_timing_middleware.
204pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
205    let timer = Arc::new(Mutex::new(Timer::new()));
206
207    let mut ret = None;
208    SERVER_TIMING
209        .scope(timer, async {
210            ret = Some(fut.await);
211        })
212        .await;
213
214    ret.unwrap()
215}
216
217/// The `Server-Timing` HTTP header key.
218pub fn server_timing_header_key() -> &'static str {
219    Timer::header_key()
220}
221
222/// Add a final `finish_request` entry and write the collected timings to the
223/// `Server-Timing` response header. No-op outside a server-timing context.
224pub fn finish_and_set_server_timing_header(headers: &mut http::HeaderMap) {
225    let Some(timer) = get_server_timing() else {
226        return;
227    };
228    let header_value = {
229        let mut timer = timer.lock();
230        timer.add("finish_request");
231        timer.header_value()
232    };
233    if let Ok(value) = http::HeaderValue::try_from(header_value) {
234        headers.insert(server_timing_header_key(), value);
235    }
236}
237
238pub async fn server_timing_middleware(request: Request, next: Next) -> Response {
239    with_new_server_timing(async move {
240        let mut response = next.run(request).await;
241        finish_and_set_server_timing_header(response.headers_mut());
242        response
243    })
244    .await
245}
246
247/// Create a new task-local ServerTiming context and run the provided future
248/// within it. Only intended for use by macros within this module.
249pub async fn with_server_timing<T>(
250    timer: Arc<Mutex<Timer>>,
251    fut: impl Future<Output = T> + Send + 'static,
252) -> T {
253    let mut ret = None;
254    SERVER_TIMING
255        .scope(timer, async {
256            ret = Some(fut.await);
257        })
258        .await;
259
260    ret.unwrap()
261}
262
263/// Get the currently active ServerTiming context. Only intended for use by
264/// macros within this module.
265pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
266    SERVER_TIMING.try_with(|timer| timer.clone()).ok()
267}
268
269/// Add a new entry to the ServerTiming header.
270/// If the caller is not currently in a ServerTiming context (created with
271/// `with_new_server_timing`), an error is logged.
272pub fn add_server_timing(name: &str) {
273    let res = SERVER_TIMING.try_with(|timer| {
274        timer.lock().add(name);
275    });
276
277    if res.is_err() {
278        tracing::error!("Server timing context not found");
279    }
280}
281
/// Wraps a future in an `async move` block that tracks it in one of the
/// gauges of `Metrics` while it is alive.
///
/// - `monitored_future!(fut)` tracks `fut` in the `futures` gauge, labelled
///   with the `file:line` of the callsite, without logging.
/// - The long form selects the gauge field (`$metric`), a `$name` that
///   replaces the line number in the label when it is not empty, and whether
///   start/completion events are logged at `$logging_level`. `$name` is fed
///   to `concat!`, so it has to be a literal.
///
/// The gauge is incremented when the wrapper is first polled and decremented
/// when it is dropped. If the global metrics are not initialized, the future
/// runs untracked.
#[macro_export]
macro_rules! monitored_future {
    // `$crate::` keeps the recursive call resolvable even when the macro is
    // invoked by path and has not been imported by name at the callsite.
    ($fut: expr) => {{ $crate::monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            // Dropping the guard undoes the increment, including when the
            // future is cancelled before completion.
            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}
328
/// Spawns `$task` on the tokio runtime. When the caller is inside a
/// ServerTiming context, the spawned task runs within that same context;
/// otherwise it is spawned as is.
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($task: expr) => {
        match $crate::get_server_timing() {
            Some(timer) => {
                tokio::task::spawn(async move { $crate::with_server_timing(timer, $task).await })
            }
            None => tokio::task::spawn($task),
        }
    };
}
339
/// Spawns `$task` as a tokio task that is tracked in the `tasks` gauge under
/// its callsite, without logging, forwarding any active ServerTiming context.
#[macro_export]
macro_rules! spawn_monitored_task {
    ($task: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, "", INFO, false
        ))
    };
}
348
/// Spawns `$task` as a tokio task that is tracked in the `tasks` gauge and
/// logs when it starts and completes, forwarding any active ServerTiming
/// context.
///
/// An optional `$label` replaces the line number in the metric label, and an
/// optional `$level` selects the tracing level (`INFO` by default).
#[macro_export]
macro_rules! spawn_logged_monitored_task {
    ($task: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, "", INFO, true
        ))
    };

    ($task: expr, $label: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, $label, INFO, true
        ))
    };

    ($task: expr, $label: expr, $level: ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, $label, $level, true
        ))
    };
}
373
374pub struct MonitoredScopeGuard {
375    metrics: &'static Metrics,
376    name: &'static str,
377    timer: Instant,
378}
379
380impl Drop for MonitoredScopeGuard {
381    fn drop(&mut self) {
382        self.metrics
383            .scope_duration_ns
384            .with_label_values(&[self.name])
385            .add(self.timer.elapsed().as_nanos() as i64);
386        self.metrics
387            .scope_entrance
388            .with_label_values(&[self.name])
389            .dec();
390    }
391}
392
393/// This function creates a named scoped object, that keeps track of
394/// - the total iterations where the scope is called in the
395///   `monitored_scope_iterations` metric.
396/// - and the total duration of the scope in the `monitored_scope_duration_ns`
397///   metric.
398///
399/// The monitored scope should be single threaded, e.g. the scoped object
400/// encompass the lifetime of a select loop or guarded by mutex.
401/// Then the rate of `monitored_scope_duration_ns`, converted to the unit of sec
402/// / sec, would be how full the single threaded scope is running.
403pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
404    let metrics = get_metrics();
405    if let Some(m) = metrics {
406        m.scope_iterations.with_label_values(&[name]).inc();
407        m.scope_entrance.with_label_values(&[name]).inc();
408        Some(MonitoredScopeGuard {
409            metrics: m,
410            name,
411            timer: Instant::now(),
412        })
413    } else {
414        None
415    }
416}
417
418/// A trait extension for `Future` to allow monitoring the execution of the
419/// future within a specific scope. Provides the `in_monitored_scope` method to
420/// wrap the future in a `MonitoredScopeFuture`, which tracks the future's
421/// execution using a `MonitoredScopeGuard` for monitoring purposes.
422pub trait MonitoredFutureExt: Future + Sized {
423    /// Wraps the current future in a `MonitoredScopeFuture` that is associated
424    /// with a specific monitored scope name. The scope helps track the
425    /// execution of the future for performance analysis and metrics collection.
426    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
427}
428
429impl<F: Future> MonitoredFutureExt for F {
430    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
431        MonitoredScopeFuture {
432            f: Box::pin(self),
433            active_duration_metric: get_metrics()
434                .map(|m| m.future_active_duration_ns.with_label_values(&[name])),
435            _scope: monitored_scope(name),
436        }
437    }
438}
439
440/// A future that runs within a monitored scope. This struct wraps a pinned
441/// future and holds an optional `MonitoredScopeGuard` to measure and monitor
442/// the execution of the future. It forwards polling operations
443/// to the underlying future while maintaining the monitoring scope.
444pub struct MonitoredScopeFuture<F: Sized> {
445    f: Pin<Box<F>>,
446    active_duration_metric: Option<GenericGauge<AtomicI64>>,
447    _scope: Option<MonitoredScopeGuard>,
448}
449
450impl<F: Future> Future for MonitoredScopeFuture<F> {
451    type Output = F::Output;
452
453    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
454        let active_timer = Instant::now();
455        let ret = self.f.as_mut().poll(cx);
456        if let Some(m) = &self.active_duration_metric {
457            m.add(active_timer.elapsed().as_nanos() as i64);
458        }
459        ret
460    }
461}
462
/// A future wrapper that detects cancellation. It forwards polling to the
/// pinned inner future and remembers whether that future ran to completion;
/// if the wrapper is dropped before completion, `cancelled = true` is
/// recorded on the current tracing span.
pub struct CancelMonitor<F: Sized> {
    /// Set once the inner future has returned `Poll::Ready`.
    finished: bool,
    /// The wrapped future.
    inner: Pin<Box<F>>,
}
471
472impl<F> CancelMonitor<F>
473where
474    F: Future,
475{
476    /// Creates a new `CancelMonitor` that wraps the given future (`inner`). The
477    /// monitor tracks whether the future has completed.
478    pub fn new(inner: F) -> Self {
479        Self {
480            finished: false,
481            inner: Box::pin(inner),
482        }
483    }
484
485    /// Returns `true` if the future has completed; otherwise, `false`.
486    pub fn is_finished(&self) -> bool {
487        self.finished
488    }
489}
490
491impl<F> Future for CancelMonitor<F>
492where
493    F: Future,
494{
495    type Output = F::Output;
496
497    /// Polls the inner future to determine if it is ready or still pending. For
498    /// `CancelMonitor`, if the future completes (`Poll::Ready`), `finished`
499    /// is set to `true`. If it is still pending, the status remains
500    /// unchanged. This allows monitoring of the future's completion status.
501    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502        match self.inner.as_mut().poll(cx) {
503            Poll::Ready(output) => {
504                self.finished = true;
505                Poll::Ready(output)
506            }
507            Poll::Pending => Poll::Pending,
508        }
509    }
510}
511
512impl<F: Sized> Drop for CancelMonitor<F> {
513    /// When the `CancelMonitor` is dropped, it checks whether the future has
514    /// finished executing. If the future was not completed (`finished` is
515    /// `false`), it records that the future was cancelled by logging the
516    /// cancellation status using the current span.
517    fn drop(&mut self) {
518        if !self.finished {
519            Span::current().record("cancelled", true);
520        }
521    }
522}
523
524/// MonitorCancellation records a cancelled = true span attribute if the future
525/// it is decorating is dropped before completion. The cancelled attribute must
526/// be added at span creation, as you cannot add new attributes after the span
527/// is created.
528pub trait MonitorCancellation {
529    fn monitor_cancellation(self) -> CancelMonitor<Self>
530    where
531        Self: Sized + Future;
532}
533
534impl<T> MonitorCancellation for T
535where
536    T: Future,
537{
538    fn monitor_cancellation(self) -> CancelMonitor<Self> {
539        CancelMonitor::new(self)
540    }
541}
542
543pub type RegistryID = Uuid;
544
545/// A service to manage the prometheus registries. This service allow us to
546/// create a new Registry on demand and keep it accessible for
547/// processing/polling. The service can be freely cloned/shared across threads.
548#[derive(Clone)]
549pub struct RegistryService {
550    // Holds a Registry that is supposed to be used
551    default_registry: Registry,
552    registries_by_id: Arc<DashMap<Uuid, Registry>>,
553}
554
555impl RegistryService {
556    // Creates a new registry service and also adds the main/default registry that
557    // is supposed to be preserved and never get removed
558    pub fn new(default_registry: Registry) -> Self {
559        Self {
560            default_registry,
561            registries_by_id: Arc::new(DashMap::new()),
562        }
563    }
564
565    // Returns the default registry for the service that someone can use
566    // if they don't want to create a new one.
567    pub fn default_registry(&self) -> Registry {
568        self.default_registry.clone()
569    }
570
571    // Adds a new registry to the service. The corresponding RegistryID is returned
572    // so can later be used for removing the Registry. Method panics if we try
573    // to insert a registry with the same id. As this can be quite serious for
574    // the operation of the node we don't want to accidentally swap an existing
575    // registry - we expected a removal to happen explicitly.
576    pub fn add(&self, registry: Registry) -> RegistryID {
577        let registry_id = Uuid::new_v4();
578        if self
579            .registries_by_id
580            .insert(registry_id, registry)
581            .is_some()
582        {
583            panic!("Other Registry already detected for the same id {registry_id}");
584        }
585
586        registry_id
587    }
588
589    // Removes the registry from the service. If Registry existed then this method
590    // returns true, otherwise false is returned instead.
591    pub fn remove(&self, registry_id: RegistryID) -> bool {
592        self.registries_by_id.remove(&registry_id).is_some()
593    }
594
595    // Returns all the registries of the service
596    pub fn get_all(&self) -> Vec<Registry> {
597        let mut registries: Vec<Registry> = self
598            .registries_by_id
599            .iter()
600            .map(|r| r.value().clone())
601            .collect();
602        registries.push(self.default_registry.clone());
603
604        registries
605    }
606
607    // Returns all the metric families from the registries that a service holds.
608    pub fn gather_all(&self) -> Vec<prometheus_filtered::proto::MetricFamily> {
609        self.get_all().iter().flat_map(|r| r.gather()).collect()
610    }
611}
612
613/// Create a metric that measures the uptime from when this metric was
614/// constructed. The metric is labeled with:
615/// - 'process': the process type, differentiating between validator and
616///   fullnode
617/// - 'version': binary version, generally be of the format:
618///   'semver-gitrevision'
619/// - 'chain_identifier': the identifier of the network which this process is
620///   part of
621pub fn uptime_metric(
622    process: &str,
623    version: &'static str,
624    chain_identifier: &str,
625) -> Box<dyn prometheus_filtered::core::Collector> {
626    let opts = prometheus_filtered::opts!("uptime", "uptime of the node service in seconds")
627        .variable_label("process")
628        .variable_label("version")
629        .variable_label("chain_identifier")
630        .variable_label("os_version")
631        .variable_label("is_docker");
632
633    let start_time = std::time::Instant::now();
634    let uptime = move || start_time.elapsed().as_secs();
635    let metric = prometheus_closure_metric::ClosureMetric::new(
636        opts,
637        prometheus_closure_metric::ValueType::Counter,
638        uptime,
639        &[
640            process,
641            version,
642            chain_identifier,
643            &sysinfo::System::long_os_version()
644                .unwrap_or_else(|| "os_version_unavailable".to_string()),
645            &is_running_in_docker().to_string(),
646        ],
647    )
648    .unwrap();
649
650    Box::new(metric)
651}
652
/// Reports whether the process appears to run inside a docker container, by
/// checking for the `/.dockerenv` marker file. This file exists in the
/// debian:__-slim image we use at runtime.
pub fn is_running_in_docker() -> bool {
    let marker = Path::new("/.dockerenv");
    marker.exists()
}
658
/// HTTP route under which the prometheus metrics are served.
pub const METRICS_ROUTE: &str = "/metrics";
660
661// Creates a new http server that has as a sole purpose to expose
662// and endpoint that prometheus agent can use to poll for the metrics.
663// A RegistryService is returned that can be used to get access in prometheus
664// Registries.
665pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
666    let registry = Registry::new();
667
668    let registry_service = RegistryService::new(registry);
669
670    if cfg!(msim) {
671        // prometheus uses difficult-to-support features such as
672        // TcpSocket::from_raw_fd(), so we can't yet run it in the simulator.
673        warn!("not starting prometheus server in simulator");
674        return registry_service;
675    }
676
677    let app = Router::new()
678        .route(METRICS_ROUTE, get(metrics))
679        .layer(Extension(registry_service.clone()));
680
681    tokio::spawn(async move {
682        let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
683        axum::serve(listener, app.into_make_service())
684            .await
685            .unwrap();
686    });
687
688    registry_service
689}
690
691/// Handles a request to retrieve metrics, using the provided `RegistryService`
692/// to gather all registered metric families. The metrics are then encoded to a
693/// text format for easy consumption by monitoring systems. If successful, it
694/// returns the metrics string with an `OK` status. If an error occurs during
695/// encoding, it returns an `INTERNAL_SERVER_ERROR` status along with an error
696/// message. Returns a tuple containing the status code and either the metrics
697/// data or an error description.
698pub async fn metrics(
699    Extension(registry_service): Extension<RegistryService>,
700) -> (StatusCode, String) {
701    let metrics_families = registry_service.gather_all();
702    match TextEncoder.encode_to_string(&metrics_families) {
703        Ok(metrics) => (StatusCode::OK, metrics),
704        Err(error) => (
705            StatusCode::INTERNAL_SERVER_ERROR,
706            format!("unable to encode metrics: {error}"),
707        ),
708    }
709}
710
#[cfg(test)]
mod tests {
    use prometheus_filtered::{IntCounter, Registry};

    use crate::RegistryService;

    /// Builds a registry with the given prefix holding a single counter.
    fn registry_with_counter(prefix: &str, name: &str, help: &str) -> Registry {
        let registry = Registry::new_custom(Some(prefix.to_string()), None).unwrap();
        registry
            .register(Box::new(IntCounter::new(name, help).unwrap()))
            .unwrap();
        registry
    }

    /// Gathers all metric families of the service, sorted by name, and checks
    /// that they match the expected `(name, help)` pairs exactly.
    fn assert_families(service: &RegistryService, expected: &[(&str, &str)]) {
        let mut families: Vec<prometheus_filtered::proto::MetricFamily> = service.gather_all();
        families.sort_by(|a, b| a.name().cmp(b.name()));

        assert_eq!(families.len(), expected.len());
        for (family, (name, help)) in families.iter().zip(expected) {
            assert_eq!(family.name(), *name);
            assert_eq!(family.help(), *help);
        }
    }

    #[test]
    fn registry_service() {
        // GIVEN a service built around a default registry
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();
        let registry_service = RegistryService::new(default_registry.clone());

        // AND a metric added to the default registry
        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // AND a registry with one metric
        let registry_1 = registry_with_counter("iota", "counter_1", "counter_1_desc");

        // WHEN
        let registry_1_id = registry_service.add(registry_1);

        // THEN
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
            ],
        );

        // AND add a second registry with a metric
        let registry_2 = registry_with_counter("iota", "counter_2", "counter_2_desc");
        let _registry_2_id = registry_service.add(registry_2);

        // THEN all the metrics should be returned
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );

        // AND remove first registry
        assert!(registry_service.remove(registry_1_id));

        // THEN metrics should now not contain metric of registry_1
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );
    }
}