iota_metrics/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    future::Future,
7    net::SocketAddr,
8    path::Path,
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12    time::Instant,
13};
14
15use axum::{
16    Router,
17    extract::{Extension, Request},
18    http::StatusCode,
19    middleware::Next,
20    response::Response,
21    routing::get,
22};
23use dashmap::DashMap;
24use once_cell::sync::OnceCell;
25use parking_lot::Mutex;
26use prometheus::{
27    Histogram, IntGaugeVec, Registry, TextEncoder,
28    core::{AtomicI64, GenericGauge},
29    register_histogram_with_registry, register_int_gauge_vec_with_registry,
30};
31pub use scopeguard;
32use simple_server_timing_header::Timer;
33use tap::TapFallible;
34use tracing::{Span, warn};
35use uuid::Uuid;
36
37mod guards;
38pub mod hardware_metrics;
39pub mod histogram;
40pub mod metered_channel;
41pub mod metrics_network;
42pub mod monitored_mpsc;
43pub mod thread_stall_monitor;
44pub use guards::*;
45
/// Identifier string for the single-writer transaction type.
pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
/// Identifier string for the shared-object transaction type.
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";

/// Latency bucket boundaries in seconds, covering 0.005 up to 1.
pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.02, 0.03, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.0,
];

/// Coarse latency bucket boundaries in seconds, covering 0.005 up to 60.
pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0,
    30.0, 60.0,
];

/// Fine-grained latency bucket boundaries in seconds, covering 0.001 up to
/// 90.
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6,
    0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5,
    5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 25.0, 30.0, 60.0, 90.0,
];

/// Bucket boundaries for item counts, covering 2 up to 10000.
pub const COUNT_BUCKETS: &[f64] = &[
    2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0,
];

/// Bucket boundaries for sizes in bytes, covering 1024 (1 KiB) up to
/// 67108864 (64 MiB).
pub const BYTES_BUCKETS: &[f64] = &[
    1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 524288.0, 1048576.0, 2097152.0, 4194304.0,
    8388608.0, 16777216.0, 33554432.0, 67108864.0,
];
71
72#[derive(Debug)]
73pub struct Metrics {
74    pub tasks: IntGaugeVec,
75    pub futures: IntGaugeVec,
76    pub channel_inflight: IntGaugeVec,
77    pub channel_sent: IntGaugeVec,
78    pub channel_received: IntGaugeVec,
79    pub future_active_duration_ns: IntGaugeVec,
80    pub scope_iterations: IntGaugeVec,
81    pub scope_duration_ns: IntGaugeVec,
82    pub scope_entrance: IntGaugeVec,
83    pub thread_stall_duration_sec: Histogram,
84}
85
86impl Metrics {
87    /// Creates a new instance of the monitoring metrics, registering various
88    /// gauges and histograms with the provided metrics `Registry`. The
89    /// gauges track metrics such as the number of running tasks, pending
90    /// futures, channel items, and scope activities, while the histogram
91    /// measures the duration of thread stalls. Each metric is registered
92    /// with descriptive labels to facilitate performance monitoring and
93    /// analysis.
94    fn new(registry: &Registry) -> Self {
95        Self {
96            tasks: register_int_gauge_vec_with_registry!(
97                "monitored_tasks",
98                "Number of running tasks per callsite.",
99                &["callsite"],
100                registry,
101            )
102            .unwrap(),
103            futures: register_int_gauge_vec_with_registry!(
104                "monitored_futures",
105                "Number of pending futures per callsite.",
106                &["callsite"],
107                registry,
108            )
109            .unwrap(),
110            channel_inflight: register_int_gauge_vec_with_registry!(
111                "monitored_channel_inflight",
112                "Inflight items in channels.",
113                &["name"],
114                registry,
115            )
116            .unwrap(),
117            channel_sent: register_int_gauge_vec_with_registry!(
118                "monitored_channel_sent",
119                "Sent items in channels.",
120                &["name"],
121                registry,
122            )
123            .unwrap(),
124            channel_received: register_int_gauge_vec_with_registry!(
125                "monitored_channel_received",
126                "Received items in channels.",
127                &["name"],
128                registry,
129            )
130            .unwrap(),
131            future_active_duration_ns: register_int_gauge_vec_with_registry!(
132                "monitored_future_active_duration_ns",
133                "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
134                &["name"],
135                registry,
136            )
137            .unwrap(),
138            scope_entrance: register_int_gauge_vec_with_registry!(
139                "monitored_scope_entrance",
140                "Number of entrance in the scope.",
141                &["name"],
142                registry,
143            )
144            .unwrap(),
145            scope_iterations: register_int_gauge_vec_with_registry!(
146                "monitored_scope_iterations",
147                "Total number of times where the monitored scope runs",
148                &["name"],
149                registry,
150            )
151            .unwrap(),
152            scope_duration_ns: register_int_gauge_vec_with_registry!(
153                "monitored_scope_duration_ns",
154                "Total duration in nanosecs where the monitored scope is running",
155                &["name"],
156                registry,
157            )
158            .unwrap(),
159            thread_stall_duration_sec: register_histogram_with_registry!(
160                "thread_stall_duration_sec",
161                "Duration of thread stalls in seconds.",
162                registry,
163            )
164            .unwrap(),
165        }
166    }
167}
168
169static METRICS: OnceCell<Metrics> = OnceCell::new();
170
171/// Initializes the global `METRICS` instance by setting it to a new `Metrics`
172/// object registered with the provided `Registry`. If `METRICS` is already set,
173/// a warning is logged indicating that the metrics registry was overwritten.
174/// This function is intended to be called once during initialization to set up
175/// metrics collection.
176pub fn init_metrics(registry: &Registry) {
177    let _ = METRICS
178        .set(Metrics::new(registry))
179        // this happens many times during tests
180        .tap_err(|_| warn!("init_metrics registry overwritten"));
181}
182
183/// Retrieves the global `METRICS` instance if it has been initialized.
184pub fn get_metrics() -> Option<&'static Metrics> {
185    METRICS.get()
186}
187
188tokio::task_local! {
189    static SERVER_TIMING: Arc<Mutex<Timer>>;
190}
191
192/// Create a new task-local ServerTiming context and run the provided future
193/// within it. Should be used at the top-most level of a request handler. Can be
194/// added to an axum router as a layer by using
195/// iota_service::server_timing_middleware.
196pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
197    let timer = Arc::new(Mutex::new(Timer::new()));
198
199    let mut ret = None;
200    SERVER_TIMING
201        .scope(timer, async {
202            ret = Some(fut.await);
203        })
204        .await;
205
206    ret.unwrap()
207}
208
209pub async fn server_timing_middleware(request: Request, next: Next) -> Response {
210    with_new_server_timing(async move {
211        let mut response = next.run(request).await;
212        add_server_timing("finish_request");
213
214        if let Ok(header_value) = get_server_timing()
215            .expect("server timing not set")
216            .lock()
217            .header_value()
218            .try_into()
219        {
220            response
221                .headers_mut()
222                .insert(Timer::header_key(), header_value);
223        }
224        response
225    })
226    .await
227}
228
229/// Create a new task-local ServerTiming context and run the provided future
230/// within it. Only intended for use by macros within this module.
231pub async fn with_server_timing<T>(
232    timer: Arc<Mutex<Timer>>,
233    fut: impl Future<Output = T> + Send + 'static,
234) -> T {
235    let mut ret = None;
236    SERVER_TIMING
237        .scope(timer, async {
238            ret = Some(fut.await);
239        })
240        .await;
241
242    ret.unwrap()
243}
244
245/// Get the currently active ServerTiming context. Only intended for use by
246/// macros within this module.
247pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
248    SERVER_TIMING.try_with(|timer| timer.clone()).ok()
249}
250
251/// Add a new entry to the ServerTiming header.
252/// If the caller is not currently in a ServerTiming context (created with
253/// `with_new_server_timing`), an error is logged.
254pub fn add_server_timing(name: &str) {
255    let res = SERVER_TIMING.try_with(|timer| {
256        timer.lock().add(name);
257    });
258
259    if res.is_err() {
260        tracing::error!("Server timing context not found");
261    }
262}
263
/// Wraps a future in an `async move` block that tracks it in a gauge of the
/// global `Metrics`.
///
/// - `monitored_future!(fut)` tracks `fut` in the `futures` gauge, labeled
///   with `file:line` of the call site, without logging.
/// - The five-argument form selects the `Metrics` field to use (`$metric`),
///   a name for the label (`$name`; when empty, the line number is used
///   instead), the `tracing::Level` variant to log at and whether logging is
///   enabled. `$name` ends up inside `concat!`, so it has to be a literal.
///
/// The gauge is incremented when the returned future is first polled and
/// decremented when it completes or is dropped. If the global metrics are not
/// initialized, the future simply runs unmonitored.
///
/// The expansion refers to `tracing` by its plain crate name, so the calling
/// crate needs it as a dependency.
#[macro_export]
macro_rules! monitored_future {
    // Go through `$crate` so this arm also works when the macro is invoked by
    // path and `monitored_future!` itself is not in scope at the call site.
    ($fut: expr) => {{ $crate::monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            // The guard undoes the increment when this block finishes or is
            // dropped early.
            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            // Emits the completion message on every exit path, including
            // cancellation.
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}
310
/// Spawns the given future with `tokio::task::spawn`, carrying the caller's
/// ServerTiming context (if there is one) into the spawned task.
///
/// Without an active context the future is spawned as is. The expansion
/// refers to `tokio` by its plain crate name, so the calling crate needs it
/// as a dependency.
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($task: expr) => {
        match $crate::get_server_timing() {
            Some(timing) => {
                tokio::task::spawn(async move { $crate::with_server_timing(timing, $task).await })
            }
            None => tokio::task::spawn($task),
        }
    };
}
321
/// Spawns the given future as a task that is counted in the `tasks` gauge
/// under its call site (`file:line`), without logging, and that inherits the
/// caller's ServerTiming context when one is active.
#[macro_export]
macro_rules! spawn_monitored_task {
    ($task: expr) => {
        $crate::forward_server_timing_and_spawn!(
            $crate::monitored_future!(tasks, $task, "", INFO, false)
        )
    };
}
330
/// Like `spawn_monitored_task!`, but with logging enabled: an event is
/// emitted when the future starts and another when it completes.
///
/// - `spawn_logged_monitored_task!(fut)` labels the task with `file:line` and
///   logs at `INFO`.
/// - `spawn_logged_monitored_task!(fut, name)` labels the task with
///   `file:name` and logs at `INFO`.
/// - `spawn_logged_monitored_task!(fut, name, LEVEL)` additionally selects
///   the `tracing::Level` variant to log at.
#[macro_export]
macro_rules! spawn_logged_monitored_task {
    ($task: expr) => {
        $crate::forward_server_timing_and_spawn!(
            $crate::monitored_future!(tasks, $task, "", INFO, true)
        )
    };

    ($task: expr, $label: expr) => {
        $crate::forward_server_timing_and_spawn!(
            $crate::monitored_future!(tasks, $task, $label, INFO, true)
        )
    };

    ($task: expr, $label: expr, $level: ident) => {
        $crate::forward_server_timing_and_spawn!(
            $crate::monitored_future!(tasks, $task, $label, $level, true)
        )
    };
}
355
356pub struct MonitoredScopeGuard {
357    metrics: &'static Metrics,
358    name: &'static str,
359    timer: Instant,
360}
361
362impl Drop for MonitoredScopeGuard {
363    fn drop(&mut self) {
364        self.metrics
365            .scope_duration_ns
366            .with_label_values(&[self.name])
367            .add(self.timer.elapsed().as_nanos() as i64);
368        self.metrics
369            .scope_entrance
370            .with_label_values(&[self.name])
371            .dec();
372    }
373}
374
375/// This function creates a named scoped object, that keeps track of
376/// - the total iterations where the scope is called in the
377///   `monitored_scope_iterations` metric.
378/// - and the total duration of the scope in the `monitored_scope_duration_ns`
379///   metric.
380///
381/// The monitored scope should be single threaded, e.g. the scoped object
382/// encompass the lifetime of a select loop or guarded by mutex.
383/// Then the rate of `monitored_scope_duration_ns`, converted to the unit of sec
384/// / sec, would be how full the single threaded scope is running.
385pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
386    let metrics = get_metrics();
387    if let Some(m) = metrics {
388        m.scope_iterations.with_label_values(&[name]).inc();
389        m.scope_entrance.with_label_values(&[name]).inc();
390        Some(MonitoredScopeGuard {
391            metrics: m,
392            name,
393            timer: Instant::now(),
394        })
395    } else {
396        None
397    }
398}
399
400/// A trait extension for `Future` to allow monitoring the execution of the
401/// future within a specific scope. Provides the `in_monitored_scope` method to
402/// wrap the future in a `MonitoredScopeFuture`, which tracks the future's
403/// execution using a `MonitoredScopeGuard` for monitoring purposes.
404pub trait MonitoredFutureExt: Future + Sized {
405    /// Wraps the current future in a `MonitoredScopeFuture` that is associated
406    /// with a specific monitored scope name. The scope helps track the
407    /// execution of the future for performance analysis and metrics collection.
408    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
409}
410
411impl<F: Future> MonitoredFutureExt for F {
412    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
413        MonitoredScopeFuture {
414            f: Box::pin(self),
415            active_duration_metric: get_metrics()
416                .map(|m| m.future_active_duration_ns.with_label_values(&[name])),
417            _scope: monitored_scope(name),
418        }
419    }
420}
421
422/// A future that runs within a monitored scope. This struct wraps a pinned
423/// future and holds an optional `MonitoredScopeGuard` to measure and monitor
424/// the execution of the future. It forwards polling operations
425/// to the underlying future while maintaining the monitoring scope.
426pub struct MonitoredScopeFuture<F: Sized> {
427    f: Pin<Box<F>>,
428    active_duration_metric: Option<GenericGauge<AtomicI64>>,
429    _scope: Option<MonitoredScopeGuard>,
430}
431
432impl<F: Future> Future for MonitoredScopeFuture<F> {
433    type Output = F::Output;
434
435    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436        let active_timer = Instant::now();
437        let ret = self.f.as_mut().poll(cx);
438        if let Some(m) = &self.active_duration_metric {
439            m.add(active_timer.elapsed().as_nanos() as i64);
440        }
441        ret
442    }
443}
444
445/// A future that runs within a monitored scope. This struct wraps a pinned
446/// future and holds an optional `MonitoredScopeGuard` to measure and monitor
447/// the execution of the future. It forwards polling operations
448/// to the underlying future while maintaining the monitoring scope.
449pub struct CancelMonitor<F: Sized> {
450    finished: bool,
451    inner: Pin<Box<F>>,
452}
453
454impl<F> CancelMonitor<F>
455where
456    F: Future,
457{
458    /// Creates a new `CancelMonitor` that wraps the given future (`inner`). The
459    /// monitor tracks whether the future has completed.
460    pub fn new(inner: F) -> Self {
461        Self {
462            finished: false,
463            inner: Box::pin(inner),
464        }
465    }
466
467    /// Returns `true` if the future has completed; otherwise, `false`.
468    pub fn is_finished(&self) -> bool {
469        self.finished
470    }
471}
472
473impl<F> Future for CancelMonitor<F>
474where
475    F: Future,
476{
477    type Output = F::Output;
478
479    /// Polls the inner future to determine if it is ready or still pending. For
480    /// `CancelMonitor`, if the future completes (`Poll::Ready`), `finished`
481    /// is set to `true`. If it is still pending, the status remains
482    /// unchanged. This allows monitoring of the future's completion status.
483    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
484        match self.inner.as_mut().poll(cx) {
485            Poll::Ready(output) => {
486                self.finished = true;
487                Poll::Ready(output)
488            }
489            Poll::Pending => Poll::Pending,
490        }
491    }
492}
493
494impl<F: Sized> Drop for CancelMonitor<F> {
495    /// When the `CancelMonitor` is dropped, it checks whether the future has
496    /// finished executing. If the future was not completed (`finished` is
497    /// `false`), it records that the future was cancelled by logging the
498    /// cancellation status using the current span.
499    fn drop(&mut self) {
500        if !self.finished {
501            Span::current().record("cancelled", true);
502        }
503    }
504}
505
506/// MonitorCancellation records a cancelled = true span attribute if the future
507/// it is decorating is dropped before completion. The cancelled attribute must
508/// be added at span creation, as you cannot add new attributes after the span
509/// is created.
510pub trait MonitorCancellation {
511    fn monitor_cancellation(self) -> CancelMonitor<Self>
512    where
513        Self: Sized + Future;
514}
515
516impl<T> MonitorCancellation for T
517where
518    T: Future,
519{
520    fn monitor_cancellation(self) -> CancelMonitor<Self> {
521        CancelMonitor::new(self)
522    }
523}
524
525pub type RegistryID = Uuid;
526
527/// A service to manage the prometheus registries. This service allow us to
528/// create a new Registry on demand and keep it accessible for
529/// processing/polling. The service can be freely cloned/shared across threads.
530#[derive(Clone)]
531pub struct RegistryService {
532    // Holds a Registry that is supposed to be used
533    default_registry: Registry,
534    registries_by_id: Arc<DashMap<Uuid, Registry>>,
535}
536
537impl RegistryService {
538    // Creates a new registry service and also adds the main/default registry that
539    // is supposed to be preserved and never get removed
540    pub fn new(default_registry: Registry) -> Self {
541        Self {
542            default_registry,
543            registries_by_id: Arc::new(DashMap::new()),
544        }
545    }
546
547    // Returns the default registry for the service that someone can use
548    // if they don't want to create a new one.
549    pub fn default_registry(&self) -> Registry {
550        self.default_registry.clone()
551    }
552
553    // Adds a new registry to the service. The corresponding RegistryID is returned
554    // so can later be used for removing the Registry. Method panics if we try
555    // to insert a registry with the same id. As this can be quite serious for
556    // the operation of the node we don't want to accidentally swap an existing
557    // registry - we expected a removal to happen explicitly.
558    pub fn add(&self, registry: Registry) -> RegistryID {
559        let registry_id = Uuid::new_v4();
560        if self
561            .registries_by_id
562            .insert(registry_id, registry)
563            .is_some()
564        {
565            panic!("Other Registry already detected for the same id {registry_id}");
566        }
567
568        registry_id
569    }
570
571    // Removes the registry from the service. If Registry existed then this method
572    // returns true, otherwise false is returned instead.
573    pub fn remove(&self, registry_id: RegistryID) -> bool {
574        self.registries_by_id.remove(&registry_id).is_some()
575    }
576
577    // Returns all the registries of the service
578    pub fn get_all(&self) -> Vec<Registry> {
579        let mut registries: Vec<Registry> = self
580            .registries_by_id
581            .iter()
582            .map(|r| r.value().clone())
583            .collect();
584        registries.push(self.default_registry.clone());
585
586        registries
587    }
588
589    // Returns all the metric families from the registries that a service holds.
590    pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
591        self.get_all().iter().flat_map(|r| r.gather()).collect()
592    }
593}
594
595/// Create a metric that measures the uptime from when this metric was
596/// constructed. The metric is labeled with:
597/// - 'process': the process type, differentiating between validator and
598///   fullnode
599/// - 'version': binary version, generally be of the format:
600///   'semver-gitrevision'
601/// - 'chain_identifier': the identifier of the network which this process is
602///   part of
603pub fn uptime_metric(
604    process: &str,
605    version: &'static str,
606    chain_identifier: &str,
607) -> Box<dyn prometheus::core::Collector> {
608    let opts = prometheus::opts!("uptime", "uptime of the node service in seconds")
609        .variable_label("process")
610        .variable_label("version")
611        .variable_label("chain_identifier")
612        .variable_label("os_version")
613        .variable_label("is_docker");
614
615    let start_time = std::time::Instant::now();
616    let uptime = move || start_time.elapsed().as_secs();
617    let metric = prometheus_closure_metric::ClosureMetric::new(
618        opts,
619        prometheus_closure_metric::ValueType::Counter,
620        uptime,
621        &[
622            process,
623            version,
624            chain_identifier,
625            &sysinfo::System::long_os_version()
626                .unwrap_or_else(|| "os_version_unavailable".to_string()),
627            &is_running_in_docker().to_string(),
628        ],
629    )
630    .unwrap();
631
632    Box::new(metric)
633}
634
/// Reports whether the process appears to run inside a Docker container, by
/// checking for the `/.dockerenv` marker file.
pub fn is_running_in_docker() -> bool {
    // This file exists in the debian:__-slim image we use at runtime.
    const DOCKERENV_MARKER: &str = "/.dockerenv";

    let marker = Path::new(DOCKERENV_MARKER);
    marker.exists()
}
640
641pub const METRICS_ROUTE: &str = "/metrics";
642
643// Creates a new http server that has as a sole purpose to expose
644// and endpoint that prometheus agent can use to poll for the metrics.
645// A RegistryService is returned that can be used to get access in prometheus
646// Registries.
647pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
648    let registry = Registry::new();
649
650    let registry_service = RegistryService::new(registry);
651
652    if cfg!(msim) {
653        // prometheus uses difficult-to-support features such as
654        // TcpSocket::from_raw_fd(), so we can't yet run it in the simulator.
655        warn!("not starting prometheus server in simulator");
656        return registry_service;
657    }
658
659    let app = Router::new()
660        .route(METRICS_ROUTE, get(metrics))
661        .layer(Extension(registry_service.clone()));
662
663    tokio::spawn(async move {
664        let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
665        axum::serve(listener, app.into_make_service())
666            .await
667            .unwrap();
668    });
669
670    registry_service
671}
672
673/// Handles a request to retrieve metrics, using the provided `RegistryService`
674/// to gather all registered metric families. The metrics are then encoded to a
675/// text format for easy consumption by monitoring systems. If successful, it
676/// returns the metrics string with an `OK` status. If an error occurs during
677/// encoding, it returns an `INTERNAL_SERVER_ERROR` status along with an error
678/// message. Returns a tuple containing the status code and either the metrics
679/// data or an error description.
680pub async fn metrics(
681    Extension(registry_service): Extension<RegistryService>,
682) -> (StatusCode, String) {
683    let metrics_families = registry_service.gather_all();
684    match TextEncoder.encode_to_string(&metrics_families) {
685        Ok(metrics) => (StatusCode::OK, metrics),
686        Err(error) => (
687            StatusCode::INTERNAL_SERVER_ERROR,
688            format!("unable to encode metrics: {error}"),
689        ),
690    }
691}
692
#[cfg(test)]
mod tests {
    use prometheus::{IntCounter, Registry};

    use crate::RegistryService;

    /// Builds a registry with the given metric-name prefix that holds a
    /// single counter.
    fn registry_with_counter(prefix: &str, name: &str, help: &str) -> Registry {
        let registry = Registry::new_custom(Some(prefix.to_string()), None).unwrap();
        registry
            .register(Box::new(IntCounter::new(name, help).unwrap()))
            .unwrap();
        registry
    }

    /// Gathers everything from `service`, sorts the families by name and
    /// checks that their `(name, help)` pairs are exactly `expected`.
    fn assert_families(service: &RegistryService, expected: &[(&str, &str)]) {
        let mut families = service.gather_all();
        families.sort_by(|a, b| Ord::cmp(a.name(), b.name()));

        let actual: Vec<(&str, &str)> = families.iter().map(|f| (f.name(), f.help())).collect();
        assert_eq!(actual.as_slice(), expected);
    }

    #[test]
    fn registry_service() {
        // GIVEN a service built around a default registry
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();
        let registry_service = RegistryService::new(default_registry.clone());

        // AND a metric added to the default registry after the service was
        // created
        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // AND a registry with one metric
        let registry_1 = registry_with_counter("iota", "counter_1", "counter_1_desc");

        // WHEN
        let registry_1_id = registry_service.add(registry_1);

        // THEN both metrics are returned
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
            ],
        );

        // AND add a second registry with a metric
        let registry_2 = registry_with_counter("iota", "counter_2", "counter_2_desc");
        let _registry_2_id = registry_service.add(registry_2);

        // THEN all the metrics should be returned
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_1", "counter_1_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );

        // AND remove first registry
        assert!(registry_service.remove(registry_1_id));

        // THEN metrics should now not contain metric of registry_1
        assert_families(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("iota_counter_2", "counter_2_desc"),
            ],
        );
    }
}