telemetry_subscribers/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4#![cfg_attr(all(feature = "flamegraph-alloc", nightly), feature(thread_local))]
5
6use std::{
7    env,
8    io::{Write, stderr},
9    path::PathBuf,
10    str::FromStr,
11    sync::{Arc, Mutex, atomic::Ordering},
12    time::Duration,
13};
14
15use atomic_float::AtomicF64;
16use crossterm::tty::IsTty;
17use once_cell::sync::Lazy;
18use opentelemetry::{
19    Context, KeyValue,
20    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
21};
22use opentelemetry_otlp::WithExportConfig;
23use opentelemetry_sdk::{
24    Resource, runtime,
25    trace::{BatchSpanProcessor, Sampler, ShouldSample, TracerProvider},
26};
27use span_latency_prom::PrometheusSpanLatencyLayer;
28use thiserror::Error;
29use tracing::{Level, error, metadata::LevelFilter};
30use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
31use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};
32
33use crate::file_exporter::{CachedOpenFile, FileExporter};
34
35mod file_exporter;
36pub mod flamegraph;
37pub mod span_latency_prom;
38pub use flamegraph::FlameSub;
39
40/// Alias for a type-erased error type.
41pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
42
43#[derive(Debug, Error)]
44pub enum TelemetryError {
45    #[error("OTLP protocol not enabled in the node's configuration")]
46    TracingDisabled,
47
48    #[error("{0}")]
49    Other(#[from] BoxError),
50}
51
52/// Configuration for different logging/tracing options
53#[derive(Default, Clone, Debug)]
54pub struct TelemetryConfig {
55    /// Enables export of tracing span data via OTLP. Can be viewed with
56    /// grafana/tempo. Enabled if `TRACE_FILTER` env var is provided.
57    pub enable_otlp_tracing: bool,
58    /// Enables Tokio Console debugging on port 6669.
59    /// Enabled if `TOKIO_CONSOLE` env var is provided.
60    pub tokio_console: bool,
61    /// Output JSON logs to stdout only.
62    /// Enabled if `RUST_LOG_JSON` env var is provided.
63    pub json_log_output: bool,
64    /// If defined, write output to a file starting with this name, ex app.log.
65    /// Provided by `RUST_LOG_FILE` env var.
66    pub log_file: Option<String>,
67    /// Log level to set ("error/warn/info/debug/trace"), defaults to "info".
68    /// Provided by `RUST_LOG` env var.
69    pub log_string: Option<String>,
70    /// Span level - what level of spans should be created.  Note this is not
71    /// same as logging level If set to None, then defaults to "info".
72    /// Provided by `TOKIO_SPAN_LEVEL` env var.
73    pub span_level: Option<Level>,
74    /// Set a panic hook.
75    pub panic_hook: bool,
76    /// Crash on panic.
77    /// Enabled if `CRASH_ON_PANIC` env var is provided.
78    pub crash_on_panic: bool,
79    /// Optional Prometheus registry - if present, all enabled span latencies
80    /// are measured.
81    pub prom_registry: Option<prometheus::Registry>,
82    /// Sample rate for tracing spans, that will be used in the
83    /// "TraceIdRatioBased" sampler. Values rate>=1 - always sample, rate<0
84    /// never sample, rate<1 - sample rate with rate probability,
85    /// e.g. for 0.5 there is 50% chance that trace will be sampled.
86    /// Provided by `SAMPLE_RATE` env var.
87    pub sample_rate: f64,
88    /// Add directive to include trace logs with provided target.
89    pub trace_target: Option<Vec<String>>,
90    /// Enable flamegraph tracing.
91    pub enable_flamegraph: bool,
92}
93
94#[must_use]
95pub struct TelemetryGuards {
96    #[expect(unused)]
97    worker_guard: WorkerGuard,
98    #[expect(unused)]
99    provider: Option<TracerProvider>,
100}
101
102impl TelemetryGuards {
103    fn new(
104        config: TelemetryConfig,
105        worker_guard: WorkerGuard,
106        provider: Option<TracerProvider>,
107    ) -> Self {
108        set_global_telemetry_config(config);
109        Self {
110            worker_guard,
111            provider,
112        }
113    }
114}
115
116impl Drop for TelemetryGuards {
117    fn drop(&mut self) {
118        clear_global_telemetry_config();
119    }
120}
121
122#[derive(Clone, Debug)]
123pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);
124
125impl FilterHandle {
126    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
127        let filter = EnvFilter::try_new(directives)?;
128        self.0.reload(filter)?;
129        Ok(())
130    }
131
132    pub fn get(&self) -> Result<String, BoxError> {
133        self.0
134            .with_current(|filter| filter.to_string())
135            .map_err(Into::into)
136    }
137}
138
139pub struct TracingHandle {
140    log: FilterHandle,
141    trace: Option<FilterHandle>,
142    file_output: CachedOpenFile,
143    sampler: SamplingFilter,
144    flamegraph: Option<FlameSub>,
145}
146
147impl TracingHandle {
148    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
149        self.log.update(directives)
150    }
151
152    pub fn get_log(&self) -> Result<String, BoxError> {
153        self.log.get()
154    }
155
156    pub fn update_sampling_rate(&self, sample_rate: f64) {
157        self.sampler.update_sampling_rate(sample_rate);
158    }
159
160    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
161        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
162        self.file_output.update_path(trace_path)?;
163        Ok(())
164    }
165
166    pub fn update_trace_filter<S: AsRef<str>>(
167        &self,
168        directives: S,
169        duration: Duration,
170    ) -> Result<(), TelemetryError> {
171        if let Some(trace) = &self.trace {
172            trace.update(directives)?;
173            // after duration is elapsed, reset to the env setting
174            let trace = trace.clone();
175            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
176            tokio::spawn(async move {
177                tokio::time::sleep(duration).await;
178                if let Err(e) = trace.update(trace_filter_env) {
179                    error!("failed to reset trace filter: {}", e);
180                }
181            });
182            Ok(())
183        } else {
184            Err(TelemetryError::TracingDisabled)
185        }
186    }
187
188    pub fn clear_file_output(&self) {
189        self.file_output.clear_path();
190    }
191
192    pub fn reset_trace(&self) -> Result<(), TelemetryError> {
193        if let Some(trace) = &self.trace {
194            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
195            trace.update(trace_filter_env).map_err(|e| e.into())
196        } else {
197            Err(TelemetryError::TracingDisabled)
198        }
199    }
200
201    pub fn get_flamegraph(&self) -> Option<FlameSub> {
202        self.flamegraph.clone()
203    }
204}
205
206fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
207    if let Some(logfile_prefix) = log_file {
208        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
209        tracing_appender::non_blocking(file_appender)
210    } else {
211        tracing_appender::non_blocking(stderr())
212    }
213}
214
215// NOTE: this function is copied from tracing's panic_hook example
216fn set_panic_hook(crash_on_panic: bool) {
217    let default_panic_handler = std::panic::take_hook();
218
219    // Set a panic hook that records the panic as a `tracing` event at the
220    // `ERROR` verbosity level.
221    //
222    // If we are currently in a span when the panic occurred, the logged event
223    // will include the current span, allowing the context in which the panic
224    // occurred to be recorded.
225    std::panic::set_hook(Box::new(move |panic| {
226        // If the panic has a source location, record it as structured fields.
227        if let Some(location) = panic.location() {
228            // On nightly Rust, where the `PanicInfo` type also exposes a
229            // `message()` method returning just the message, we could record
230            // just the message instead of the entire `fmt::Display`
231            // implementation, avoiding the duplicated location
232            tracing::error!(
233                message = %panic,
234                panic.file = location.file(),
235                panic.line = location.line(),
236                panic.column = location.column(),
237            );
238        } else {
239            tracing::error!(message = %panic);
240        }
241
242        default_panic_handler(panic);
243
244        // We're panicking so we can't do anything about the flush failing
245        let _ = std::io::stderr().flush();
246        let _ = std::io::stdout().flush();
247
248        if crash_on_panic {
249            // Kill the process
250            std::process::exit(12);
251        }
252    }));
253}
254
255static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
256    Lazy::new(|| Arc::new(Mutex::new(None)));
257
258fn set_global_telemetry_config(config: TelemetryConfig) {
259    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
260    assert!(global_config.is_none());
261    *global_config = Some(config);
262}
263
264fn clear_global_telemetry_config() {
265    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
266    *global_config = None;
267}
268
269pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
270    let global_config = GLOBAL_CONFIG.lock().unwrap();
271    global_config.clone()
272}
273
274impl TelemetryConfig {
275    pub fn new() -> Self {
276        Self {
277            enable_otlp_tracing: false,
278            tokio_console: false,
279            json_log_output: false,
280            log_file: None,
281            log_string: None,
282            span_level: None,
283            panic_hook: true,
284            crash_on_panic: false,
285            prom_registry: None,
286            sample_rate: 1.0,
287            trace_target: None,
288            enable_flamegraph: false,
289        }
290    }
291
292    pub fn with_json(mut self) -> Self {
293        self.json_log_output = true;
294        self
295    }
296
297    pub fn with_log_level(mut self, log_string: &str) -> Self {
298        self.log_string = Some(log_string.to_owned());
299        self
300    }
301
302    pub fn with_span_level(mut self, span_level: Level) -> Self {
303        self.span_level = Some(span_level);
304        self
305    }
306
307    pub fn with_log_file(mut self, filename: &str) -> Self {
308        self.log_file = Some(filename.to_owned());
309        self
310    }
311
312    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
313        self.prom_registry = Some(registry.clone());
314        self
315    }
316
317    pub fn with_sample_rate(mut self, rate: f64) -> Self {
318        self.sample_rate = rate;
319        self
320    }
321
322    pub fn with_trace_target(mut self, target: &str) -> Self {
323        match self.trace_target {
324            Some(ref mut v) => v.push(target.to_owned()),
325            None => self.trace_target = Some(vec![target.to_owned()]),
326        };
327
328        self
329    }
330
331    pub fn with_flamegraph(mut self) -> Self {
332        self.enable_flamegraph = true;
333        self
334    }
335
336    pub fn with_env(mut self) -> Self {
337        if env::var("CRASH_ON_PANIC").is_ok() {
338            self.crash_on_panic = true
339        }
340
341        if env::var("TRACE_FILTER").is_ok() {
342            self.enable_otlp_tracing = true
343        }
344
345        if env::var("RUST_LOG_JSON").is_ok() {
346            self.json_log_output = true;
347        }
348
349        if env::var("TOKIO_CONSOLE").is_ok() {
350            self.tokio_console = true;
351        }
352
353        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
354            self.span_level =
355                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
356        }
357
358        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
359            self.log_file = Some(filepath);
360        }
361
362        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
363            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
364        }
365
366        if env::var("TRACE_FLAMEGRAPH").is_ok() {
367            self.enable_flamegraph = true
368        }
369
370        self
371    }
372
373    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
374        let config = self;
375        let config_clone = config.clone();
376
377        // Setup an EnvFilter for filtering logging output layers.
378        // NOTE: we don't want to use this to filter all layers.  That causes problems
379        // for layers with different filtering needs, including
380        // tokio-console/console-subscriber, and it also doesn't fit with the
381        // span creation needs for distributed tracing and other span-based tools.
382        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
383        if let Some(targets) = config.trace_target {
384            for target in targets {
385                directives.push_str(&format!(",{target}=trace"));
386            }
387        }
388        let env_filter =
389            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
390        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
391        let log_filter_handle = FilterHandle(reload_handle);
392
393        // Separate span level filter.
394        // This is a dumb filter for now - allows all spans that are below a given
395        // level. TODO: implement a sampling filter
396        let span_level = config.span_level.unwrap_or(Level::INFO);
397        let span_filter = filter::filter_fn(move |metadata| {
398            metadata.is_span() && *metadata.level() <= span_level
399        });
400
401        let mut layers = Vec::new();
402
403        // tokio-console layer
404        // Please see https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration
405        // for environment vars/config options
406        if config.tokio_console {
407            layers.push(console_subscriber::spawn().boxed());
408        }
409
410        if let Some(registry) = config.prom_registry {
411            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
412                .expect("Could not initialize span latency layer");
413            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
414        }
415
416        let mut trace_filter_handle = None;
417        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
418        let mut provider = None;
419        let sampler = SamplingFilter::new(config.sample_rate);
420        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("iota-node".to_owned());
421
422        if config.enable_otlp_tracing {
423            let trace_file = env::var("TRACE_FILE").ok();
424
425            let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
426                "service.name",
427                service_name.clone(),
428            )];
429            if let Ok(namespace) = env::var("NAMESPACE") {
430                otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
431            }
432            if let Ok(hostname) = env::var("HOSTNAME") {
433                otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
434            }
435            if let Ok(network) = env::var("NETWORK") {
436                otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
437            }
438
439            let resource = Resource::new(otel_kv_vec);
440            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
441
442            // We can either do file output or OTLP, but not both. tracing-opentelemetry
443            // only supports a single tracer at a time.
444            let telemetry = if let Some(trace_file) = trace_file {
445                let exporter =
446                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
447                file_output = exporter.cached_open_file.clone();
448                let processor =
449                    BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
450                        .build();
451
452                let p = TracerProvider::builder()
453                    .with_resource(resource)
454                    .with_sampler(sampler)
455                    .with_span_processor(processor)
456                    .build();
457
458                let tracer = p.tracer(service_name);
459                provider = Some(p);
460
461                tracing_opentelemetry::layer().with_tracer(tracer)
462            } else {
463                let endpoint = env::var("OTLP_ENDPOINT")
464                    .unwrap_or_else(|_| "http://localhost:4317".to_string());
465
466                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
467                    .with_tonic()
468                    .with_endpoint(endpoint)
469                    .build()
470                    .unwrap();
471                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
472                    .with_resource(resource)
473                    .with_sampler(sampler)
474                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
475                    .build();
476                let tracer = tracer_provider.tracer(service_name);
477
478                tracing_opentelemetry::layer().with_tracer(tracer)
479            };
480
481            // Enable Trace Contexts for tying spans together
482            opentelemetry::global::set_text_map_propagator(
483                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
484            );
485
486            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
487            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
488            trace_filter_handle = Some(FilterHandle(reload_handle));
489
490            layers.push(telemetry.with_filter(trace_env_filter).boxed());
491        }
492
493        let (nb_output, worker_guard) = get_output(config.log_file.clone());
494        if config.json_log_output {
495            // Output to file or to stderr in a newline-delimited JSON format
496            let json_layer = fmt::layer()
497                .with_file(true)
498                .with_line_number(true)
499                .json()
500                .with_writer(nb_output)
501                .with_filter(log_filter)
502                .boxed();
503            layers.push(json_layer);
504        } else {
505            // Output to file or to stderr with ANSI colors
506            let fmt_layer = fmt::layer()
507                .with_ansi(config.log_file.is_none() && stderr().is_tty())
508                .with_writer(nb_output)
509                .with_filter(log_filter)
510                .boxed();
511            layers.push(fmt_layer);
512        }
513
514        let mut flamegraph = None;
515        if config.enable_flamegraph {
516            let flamesub = FlameSub::new();
517            flamegraph = Some(flamesub.clone());
518            layers.push(flamesub.boxed());
519        }
520
521        let subscriber = tracing_subscriber::registry().with(layers);
522        ::tracing::subscriber::set_global_default(subscriber)
523            .expect("unable to initialize tracing subscriber");
524
525        if config.panic_hook {
526            set_panic_hook(config.crash_on_panic);
527        }
528
529        // The guard must be returned and kept in the main fn of the app, as when it's
530        // dropped then the output gets flushed and closed. If this is dropped
531        // too early then no output will appear!
532        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);
533
534        (
535            guards,
536            TracingHandle {
537                log: log_filter_handle,
538                trace: trace_filter_handle,
539                file_output,
540                sampler,
541                flamegraph,
542            },
543        )
544    }
545}
546
547// Like Sampler::TraceIdRatioBased, but can be updated at runtime
548#[derive(Debug, Clone)]
549struct SamplingFilter {
550    // Sampling filter needs to be fast, so we avoid a mutex.
551    sample_rate: Arc<AtomicF64>,
552}
553
554impl SamplingFilter {
555    fn new(sample_rate: f64) -> Self {
556        SamplingFilter {
557            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
558        }
559    }
560
561    fn clamp(sample_rate: f64) -> f64 {
562        // clamp sample rate to between 0.0001 and 1.0
563        sample_rate.clamp(0.0001, 1.0)
564    }
565
566    fn update_sampling_rate(&self, sample_rate: f64) {
567        // clamp sample rate to between 0.0001 and 1.0
568        let sample_rate = Self::clamp(sample_rate);
569        self.sample_rate.store(sample_rate, Ordering::Relaxed);
570    }
571}
572
573impl ShouldSample for SamplingFilter {
574    fn should_sample(
575        &self,
576        parent_context: Option<&Context>,
577        trace_id: TraceId,
578        name: &str,
579        span_kind: &SpanKind,
580        attributes: &[KeyValue],
581        links: &[Link],
582    ) -> SamplingResult {
583        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
584        let sampler = Sampler::TraceIdRatioBased(sample_rate);
585
586        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
587    }
588}
589
590/// Globally set a tracing subscriber suitable for testing environments
591pub fn init_for_testing() {
592    static LOGGER: Lazy<()> = Lazy::new(|| {
593        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
594            .with_env_filter(
595                EnvFilter::builder()
596                    .with_default_directive(LevelFilter::INFO.into())
597                    .from_env_lossy(),
598            )
599            .with_file(true)
600            .with_line_number(true)
601            .with_test_writer()
602            .finish();
603        ::tracing::subscriber::set_global_default(subscriber)
604            .expect("unable to initialize logging for tests");
605    });
606
607    Lazy::force(&LOGGER);
608}
609
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricType;
    use tracing::{debug, debug_span, info, trace_span, warn};

    use super::*;

    // `TelemetryConfig::init` installs the global subscriber, as does
    // `init_for_testing`, so this test needs a process of its own (e.g. nextest).
    #[test]
    // `expected` pins the deliberate panic at the end. A bare `#[should_panic]`
    // would also let a failing metrics assertion count as a pass.
    #[should_panic(expected = "This should cause error logs to be printed out!")]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Default logging level is INFO, but here we set the span level to DEBUG.
        // TRACE spans should be ignored.
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        // Spans are debug level or below, so they won't be printed out either.  However
        // latencies should be recorded for at least one span
        debug_span!("yo span yo").in_scope(|| {
            // This debug log will not print out, log level set to INFO by default
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // This span won't be enabled
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        // There should be 1 metricFamily and 1 metric
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // Both the following tests should be able to "race" to initialize logging
    // without causing a panic
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}