iota_metrics/
lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    future::Future,
    net::SocketAddr,
    path::Path,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Instant,
};

use axum::{
    Router,
    extract::{Extension, Request},
    http::StatusCode,
    middleware::Next,
    response::Response,
    routing::get,
};
use dashmap::DashMap;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use prometheus::{
    Histogram, IntCounterVec, IntGaugeVec, Registry, TextEncoder,
    core::{AtomicI64, GenericGauge},
    register_histogram_with_registry, register_int_counter_vec_with_registry,
    register_int_gauge_vec_with_registry,
};
pub use scopeguard;
use simple_server_timing_header::Timer;
use tap::TapFallible;
use tracing::{Span, warn};
use uuid::Uuid;

mod guards;
pub mod hardware_metrics;
pub mod histogram;
pub mod metered_channel;
pub mod metrics_network;
pub mod monitored_mpsc;
pub mod thread_stall_monitor;
pub use guards::*;

pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";

pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.02, 0.03, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.,
];

pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1., 2., 3., 5., 10., 20., 30., 60.,
];

pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6,
    0.7, 0.8, 0.9, 1., 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2., 2.5, 3., 3.5, 4., 4.5, 5.,
    6., 7., 8., 9., 10., 15., 20., 25., 30., 60., 90.,
];

pub const COUNT_BUCKETS: &[f64] = &[
    2., 5., 10., 20., 50., 100., 200., 500., 1000., 2000., 5000., 10000.,
];

pub const BYTES_BUCKETS: &[f64] = &[
    1024., 4096., 16384., 65536., 262144., 524288., 1048576., 2097152., 4194304., 8388608.,
    16777216., 33554432., 67108864.,
];

#[derive(Debug)]
pub struct Metrics {
    pub tasks: IntGaugeVec,
    pub futures: IntGaugeVec,
    pub channel_inflight: IntGaugeVec,
    pub channel_sent: IntGaugeVec,
    pub channel_received: IntGaugeVec,
    pub future_active_duration_ns: IntGaugeVec,
    pub scope_iterations: IntGaugeVec,
    pub scope_duration_ns: IntGaugeVec,
    pub scope_entrance: IntGaugeVec,
    pub thread_stall_duration_sec: Histogram,
    pub system_invariant_violations: IntCounterVec,
}

impl Metrics {
    /// Creates a new instance of the monitoring metrics, registering various
    /// gauges and histograms with the provided metrics `Registry`. The
    /// gauges track metrics such as the number of running tasks, pending
    /// futures, channel items, and scope activities, while the histogram
    /// measures the duration of thread stalls. Each metric is registered
    /// with descriptive labels to facilitate performance monitoring and
    /// analysis.
    fn new(registry: &Registry) -> Self {
        Self {
            tasks: register_int_gauge_vec_with_registry!(
                "monitored_tasks",
                "Number of running tasks per callsite.",
                &["callsite"],
                registry,
            )
            .unwrap(),
            futures: register_int_gauge_vec_with_registry!(
                "monitored_futures",
                "Number of pending futures per callsite.",
                &["callsite"],
                registry,
            )
            .unwrap(),
            channel_inflight: register_int_gauge_vec_with_registry!(
                "monitored_channel_inflight",
                "Inflight items in channels.",
                &["name"],
                registry,
            )
            .unwrap(),
            channel_sent: register_int_gauge_vec_with_registry!(
                "monitored_channel_sent",
                "Sent items in channels.",
                &["name"],
                registry,
            )
            .unwrap(),
            channel_received: register_int_gauge_vec_with_registry!(
                "monitored_channel_received",
                "Received items in channels.",
                &["name"],
                registry,
            )
            .unwrap(),
            future_active_duration_ns: register_int_gauge_vec_with_registry!(
                "monitored_future_active_duration_ns",
                "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
                &["name"],
                registry,
            )
            .unwrap(),
            scope_entrance: register_int_gauge_vec_with_registry!(
                "monitored_scope_entrance",
                "Number of entrances into the scope.",
                &["name"],
                registry,
            )
            .unwrap(),
            scope_iterations: register_int_gauge_vec_with_registry!(
                "monitored_scope_iterations",
                "Total number of times where the monitored scope runs",
                &["name"],
                registry,
            )
            .unwrap(),
            scope_duration_ns: register_int_gauge_vec_with_registry!(
                "monitored_scope_duration_ns",
                "Total duration in nanosecs where the monitored scope is running",
                &["name"],
                registry,
            )
            .unwrap(),
            thread_stall_duration_sec: register_histogram_with_registry!(
                "thread_stall_duration_sec",
                "Duration of thread stalls in seconds.",
                registry,
            )
            .unwrap(),
            system_invariant_violations: register_int_counter_vec_with_registry!(
                "system_invariant_violations",
                "Number of system invariant violations",
                &["name"],
                registry,
            )
            .unwrap(),
        }
    }
}

static METRICS: OnceCell<Metrics> = OnceCell::new();

/// Initializes the global `METRICS` instance by setting it to a new `Metrics`
/// object registered with the provided `Registry`. If `METRICS` has already
/// been set, the call is a no-op and a warning is logged. This function is
/// intended to be called once during initialization to set up metrics
/// collection.
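///
/// A minimal usage sketch (the label value below is illustrative):
///
/// ```ignore
/// let registry = prometheus::Registry::new();
/// iota_metrics::init_metrics(&registry);
/// if let Some(metrics) = iota_metrics::get_metrics() {
///     metrics.tasks.with_label_values(&["example_task"]).inc();
/// }
/// ```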
pub fn init_metrics(registry: &Registry) {
    let _ = METRICS
        .set(Metrics::new(registry))
        // this happens many times during tests
        .tap_err(|_| warn!("init_metrics registry overwritten"));
}

/// Retrieves the global `METRICS` instance if it has been initialized.
pub fn get_metrics() -> Option<&'static Metrics> {
    METRICS.get()
}

tokio::task_local! {
    static SERVER_TIMING: Arc<Mutex<Timer>>;
}

/// Create a new task-local ServerTiming context and run the provided future
/// within it. Should be used at the top-most level of a request handler. Can be
/// added to an axum router as a layer by using
/// iota_service::server_timing_middleware.
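///
/// A minimal sketch of direct use outside of axum (the handler body and timing
/// entry name are illustrative):
///
/// ```ignore
/// let response = with_new_server_timing(async move {
///     add_server_timing("fetch_object");
///     // ... handle the request ...
///     "done"
/// })
/// .await;
/// ```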
pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
    let timer = Arc::new(Mutex::new(Timer::new()));

    let mut ret = None;
    SERVER_TIMING
        .scope(timer, async {
            ret = Some(fut.await);
        })
        .await;

    ret.unwrap()
}

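/// Axum middleware that runs the request handler inside a new ServerTiming
/// context and inserts the resulting `Server-Timing` header into the response.
///
/// A minimal wiring sketch (the route and handler are illustrative):
///
/// ```ignore
/// let app = axum::Router::new()
///     .route("/objects", axum::routing::get(get_objects))
///     .layer(axum::middleware::from_fn(server_timing_middleware));
/// ```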
pub async fn server_timing_middleware(request: Request, next: Next) -> Response {
    with_new_server_timing(async move {
        let mut response = next.run(request).await;
        add_server_timing("finish_request");

        if let Ok(header_value) = get_server_timing()
            .expect("server timing not set")
            .lock()
            .header_value()
            .try_into()
        {
            response
                .headers_mut()
                .insert(Timer::header_key(), header_value);
        }
        response
    })
    .await
}

/// Run the provided future within the given task-local ServerTiming context.
/// Only intended for use by macros within this module.
pub async fn with_server_timing<T>(
    timer: Arc<Mutex<Timer>>,
    fut: impl Future<Output = T> + Send + 'static,
) -> T {
    let mut ret = None;
    SERVER_TIMING
        .scope(timer, async {
            ret = Some(fut.await);
        })
        .await;

    ret.unwrap()
}

/// Get the currently active ServerTiming context. Only intended for use by
/// macros within this module.
pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
    SERVER_TIMING.try_with(|timer| timer.clone()).ok()
}

/// Add a new entry to the ServerTiming header.
/// If the caller is not currently in a ServerTiming context (created with
/// `with_new_server_timing`), an error is logged.
pub fn add_server_timing(name: &str) {
    let res = SERVER_TIMING.try_with(|timer| {
        timer.lock().add(name);
    });

    if res.is_err() {
        tracing::error!("Server timing context not found");
    }
}

#[macro_export]
macro_rules! monitored_future {
    ($fut: expr) => {{ monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}

#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($fut: expr) => {
        if let Some(timing) = $crate::get_server_timing() {
            tokio::task::spawn(async move { $crate::with_server_timing(timing, $fut).await })
        } else {
            tokio::task::spawn($fut)
        }
    };
}

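/// Spawns the future on the tokio runtime, counting it in the `monitored_tasks`
/// gauge and forwarding any active ServerTiming context to the new task.
///
/// A minimal usage sketch (the task body is illustrative):
///
/// ```ignore
/// let handle = spawn_monitored_task!(async move {
///     process_batch().await;
/// });
/// handle.await.unwrap();
/// ```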
#[macro_export]
macro_rules! spawn_monitored_task {
    ($fut: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $fut, "", INFO, false
        ))
    };
}

#[macro_export]
macro_rules! spawn_logged_monitored_task {
    ($fut: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $fut, "", INFO, true
        ))
    };

    ($fut: expr, $name: expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $fut, $name, INFO, true
        ))
    };

    ($fut: expr, $name: expr, $logging_level: ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks,
            $fut,
            $name,
            $logging_level,
            true
        ))
    };
}

pub struct MonitoredScopeGuard {
    metrics: &'static Metrics,
    name: &'static str,
    timer: Instant,
}

impl Drop for MonitoredScopeGuard {
    fn drop(&mut self) {
        self.metrics
            .scope_duration_ns
            .with_label_values(&[self.name])
            .add(self.timer.elapsed().as_nanos() as i64);
        self.metrics
            .scope_entrance
            .with_label_values(&[self.name])
            .dec();
    }
}

/// This function creates a named scoped object that keeps track of
/// - the total number of times the scope is entered, in the
///   `monitored_scope_iterations` metric, and
/// - the total duration spent inside the scope, in the
///   `monitored_scope_duration_ns` metric.
///
/// The monitored scope should be single threaded, e.g. the scoped object
/// should encompass the lifetime of a select loop or be guarded by a mutex.
/// The rate of `monitored_scope_duration_ns`, converted to seconds per second,
/// then indicates how busy the single-threaded scope is.
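///
/// A minimal usage sketch (the scope name is illustrative):
///
/// ```ignore
/// loop {
///     let _scope = monitored_scope("ConsensusHandler::handle_message");
///     // ... process one message; the guard records the elapsed time on drop ...
/// }
/// ```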
pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
    let metrics = get_metrics();
    if let Some(m) = metrics {
        m.scope_iterations.with_label_values(&[name]).inc();
        m.scope_entrance.with_label_values(&[name]).inc();
        Some(MonitoredScopeGuard {
            metrics: m,
            name,
            timer: Instant::now(),
        })
    } else {
        None
    }
}

/// A trait extension for `Future` to allow monitoring the execution of the
/// future within a specific scope. Provides the `in_monitored_scope` method to
/// wrap the future in a `MonitoredScopeFuture`, which tracks the future's
/// execution using a `MonitoredScopeGuard` for monitoring purposes.
pub trait MonitoredFutureExt: Future + Sized {
    /// Wraps the current future in a `MonitoredScopeFuture` that is associated
    /// with a specific monitored scope name. The scope helps track the
    /// execution of the future for performance analysis and metrics collection.
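    ///
    /// A minimal usage sketch (the future and scope name are illustrative):
    ///
    /// ```ignore
    /// let checkpoint = fetch_checkpoint()
    ///     .in_monitored_scope("StateSync::fetch_checkpoint")
    ///     .await;
    /// ```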
    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
}

impl<F: Future> MonitoredFutureExt for F {
    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
        MonitoredScopeFuture {
            f: Box::pin(self),
            active_duration_metric: get_metrics()
                .map(|m| m.future_active_duration_ns.with_label_values(&[name])),
            _scope: monitored_scope(name),
        }
    }
}

/// A future that runs within a monitored scope. This struct wraps a pinned
/// future and holds an optional `MonitoredScopeGuard` to measure and monitor
/// the execution of the future. It forwards polling operations
/// to the underlying future while maintaining the monitoring scope.
pub struct MonitoredScopeFuture<F: Sized> {
    f: Pin<Box<F>>,
    active_duration_metric: Option<GenericGauge<AtomicI64>>,
    _scope: Option<MonitoredScopeGuard>,
}

impl<F: Future> Future for MonitoredScopeFuture<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let active_timer = Instant::now();
        let ret = self.f.as_mut().poll(cx);
        if let Some(m) = &self.active_duration_metric {
            m.add(active_timer.elapsed().as_nanos() as i64);
        }
        ret
    }
}

/// A future wrapper that records whether the inner future ran to completion.
/// If the `CancelMonitor` is dropped before the inner future finishes, the
/// cancellation is recorded on the current tracing span (see
/// `MonitorCancellation`). Polling is forwarded to the wrapped future.
pub struct CancelMonitor<F: Sized> {
    finished: bool,
    inner: Pin<Box<F>>,
}

impl<F> CancelMonitor<F>
where
    F: Future,
{
    /// Creates a new `CancelMonitor` that wraps the given future (`inner`). The
    /// monitor tracks whether the future has completed.
    pub fn new(inner: F) -> Self {
        Self {
            finished: false,
            inner: Box::pin(inner),
        }
    }

    /// Returns `true` if the future has completed; otherwise, `false`.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}

impl<F> Future for CancelMonitor<F>
where
    F: Future,
{
    type Output = F::Output;

    /// Polls the inner future to determine if it is ready or still pending. For
    /// `CancelMonitor`, if the future completes (`Poll::Ready`), `finished`
    /// is set to `true`. If it is still pending, the status remains
    /// unchanged. This allows monitoring of the future's completion status.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.as_mut().poll(cx) {
            Poll::Ready(output) => {
                self.finished = true;
                Poll::Ready(output)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<F: Sized> Drop for CancelMonitor<F> {
    /// When the `CancelMonitor` is dropped, it checks whether the future has
    /// finished executing. If the future was not completed (`finished` is
    /// `false`), it records that the future was cancelled by logging the
    /// cancellation status using the current span.
    fn drop(&mut self) {
        if !self.finished {
            Span::current().record("cancelled", true);
        }
    }
}

/// MonitorCancellation records a cancelled = true span attribute if the future
/// it is decorating is dropped before completion. The cancelled attribute must
/// be added at span creation, as you cannot add new attributes after the span
/// is created.
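///
/// A minimal sketch (the span name and handler are illustrative; note that the
/// span declares the `cancelled` field up front):
///
/// ```ignore
/// use tracing::Instrument;
///
/// let span = tracing::info_span!("handle_request", cancelled = false);
/// let result = handle_request()
///     .monitor_cancellation()
///     .instrument(span)
///     .await;
/// ```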
pub trait MonitorCancellation {
    fn monitor_cancellation(self) -> CancelMonitor<Self>
    where
        Self: Sized + Future;
}

impl<T> MonitorCancellation for T
where
    T: Future,
{
    fn monitor_cancellation(self) -> CancelMonitor<Self> {
        CancelMonitor::new(self)
    }
}

pub type RegistryID = Uuid;

/// A service to manage the prometheus registries. This service allows us to
/// create a new Registry on demand and keep it accessible for
/// processing/polling. The service can be freely cloned/shared across threads.
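///
/// A minimal usage sketch (the registry prefix is illustrative):
///
/// ```ignore
/// let service = RegistryService::new(Registry::new());
/// let consensus_registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
/// let id = service.add(consensus_registry);
/// let families = service.gather_all();
/// service.remove(id);
/// ```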
#[derive(Clone)]
pub struct RegistryService {
    // Holds the default Registry that is always kept registered.
    default_registry: Registry,
    registries_by_id: Arc<DashMap<Uuid, Registry>>,
}

impl RegistryService {
    // Creates a new registry service and also adds the main/default registry that
    // is supposed to be preserved and never gets removed.
    pub fn new(default_registry: Registry) -> Self {
        Self {
            default_registry,
            registries_by_id: Arc::new(DashMap::new()),
        }
    }

    // Returns the default registry for the service that someone can use
    // if they don't want to create a new one.
    pub fn default_registry(&self) -> Registry {
        self.default_registry.clone()
    }

    // Adds a new registry to the service. The corresponding RegistryID is returned
    // so it can later be used to remove the Registry. This method panics if a
    // registry with the same id is already present. As silently swapping an
    // existing registry could be quite serious for the operation of the node,
    // removal is expected to happen explicitly.
    pub fn add(&self, registry: Registry) -> RegistryID {
        let registry_id = Uuid::new_v4();
        if self
            .registries_by_id
            .insert(registry_id, registry)
            .is_some()
        {
            panic!("Other Registry already detected for the same id {registry_id}");
        }

        registry_id
    }

    // Removes the registry from the service. Returns true if the registry
    // existed, otherwise false.
    pub fn remove(&self, registry_id: RegistryID) -> bool {
        self.registries_by_id.remove(&registry_id).is_some()
    }

    // Returns all the registries of the service
    pub fn get_all(&self) -> Vec<Registry> {
        let mut registries: Vec<Registry> = self
            .registries_by_id
            .iter()
            .map(|r| r.value().clone())
            .collect();
        registries.push(self.default_registry.clone());

        registries
    }

    // Returns all the metric families from the registries that the service holds.
    pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
        self.get_all().iter().flat_map(|r| r.gather()).collect()
    }
}

/// Create a metric that measures the uptime from when this metric was
/// constructed. The metric is labeled with:
/// - 'process': the process type, differentiating between validator and
///   fullnode
/// - 'version': binary version, generally of the format 'semver-gitrevision'
/// - 'chain_identifier': the identifier of the network which this process is
///   part of
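///
/// A minimal usage sketch (the label values are illustrative):
///
/// ```ignore
/// let registry = prometheus::Registry::new();
/// registry
///     .register(uptime_metric("validator", "1.2.3-abcdef", "chain-id"))
///     .unwrap();
/// ```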
pub fn uptime_metric(
    process: &str,
    version: &'static str,
    chain_identifier: &str,
) -> Box<dyn prometheus::core::Collector> {
    let opts = prometheus::opts!("uptime", "uptime of the node service in seconds")
        .variable_label("process")
        .variable_label("version")
        .variable_label("chain_identifier")
        .variable_label("os_version")
        .variable_label("is_docker");

    let start_time = std::time::Instant::now();
    let uptime = move || start_time.elapsed().as_secs();
    let metric = prometheus_closure_metric::ClosureMetric::new(
        opts,
        prometheus_closure_metric::ValueType::Counter,
        uptime,
        &[
            process,
            version,
            chain_identifier,
            &sysinfo::System::long_os_version()
                .unwrap_or_else(|| "os_version_unavailable".to_string()),
            &is_running_in_docker().to_string(),
        ],
    )
    .unwrap();

    Box::new(metric)
}

pub fn is_running_in_docker() -> bool {
    // Check for the .dockerenv file. This file exists in the debian:__-slim
    // image we use at runtime.
    Path::new("/.dockerenv").exists()
}

pub const METRICS_ROUTE: &str = "/metrics";

// Creates a new http server whose sole purpose is to expose an endpoint that
// a prometheus agent can use to poll for the metrics. A RegistryService is
// returned that can be used to access the prometheus Registries.
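//
// Example (a minimal sketch; the listen address and labels are illustrative):
//
//     let registry_service = start_prometheus_server("0.0.0.0:9184".parse().unwrap());
//     let registry = registry_service.default_registry();
//     registry
//         .register(uptime_metric("fullnode", "1.2.3-abcdef", "chain-id"))
//         .unwrap();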
pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
    let registry = Registry::new();

    let registry_service = RegistryService::new(registry);

    if cfg!(msim) {
        // prometheus uses difficult-to-support features such as
        // TcpSocket::from_raw_fd(), so we can't yet run it in the simulator.
        warn!("not starting prometheus server in simulator");
        return registry_service;
    }

    let app = Router::new()
        .route(METRICS_ROUTE, get(metrics))
        .layer(Extension(registry_service.clone()));

    tokio::spawn(async move {
        let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
        axum::serve(listener, app.into_make_service())
            .await
            .unwrap();
    });

    registry_service
}

/// Handles a request to retrieve metrics, using the provided `RegistryService`
/// to gather all registered metric families. The metrics are then encoded to a
/// text format for easy consumption by monitoring systems. If successful, it
/// returns the metrics string with an `OK` status. If an error occurs during
/// encoding, it returns an `INTERNAL_SERVER_ERROR` status along with an error
/// message. Returns a tuple containing the status code and either the metrics
/// data or an error description.
pub async fn metrics(
    Extension(registry_service): Extension<RegistryService>,
) -> (StatusCode, String) {
    let metrics_families = registry_service.gather_all();
    match TextEncoder.encode_to_string(&metrics_families) {
        Ok(metrics) => (StatusCode::OK, metrics),
        Err(error) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("unable to encode metrics: {error}"),
        ),
    }
}

#[cfg(test)]
mod tests {
    use prometheus::{IntCounter, Registry};

    use crate::RegistryService;

    #[test]
    fn registry_service() {
        // GIVEN a registry service with a default registry
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();

        let registry_service = RegistryService::new(default_registry.clone());

        // AND add a metric to the default registry
        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // AND a registry with one metric
        let registry_1 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        registry_1
            .register(Box::new(
                IntCounter::new("counter_1", "counter_1_desc").unwrap(),
            ))
            .unwrap();

        // WHEN
        let registry_1_id = registry_service.add(registry_1);

        // THEN
        let mut metrics = registry_service.gather_all();
        metrics.sort_by(|m1, m2| Ord::cmp(m1.name(), m2.name()));

        assert_eq!(metrics.len(), 2);

        let metric_default = metrics.remove(0);
        assert_eq!(metric_default.name(), "default_counter");
        assert_eq!(metric_default.help(), "counter_desc");

        let metric_1: prometheus::proto::MetricFamily = metrics.remove(0);
        assert_eq!(metric_1.name(), "iota_counter_1");
        assert_eq!(metric_1.help(), "counter_1_desc");

        // AND add a second registry with a metric
        let registry_2 = Registry::new_custom(Some("iota".to_string()), None).unwrap();
        registry_2
            .register(Box::new(
                IntCounter::new("counter_2", "counter_2_desc").unwrap(),
            ))
            .unwrap();
        let _registry_2_id = registry_service.add(registry_2);

        // THEN all the metrics should be returned
        let mut metrics = registry_service.gather_all();
        metrics.sort_by(|m1, m2| Ord::cmp(m1.name(), m2.name()));

        assert_eq!(metrics.len(), 3);

        let metric_default = metrics.remove(0);
        assert_eq!(metric_default.name(), "default_counter");
        assert_eq!(metric_default.help(), "counter_desc");

        let metric_1 = metrics.remove(0);
        assert_eq!(metric_1.name(), "iota_counter_1");
        assert_eq!(metric_1.help(), "counter_1_desc");

        let metric_2 = metrics.remove(0);
        assert_eq!(metric_2.name(), "iota_counter_2");
        assert_eq!(metric_2.help(), "counter_2_desc");

        // AND remove the first registry
        assert!(registry_service.remove(registry_1_id));

        // THEN the metrics should no longer contain the metric of registry_1
        let mut metrics = registry_service.gather_all();
        metrics.sort_by(|m1, m2| Ord::cmp(m1.name(), m2.name()));

        assert_eq!(metrics.len(), 2);

        let metric_default = metrics.remove(0);
        assert_eq!(metric_default.name(), "default_counter");
        assert_eq!(metric_default.help(), "counter_desc");

        let metric_1 = metrics.remove(0);
        assert_eq!(metric_1.name(), "iota_counter_2");
        assert_eq!(metric_1.help(), "counter_2_desc");
    }
}