Skip to main content

telemetry_subscribers/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4#![cfg_attr(all(feature = "flamegraph-alloc", nightly), feature(thread_local))]
5
6use std::{
7    env,
8    io::{Write, stderr},
9    path::PathBuf,
10    str::FromStr,
11    sync::{Arc, Mutex, atomic::Ordering},
12    time::Duration,
13};
14
15use atomic_float::AtomicF64;
16use crossterm::tty::IsTty;
17use once_cell::sync::Lazy;
18use opentelemetry::{
19    Context, KeyValue,
20    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
21};
22use opentelemetry_otlp::WithExportConfig;
23use opentelemetry_sdk::{
24    Resource, runtime,
25    trace::{BatchSpanProcessor, Sampler, ShouldSample, TracerProvider},
26};
27use span_latency_prom::PrometheusSpanLatencyLayer;
28use thiserror::Error;
29use tracing::{Level, error, metadata::LevelFilter};
30use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
31use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};
32
33use crate::file_exporter::{CachedOpenFile, FileExporter};
34
35mod file_exporter;
36pub mod flamegraph;
37pub mod span_latency_prom;
38pub use flamegraph::FlameSub;
39
40/// Global filter layer that rejects callsites above the configured levels at
41/// registration time.
42///
43/// Without this, per-layer filtering causes the Registry to return
44/// `Interest::always()` for every callsite. Trace-level `#[instrument]` spans
45/// would still be allocated in the slab, have their per-layer filters walked,
46/// and be immediately discarded — all at significant cost. The same applies to
47/// events below the env filter threshold.
48///
49/// By rejecting high-verbosity callsites globally, `Span::new` short-circuits
50/// to `Span::none()` and event macros short-circuit before formatting
51/// arguments.
52struct GlobalLevelFilter {
53    max_span_level: LevelFilter,
54    /// The maximum event level any layer will accept. Seeded from the
55    /// EnvFilter's `max_level_hint` and updated via a shared atomic when the
56    /// filter is reloaded.
57    max_event_level: Arc<std::sync::atomic::AtomicU8>,
58}
59
60impl GlobalLevelFilter {
61    fn load_max_event_level(&self) -> LevelFilter {
62        level_filter_from_u8(self.max_event_level.load(Ordering::Relaxed))
63    }
64
65    fn exceeds_max_level(&self, metadata: &tracing::Metadata<'_>) -> bool {
66        if metadata.is_span() {
67            LevelFilter::from_level(*metadata.level()) > self.max_span_level
68        } else {
69            let max = self.load_max_event_level();
70            !max.eq(&LevelFilter::OFF) && LevelFilter::from_level(*metadata.level()) > max
71        }
72    }
73}
74
75fn level_filter_to_u8(lf: LevelFilter) -> u8 {
76    match lf {
77        LevelFilter::OFF => 0,
78        LevelFilter::ERROR => 1,
79        LevelFilter::WARN => 2,
80        LevelFilter::INFO => 3,
81        LevelFilter::DEBUG => 4,
82        LevelFilter::TRACE => 5,
83    }
84}
85
86fn level_filter_from_u8(v: u8) -> LevelFilter {
87    match v {
88        0 => LevelFilter::OFF,
89        1 => LevelFilter::ERROR,
90        2 => LevelFilter::WARN,
91        3 => LevelFilter::INFO,
92        4 => LevelFilter::DEBUG,
93        _ => LevelFilter::TRACE,
94    }
95}
96
97impl<S: tracing::Subscriber> Layer<S> for GlobalLevelFilter {
98    fn register_callsite(
99        &self,
100        metadata: &'static tracing::Metadata<'static>,
101    ) -> tracing::subscriber::Interest {
102        if self.exceeds_max_level(metadata) {
103            tracing::subscriber::Interest::never()
104        } else {
105            // Return always() for passing callsites. The final interest will be
106            // determined by the Registry (which returns sometimes() when
107            // per-layer filters are present). We intentionally do NOT override
108            // enabled() — doing so would add an extra check to every enabled()
109            // walk, which the per-layer filters already handle. For dynamic env
110            // filter reloads, rebuild_interest_cache() re-calls
111            // register_callsite with our updated atomic max_event_level.
112            tracing::subscriber::Interest::always()
113        }
114    }
115
116    fn max_level_hint(&self) -> Option<LevelFilter> {
117        let event = self.load_max_event_level();
118        Some(std::cmp::max(self.max_span_level, event))
119    }
120}
121
/// Type-erased, thread-safe error used as the catch-all error type of this
/// crate's handles.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
124
125#[derive(Debug, Error)]
126pub enum TelemetryError {
127    #[error("OTLP protocol not enabled in the node's configuration")]
128    TracingDisabled,
129
130    #[error("{0}")]
131    Other(#[from] BoxError),
132}
133
134/// Configuration for different logging/tracing options
135#[derive(Default, Clone, Debug)]
136pub struct TelemetryConfig {
137    /// Enables export of tracing span data via OTLP. Can be viewed with
138    /// grafana/tempo. Enabled if `TRACE_FILTER` env var is provided.
139    pub enable_otlp_tracing: bool,
140    /// Enables Tokio Console debugging on port 6669.
141    /// Enabled if `TOKIO_CONSOLE` env var is provided.
142    pub tokio_console: bool,
143    /// Output JSON logs to stdout only.
144    /// Enabled if `RUST_LOG_JSON` env var is provided.
145    pub json_log_output: bool,
146    /// If defined, write output to a file starting with this name, ex app.log.
147    /// Provided by `RUST_LOG_FILE` env var.
148    pub log_file: Option<String>,
149    /// Log level to set ("error/warn/info/debug/trace"), defaults to "info".
150    /// Provided by `RUST_LOG` env var.
151    pub log_string: Option<String>,
152    /// Span level - what level of spans should be created. Note this is not
153    /// same as logging level. If set to None, then defaults to "info". Use
154    /// `LevelFilter::OFF` to disable all spans.
155    /// Provided by `TOKIO_SPAN_LEVEL` env var (supports "off").
156    pub span_level: Option<LevelFilter>,
157    /// Set a panic hook.
158    pub panic_hook: bool,
159    /// Crash on panic.
160    /// Enabled if `CRASH_ON_PANIC` env var is provided.
161    pub crash_on_panic: bool,
162    /// Optional Prometheus registry - if present, all enabled span latencies
163    /// are measured.
164    pub prom_registry: Option<prometheus::Registry>,
165    /// Disable the `PrometheusSpanLatencyLayer` even when a `prom_registry`
166    /// is set. The layer is a known tracing hotspot in node production
167    /// configs that don't read span-latency metrics.
168    pub disable_span_latency: bool,
169    /// Sample rate for tracing spans, that will be used in the
170    /// "TraceIdRatioBased" sampler. Values rate>=1 - always sample, rate<0
171    /// never sample, rate<1 - sample rate with rate probability,
172    /// e.g. for 0.5 there is 50% chance that trace will be sampled.
173    /// Provided by `SAMPLE_RATE` env var.
174    pub sample_rate: f64,
175    /// Add directive to include trace logs with provided target.
176    pub trace_target: Option<Vec<String>>,
177    /// Enable flamegraph tracing.
178    pub enable_flamegraph: bool,
179}
180
181#[must_use]
182pub struct TelemetryGuards {
183    #[expect(unused)]
184    worker_guard: WorkerGuard,
185    #[expect(unused)]
186    provider: Option<TracerProvider>,
187}
188
189impl TelemetryGuards {
190    fn new(
191        config: TelemetryConfig,
192        worker_guard: WorkerGuard,
193        provider: Option<TracerProvider>,
194    ) -> Self {
195        set_global_telemetry_config(config);
196        Self {
197            worker_guard,
198            provider,
199        }
200    }
201}
202
203impl Drop for TelemetryGuards {
204    fn drop(&mut self) {
205        clear_global_telemetry_config();
206    }
207}
208
209#[derive(Clone, Debug)]
210pub struct FilterHandle {
211    reload: reload::Handle<EnvFilter, Registry>,
212    /// Shared with `GlobalLevelFilter` so that reloading the env filter also
213    /// updates the global event-level gate.
214    max_event_level: Option<Arc<std::sync::atomic::AtomicU8>>,
215}
216
217impl FilterHandle {
218    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
219        let filter = EnvFilter::try_new(directives)?;
220        if let Some(ref max_level) = self.max_event_level {
221            let hint = filter.max_level_hint().unwrap_or(LevelFilter::TRACE);
222            max_level.store(level_filter_to_u8(hint), Ordering::Relaxed);
223        }
224        self.reload.reload(filter)?;
225        Ok(())
226    }
227
228    pub fn get(&self) -> Result<String, BoxError> {
229        self.reload
230            .with_current(|filter| filter.to_string())
231            .map_err(Into::into)
232    }
233}
234
235pub struct TracingHandle {
236    log: FilterHandle,
237    trace: Option<FilterHandle>,
238    file_output: CachedOpenFile,
239    sampler: SamplingFilter,
240    flamegraph: Option<FlameSub>,
241}
242
243impl TracingHandle {
244    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
245        self.log.update(directives)
246    }
247
248    pub fn get_log(&self) -> Result<String, BoxError> {
249        self.log.get()
250    }
251
252    pub fn update_sampling_rate(&self, sample_rate: f64) {
253        self.sampler.update_sampling_rate(sample_rate);
254    }
255
256    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
257        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
258        self.file_output.update_path(trace_path)?;
259        Ok(())
260    }
261
262    pub fn update_trace_filter<S: AsRef<str>>(
263        &self,
264        directives: S,
265        duration: Duration,
266    ) -> Result<(), TelemetryError> {
267        if let Some(trace) = &self.trace {
268            trace.update(directives)?;
269            // after duration is elapsed, reset to the env setting
270            let trace = trace.clone();
271            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
272            tokio::spawn(async move {
273                tokio::time::sleep(duration).await;
274                if let Err(e) = trace.update(trace_filter_env) {
275                    error!("failed to reset trace filter: {}", e);
276                }
277            });
278            Ok(())
279        } else {
280            Err(TelemetryError::TracingDisabled)
281        }
282    }
283
284    pub fn clear_file_output(&self) {
285        self.file_output.clear_path();
286    }
287
288    pub fn reset_trace(&self) -> Result<(), TelemetryError> {
289        if let Some(trace) = &self.trace {
290            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
291            trace.update(trace_filter_env).map_err(|e| e.into())
292        } else {
293            Err(TelemetryError::TracingDisabled)
294        }
295    }
296
297    pub fn get_flamegraph(&self) -> Option<FlameSub> {
298        self.flamegraph.clone()
299    }
300}
301
302fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
303    if let Some(logfile_prefix) = log_file {
304        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
305        tracing_appender::non_blocking(file_appender)
306    } else {
307        tracing_appender::non_blocking(stderr())
308    }
309}
310
311// NOTE: this function is copied from tracing's panic_hook example
312fn set_panic_hook(crash_on_panic: bool) {
313    let default_panic_handler = std::panic::take_hook();
314
315    // Set a panic hook that records the panic as a `tracing` event at the
316    // `ERROR` verbosity level.
317    //
318    // If we are currently in a span when the panic occurred, the logged event
319    // will include the current span, allowing the context in which the panic
320    // occurred to be recorded.
321    std::panic::set_hook(Box::new(move |panic| {
322        // If the panic has a source location, record it as structured fields.
323        if let Some(location) = panic.location() {
324            // On nightly Rust, where the `PanicInfo` type also exposes a
325            // `message()` method returning just the message, we could record
326            // just the message instead of the entire `fmt::Display`
327            // implementation, avoiding the duplicated location
328            tracing::error!(
329                message = %panic,
330                panic.file = location.file(),
331                panic.line = location.line(),
332                panic.column = location.column(),
333            );
334        } else {
335            tracing::error!(message = %panic);
336        }
337
338        default_panic_handler(panic);
339
340        // We're panicking so we can't do anything about the flush failing
341        let _ = std::io::stderr().flush();
342        let _ = std::io::stdout().flush();
343
344        if crash_on_panic {
345            // Kill the process
346            std::process::exit(12);
347        }
348    }));
349}
350
351static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
352    Lazy::new(|| Arc::new(Mutex::new(None)));
353
354fn set_global_telemetry_config(config: TelemetryConfig) {
355    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
356    assert!(global_config.is_none());
357    *global_config = Some(config);
358}
359
360fn clear_global_telemetry_config() {
361    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
362    *global_config = None;
363}
364
365pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
366    let global_config = GLOBAL_CONFIG.lock().unwrap();
367    global_config.clone()
368}
369
370impl TelemetryConfig {
371    pub fn new() -> Self {
372        Self {
373            enable_otlp_tracing: false,
374            tokio_console: false,
375            json_log_output: false,
376            log_file: None,
377            log_string: None,
378            span_level: None,
379            panic_hook: true,
380            crash_on_panic: false,
381            prom_registry: None,
382            disable_span_latency: false,
383            sample_rate: 1.0,
384            trace_target: None,
385            enable_flamegraph: false,
386        }
387    }
388
389    pub fn with_json(mut self) -> Self {
390        self.json_log_output = true;
391        self
392    }
393
394    pub fn with_log_level(mut self, log_string: &str) -> Self {
395        self.log_string = Some(log_string.to_owned());
396        self
397    }
398
399    pub fn with_span_level(mut self, span_level: Level) -> Self {
400        self.span_level = Some(LevelFilter::from_level(span_level));
401        self
402    }
403
404    /// Disable the `PrometheusSpanLatencyLayer` even when a `prom_registry`
405    /// is set. Useful for production configs that don't read span-latency
406    /// metrics; the layer is a known tracing hotspot.
407    pub fn with_disable_span_latency(mut self, disable: bool) -> Self {
408        self.disable_span_latency = disable;
409        self
410    }
411
412    pub fn with_log_file(mut self, filename: &str) -> Self {
413        self.log_file = Some(filename.to_owned());
414        self
415    }
416
417    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
418        self.prom_registry = Some(registry.clone());
419        self
420    }
421
422    pub fn with_sample_rate(mut self, rate: f64) -> Self {
423        self.sample_rate = rate;
424        self
425    }
426
427    pub fn with_trace_target(mut self, target: &str) -> Self {
428        match self.trace_target {
429            Some(ref mut v) => v.push(target.to_owned()),
430            None => self.trace_target = Some(vec![target.to_owned()]),
431        };
432
433        self
434    }
435
436    pub fn with_flamegraph(mut self) -> Self {
437        self.enable_flamegraph = true;
438        self
439    }
440
441    pub fn with_env(mut self) -> Self {
442        if env::var("CRASH_ON_PANIC").is_ok() {
443            self.crash_on_panic = true
444        }
445
446        if env::var("TRACE_FILTER").is_ok() {
447            self.enable_otlp_tracing = true
448        }
449
450        if env::var("RUST_LOG_JSON").is_ok() {
451            self.json_log_output = true;
452        }
453
454        if env::var("TOKIO_CONSOLE").is_ok() {
455            self.tokio_console = true;
456        }
457
458        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
459            self.span_level =
460                Some(LevelFilter::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
461        }
462
463        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
464            self.log_file = Some(filepath);
465        }
466
467        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
468            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
469        }
470
471        if env::var("TRACE_FLAMEGRAPH").is_ok() {
472            self.enable_flamegraph = true
473        }
474
475        self
476    }
477
478    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
479        let config = self;
480        let config_clone = config.clone();
481
482        // Setup an EnvFilter for filtering logging output layers.
483        // NOTE: we don't want to use this to filter all layers.  That causes problems
484        // for layers with different filtering needs, including
485        // tokio-console/console-subscriber, and it also doesn't fit with the
486        // span creation needs for distributed tracing and other span-based tools.
487        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
488        if let Some(targets) = config.trace_target {
489            for target in targets {
490                directives.push_str(&format!(",{target}=trace"));
491            }
492        }
493        let env_filter =
494            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
495        let max_event_level_seed = env_filter.max_level_hint().unwrap_or(LevelFilter::TRACE);
496        let max_event_level = Arc::new(std::sync::atomic::AtomicU8::new(level_filter_to_u8(
497            max_event_level_seed,
498        )));
499        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
500        let log_filter_handle = FilterHandle {
501            reload: reload_handle,
502            max_event_level: Some(max_event_level.clone()),
503        };
504
505        // Separate span level filter.
506        // This is a dumb filter for now - allows all spans that are below a given
507        // level. TODO: implement a sampling filter
508        let span_level = config.span_level.unwrap_or(LevelFilter::INFO);
509        let span_filter = filter::filter_fn(move |metadata| {
510            metadata.is_span() && LevelFilter::from_level(*metadata.level()) <= span_level
511        });
512
513        let mut layers = Vec::new();
514
515        // tokio-console layer
516        // Please see https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration
517        // for environment vars/config options
518        if config.tokio_console {
519            layers.push(console_subscriber::spawn().boxed());
520        }
521
522        if let Some(registry) = config.prom_registry {
523            if !config.disable_span_latency {
524                let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
525                    .expect("Could not initialize span latency layer");
526                layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
527            }
528        }
529
530        let mut trace_filter_handle = None;
531        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
532        let mut provider = None;
533        let sampler = SamplingFilter::new(config.sample_rate);
534        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("iota-node".to_owned());
535
536        if config.enable_otlp_tracing {
537            let trace_file = env::var("TRACE_FILE").ok();
538
539            let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
540                "service.name",
541                service_name.clone(),
542            )];
543            if let Ok(namespace) = env::var("NAMESPACE") {
544                otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
545            }
546            if let Ok(hostname) = env::var("HOSTNAME") {
547                otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
548            }
549            if let Ok(network) = env::var("NETWORK") {
550                otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
551            }
552
553            let resource = Resource::new(otel_kv_vec);
554            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
555
556            // We can either do file output or OTLP, but not both. tracing-opentelemetry
557            // only supports a single tracer at a time.
558            let telemetry = if let Some(trace_file) = trace_file {
559                let exporter =
560                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
561                file_output = exporter.cached_open_file.clone();
562                let processor =
563                    BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
564                        .build();
565
566                let p = TracerProvider::builder()
567                    .with_resource(resource)
568                    .with_sampler(sampler)
569                    .with_span_processor(processor)
570                    .build();
571
572                let tracer = p.tracer(service_name);
573                provider = Some(p);
574
575                tracing_opentelemetry::layer().with_tracer(tracer)
576            } else {
577                let endpoint = env::var("OTLP_ENDPOINT")
578                    .unwrap_or_else(|_| "http://localhost:4317".to_string());
579
580                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
581                    .with_tonic()
582                    .with_endpoint(endpoint)
583                    .build()
584                    .unwrap();
585                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
586                    .with_resource(resource)
587                    .with_sampler(sampler)
588                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
589                    .build();
590                let tracer = tracer_provider.tracer(service_name);
591
592                tracing_opentelemetry::layer().with_tracer(tracer)
593            };
594
595            // Enable Trace Contexts for tying spans together
596            opentelemetry::global::set_text_map_propagator(
597                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
598            );
599
600            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
601            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
602            // The trace filter's verbosity is independent of the global event
603            // level gate (it routes to OTLP, not to the log subscriber), so we
604            // don't share its max-level hint with `GlobalLevelFilter`.
605            trace_filter_handle = Some(FilterHandle {
606                reload: reload_handle,
607                max_event_level: None,
608            });
609
610            layers.push(telemetry.with_filter(trace_env_filter).boxed());
611        }
612
613        let (nb_output, worker_guard) = get_output(config.log_file.clone());
614        if config.json_log_output {
615            // Output to file or to stderr in a newline-delimited JSON format
616            let json_layer = fmt::layer()
617                .with_file(true)
618                .with_line_number(true)
619                .json()
620                .with_writer(nb_output)
621                .with_filter(log_filter)
622                .boxed();
623            layers.push(json_layer);
624        } else {
625            // Output to file or to stderr with ANSI colors
626            let fmt_layer = fmt::layer()
627                .with_ansi(config.log_file.is_none() && stderr().is_tty())
628                .with_writer(nb_output)
629                .with_filter(log_filter)
630                .boxed();
631            layers.push(fmt_layer);
632        }
633
634        let mut flamegraph = None;
635        if config.enable_flamegraph {
636            let flamesub = FlameSub::new();
637            flamegraph = Some(flamesub.clone());
638            layers.push(flamesub.boxed());
639        }
640
641        // Global level filter: rejects span callsites above `span_level` and
642        // event callsites above the env filter's max level at registration
643        // time, preventing the Registry from dispatching callsites that every
644        // per-layer filter would immediately discard.
645        //
646        // Must be stacked on top of `layers` via `.with()` rather than pushed
647        // into the Vec. `Vec<Layer>::register_callsite` aggregates child
648        // interests as "most permissive wins", so a `never()` from this filter
649        // would be overridden by any layer (e.g. fmt+EnvFilter) returning
650        // `sometimes()`. As an outer `Layered`, the `Interest::and` combiner
651        // gives `never()` priority and the inner stack is short-circuited.
652        let global_filter = GlobalLevelFilter {
653            max_span_level: span_level,
654            max_event_level,
655        };
656        let subscriber = tracing_subscriber::registry()
657            .with(layers)
658            .with(global_filter);
659        ::tracing::subscriber::set_global_default(subscriber)
660            .expect("unable to initialize tracing subscriber");
661
662        if config.panic_hook {
663            set_panic_hook(config.crash_on_panic);
664        }
665
666        // The guard must be returned and kept in the main fn of the app, as when it's
667        // dropped then the output gets flushed and closed. If this is dropped
668        // too early then no output will appear!
669        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);
670
671        (
672            guards,
673            TracingHandle {
674                log: log_filter_handle,
675                trace: trace_filter_handle,
676                file_output,
677                sampler,
678                flamegraph,
679            },
680        )
681    }
682}
683
684// Like Sampler::TraceIdRatioBased, but can be updated at runtime
685#[derive(Debug, Clone)]
686struct SamplingFilter {
687    // Sampling filter needs to be fast, so we avoid a mutex.
688    sample_rate: Arc<AtomicF64>,
689}
690
691impl SamplingFilter {
692    fn new(sample_rate: f64) -> Self {
693        SamplingFilter {
694            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
695        }
696    }
697
698    fn clamp(sample_rate: f64) -> f64 {
699        // clamp sample rate to between 0.0001 and 1.0
700        sample_rate.clamp(0.0001, 1.0)
701    }
702
703    fn update_sampling_rate(&self, sample_rate: f64) {
704        // clamp sample rate to between 0.0001 and 1.0
705        let sample_rate = Self::clamp(sample_rate);
706        self.sample_rate.store(sample_rate, Ordering::Relaxed);
707    }
708}
709
710impl ShouldSample for SamplingFilter {
711    fn should_sample(
712        &self,
713        parent_context: Option<&Context>,
714        trace_id: TraceId,
715        name: &str,
716        span_kind: &SpanKind,
717        attributes: &[KeyValue],
718        links: &[Link],
719    ) -> SamplingResult {
720        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
721        let sampler = Sampler::TraceIdRatioBased(sample_rate);
722
723        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
724    }
725}
726
727/// Globally set a tracing subscriber suitable for testing environments
728pub fn init_for_testing() {
729    static LOGGER: Lazy<()> = Lazy::new(|| {
730        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
731            .with_env_filter(
732                EnvFilter::builder()
733                    .with_default_directive(LevelFilter::INFO.into())
734                    .from_env_lossy(),
735            )
736            .with_file(true)
737            .with_line_number(true)
738            .with_test_writer()
739            .finish();
740        ::tracing::subscriber::set_global_default(subscriber)
741            .expect("unable to initialize logging for tests");
742    });
743
744    Lazy::force(&LOGGER);
745}
746
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricType;
    use tracing::{debug, debug_span, info, trace_span, warn};

    use super::*;

    /// End-to-end check of `TelemetryConfig::init`: span latencies reach the
    /// Prometheus registry only for spans at or below the span level, and
    /// the closing panic goes through the installed panic hook.
    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // The log level stays at its INFO default while the span level is
        // raised to DEBUG, so TRACE spans should be ignored.
        let _guard = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry)
            .init();

        let pause = Duration::from_millis(100);

        info!(a = 1, "This will be INFO.");
        // Spans are debug level or below, so they won't be printed out either.  However
        // latencies should be recorded for at least one span
        let enabled_span = debug_span!("yo span yo");
        enabled_span.in_scope(|| {
            // Not printed: the log level is INFO by default.
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(pause);
            warn!(a = 3, "This will be WARNING.");
        });

        // This span won't be enabled
        let rejected_span = trace_span!("this span should not be created");
        rejected_span.in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(pause);
        });

        // Exactly one metric family holding exactly one metric is expected.
        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        let family = &metrics[0];
        assert_eq!(family.name(), "tracing_span_latencies");
        assert_eq!(family.get_field_type(), MetricType::HISTOGRAM);
        let inner = family.get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // The two tests below may "race" to initialize logging; neither of them
    // is allowed to panic because of it.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}