telemetry_subscribers/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    env,
7    io::{Write, stderr},
8    path::PathBuf,
9    str::FromStr,
10    sync::{Arc, Mutex, atomic::Ordering},
11    time::Duration,
12};
13
14use atomic_float::AtomicF64;
15use crossterm::tty::IsTty;
16use once_cell::sync::Lazy;
17use opentelemetry::{
18    Context, KeyValue,
19    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
20};
21use opentelemetry_otlp::WithExportConfig;
22use opentelemetry_sdk::{
23    Resource, runtime,
24    trace::{BatchSpanProcessor, Sampler, ShouldSample, TracerProvider},
25};
26use span_latency_prom::PrometheusSpanLatencyLayer;
27use thiserror::Error;
28use tracing::{Level, error, metadata::LevelFilter};
29use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
30use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};
31
32use crate::file_exporter::{CachedOpenFile, FileExporter};
33
34mod file_exporter;
35pub mod span_latency_prom;
36
/// Type-erased, thread-safe error used throughout this crate's fallible APIs.
///
/// The `'static` bound is implied: it is the default object lifetime for a
/// boxed trait object.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
39
/// Errors reported by the runtime trace controls on `TracingHandle`.
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// The handle has no trace filter, i.e. telemetry was initialized with
    /// `enable_otlp_tracing` turned off.
    #[error("OTLP protocol not enabled in the node's configuration")]
    TracingDisabled,

    /// Any other failure, carried as a type-erased error and displayed using
    /// the inner error's message.
    #[error("{0}")]
    Other(#[from] BoxError),
}
48
/// Configuration for different logging/tracing options.
///
/// Note that the derived `Default` and [`TelemetryConfig::new`] differ:
/// `new()` turns `panic_hook` on and sets `sample_rate` to `1.0`, whereas
/// `Default` leaves them at `false` and `0.0`.
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Enables export of tracing span data via OTLP. Can be viewed with
    /// grafana/tempo. Set by `with_env` if the `TRACE_FILTER` env var is
    /// provided. If the `TRACE_FILE` env var is set at `init` time, spans are
    /// written to that file instead of being sent over OTLP.
    pub enable_otlp_tracing: bool,
    /// Enables Tokio Console debugging on port 6669.
    /// Enabled if `TOKIO_CONSOLE` env var is provided. Only has an effect when
    /// the crate is built with the `tokio-console` feature.
    pub tokio_console: bool,
    /// Emit logs as newline-delimited JSON. The destination is unchanged: the
    /// log file if `log_file` is set, otherwise stderr.
    /// Enabled if `RUST_LOG_JSON` env var is provided.
    pub json_log_output: bool,
    /// If defined, write output to a daily-rolling file starting with this
    /// name, ex app.log. Provided by `RUST_LOG_FILE` env var.
    pub log_file: Option<String>,
    /// Log directives to use ("error/warn/info/debug/trace"), defaults to
    /// "info". `with_env` does not populate this field; instead, a valid
    /// `RUST_LOG` env var takes precedence over it when `init` runs.
    pub log_string: Option<String>,
    /// Span level - the most verbose level of spans that is measured by the
    /// Prometheus span latency layer. Note this is not the same as the logging
    /// level. If set to `None`, defaults to "info".
    /// Provided by `TOKIO_SPAN_LEVEL` env var.
    pub span_level: Option<Level>,
    /// Install a panic hook that records panics as `ERROR` tracing events
    /// before running the previously installed hook.
    pub panic_hook: bool,
    /// Crash on panic: the panic hook exits the process with status 12. Only
    /// takes effect when `panic_hook` is also set.
    /// Enabled if `CRASH_ON_PANIC` env var is provided.
    pub crash_on_panic: bool,
    /// Optional Prometheus registry - if present, all enabled span latencies
    /// are measured.
    pub prom_registry: Option<prometheus::Registry>,
    /// Sample rate for tracing spans, used as the ratio of a
    /// "TraceIdRatioBased" sampler. The value is clamped to the range
    /// `0.0001..=1.0` before use, so sampling can never be switched off
    /// entirely through this setting: rate>=1 always samples, anything at or
    /// below 0.0001 samples with probability 0.0001, and values in between
    /// sample with that probability, e.g. for 0.5 there is a 50% chance that a
    /// trace will be sampled.
    /// Provided by `SAMPLE_RATE` env var.
    pub sample_rate: f64,
    /// Targets whose trace-level logs should be included: each entry is
    /// appended to the log directives as `<target>=trace`.
    pub trace_target: Option<Vec<String>>,
}
88
89#[must_use]
90pub struct TelemetryGuards {
91    #[expect(unused)]
92    worker_guard: WorkerGuard,
93    #[expect(unused)]
94    provider: Option<TracerProvider>,
95}
96
97impl TelemetryGuards {
98    fn new(
99        config: TelemetryConfig,
100        worker_guard: WorkerGuard,
101        provider: Option<TracerProvider>,
102    ) -> Self {
103        set_global_telemetry_config(config);
104        Self {
105            worker_guard,
106            provider,
107        }
108    }
109}
110
111impl Drop for TelemetryGuards {
112    fn drop(&mut self) {
113        clear_global_telemetry_config();
114    }
115}
116
117#[derive(Clone, Debug)]
118pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);
119
120impl FilterHandle {
121    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
122        let filter = EnvFilter::try_new(directives)?;
123        self.0.reload(filter)?;
124        Ok(())
125    }
126
127    pub fn get(&self) -> Result<String, BoxError> {
128        self.0
129            .with_current(|filter| filter.to_string())
130            .map_err(Into::into)
131    }
132}
133
134pub struct TracingHandle {
135    log: FilterHandle,
136    trace: Option<FilterHandle>,
137    file_output: CachedOpenFile,
138    sampler: SamplingFilter,
139}
140
141impl TracingHandle {
142    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
143        self.log.update(directives)
144    }
145
146    pub fn get_log(&self) -> Result<String, BoxError> {
147        self.log.get()
148    }
149
150    pub fn update_sampling_rate(&self, sample_rate: f64) {
151        self.sampler.update_sampling_rate(sample_rate);
152    }
153
154    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
155        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
156        self.file_output.update_path(trace_path)?;
157        Ok(())
158    }
159
160    pub fn update_trace_filter<S: AsRef<str>>(
161        &self,
162        directives: S,
163        duration: Duration,
164    ) -> Result<(), TelemetryError> {
165        if let Some(trace) = &self.trace {
166            trace.update(directives)?;
167            // after duration is elapsed, reset to the env setting
168            let trace = trace.clone();
169            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
170            tokio::spawn(async move {
171                tokio::time::sleep(duration).await;
172                if let Err(e) = trace.update(trace_filter_env) {
173                    error!("failed to reset trace filter: {}", e);
174                }
175            });
176            Ok(())
177        } else {
178            Err(TelemetryError::TracingDisabled)
179        }
180    }
181
182    pub fn clear_file_output(&self) {
183        self.file_output.clear_path();
184    }
185
186    pub fn reset_trace(&self) -> Result<(), TelemetryError> {
187        if let Some(trace) = &self.trace {
188            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
189            trace.update(trace_filter_env).map_err(|e| e.into())
190        } else {
191            Err(TelemetryError::TracingDisabled)
192        }
193    }
194}
195
196fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
197    if let Some(logfile_prefix) = log_file {
198        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
199        tracing_appender::non_blocking(file_appender)
200    } else {
201        tracing_appender::non_blocking(stderr())
202    }
203}
204
205// NOTE: this function is copied from tracing's panic_hook example
206fn set_panic_hook(crash_on_panic: bool) {
207    let default_panic_handler = std::panic::take_hook();
208
209    // Set a panic hook that records the panic as a `tracing` event at the
210    // `ERROR` verbosity level.
211    //
212    // If we are currently in a span when the panic occurred, the logged event
213    // will include the current span, allowing the context in which the panic
214    // occurred to be recorded.
215    std::panic::set_hook(Box::new(move |panic| {
216        // If the panic has a source location, record it as structured fields.
217        if let Some(location) = panic.location() {
218            // On nightly Rust, where the `PanicInfo` type also exposes a
219            // `message()` method returning just the message, we could record
220            // just the message instead of the entire `fmt::Display`
221            // implementation, avoiding the duplicated location
222            tracing::error!(
223                message = %panic,
224                panic.file = location.file(),
225                panic.line = location.line(),
226                panic.column = location.column(),
227            );
228        } else {
229            tracing::error!(message = %panic);
230        }
231
232        default_panic_handler(panic);
233
234        // We're panicking so we can't do anything about the flush failing
235        let _ = std::io::stderr().flush();
236        let _ = std::io::stdout().flush();
237
238        if crash_on_panic {
239            // Kill the process
240            std::process::exit(12);
241        }
242    }));
243}
244
245static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
246    Lazy::new(|| Arc::new(Mutex::new(None)));
247
248fn set_global_telemetry_config(config: TelemetryConfig) {
249    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
250    assert!(global_config.is_none());
251    *global_config = Some(config);
252}
253
254fn clear_global_telemetry_config() {
255    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
256    *global_config = None;
257}
258
259pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
260    let global_config = GLOBAL_CONFIG.lock().unwrap();
261    global_config.clone()
262}
263
264impl TelemetryConfig {
265    pub fn new() -> Self {
266        Self {
267            enable_otlp_tracing: false,
268            tokio_console: false,
269            json_log_output: false,
270            log_file: None,
271            log_string: None,
272            span_level: None,
273            panic_hook: true,
274            crash_on_panic: false,
275            prom_registry: None,
276            sample_rate: 1.0,
277            trace_target: None,
278        }
279    }
280
281    pub fn with_json(mut self) -> Self {
282        self.json_log_output = true;
283        self
284    }
285
286    pub fn with_log_level(mut self, log_string: &str) -> Self {
287        self.log_string = Some(log_string.to_owned());
288        self
289    }
290
291    pub fn with_span_level(mut self, span_level: Level) -> Self {
292        self.span_level = Some(span_level);
293        self
294    }
295
296    pub fn with_log_file(mut self, filename: &str) -> Self {
297        self.log_file = Some(filename.to_owned());
298        self
299    }
300
301    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
302        self.prom_registry = Some(registry.clone());
303        self
304    }
305
306    pub fn with_sample_rate(mut self, rate: f64) -> Self {
307        self.sample_rate = rate;
308        self
309    }
310
311    pub fn with_trace_target(mut self, target: &str) -> Self {
312        match self.trace_target {
313            Some(ref mut v) => v.push(target.to_owned()),
314            None => self.trace_target = Some(vec![target.to_owned()]),
315        };
316
317        self
318    }
319
320    pub fn with_env(mut self) -> Self {
321        if env::var("CRASH_ON_PANIC").is_ok() {
322            self.crash_on_panic = true
323        }
324
325        if env::var("TRACE_FILTER").is_ok() {
326            self.enable_otlp_tracing = true
327        }
328
329        if env::var("RUST_LOG_JSON").is_ok() {
330            self.json_log_output = true;
331        }
332
333        if env::var("TOKIO_CONSOLE").is_ok() {
334            self.tokio_console = true;
335        }
336
337        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
338            self.span_level =
339                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
340        }
341
342        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
343            self.log_file = Some(filepath);
344        }
345
346        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
347            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
348        }
349
350        self
351    }
352
353    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
354        let config = self;
355        let config_clone = config.clone();
356
357        // Setup an EnvFilter for filtering logging output layers.
358        // NOTE: we don't want to use this to filter all layers.  That causes problems
359        // for layers with different filtering needs, including
360        // tokio-console/console-subscriber, and it also doesn't fit with the
361        // span creation needs for distributed tracing and other span-based tools.
362        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
363        if let Some(targets) = config.trace_target {
364            for target in targets {
365                directives.push_str(&format!(",{}=trace", target));
366            }
367        }
368        let env_filter =
369            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
370        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
371        let log_filter_handle = FilterHandle(reload_handle);
372
373        // Separate span level filter.
374        // This is a dumb filter for now - allows all spans that are below a given
375        // level. TODO: implement a sampling filter
376        let span_level = config.span_level.unwrap_or(Level::INFO);
377        let span_filter = filter::filter_fn(move |metadata| {
378            metadata.is_span() && *metadata.level() <= span_level
379        });
380
381        let mut layers = Vec::new();
382
383        // tokio-console layer
384        // Please see https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration
385        // for environment vars/config options
386        #[cfg(feature = "tokio-console")]
387        if config.tokio_console {
388            layers.push(console_subscriber::spawn().boxed());
389        }
390
391        if let Some(registry) = config.prom_registry {
392            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
393                .expect("Could not initialize span latency layer");
394            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
395        }
396
397        let mut trace_filter_handle = None;
398        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
399        let mut provider = None;
400        let sampler = SamplingFilter::new(config.sample_rate);
401        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("iota-node".to_owned());
402
403        if config.enable_otlp_tracing {
404            let trace_file = env::var("TRACE_FILE").ok();
405
406            let resource = Resource::new(vec![opentelemetry::KeyValue::new(
407                "service.name",
408                service_name.clone(),
409            )]);
410            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
411
412            // We can either do file output or OTLP, but not both. tracing-opentelemetry
413            // only supports a single tracer at a time.
414            let telemetry = if let Some(trace_file) = trace_file {
415                let exporter =
416                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
417                file_output = exporter.cached_open_file.clone();
418                let processor =
419                    BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
420                        .build();
421
422                let p = TracerProvider::builder()
423                    .with_resource(resource)
424                    .with_sampler(sampler)
425                    .with_span_processor(processor)
426                    .build();
427
428                let tracer = p.tracer(service_name);
429                provider = Some(p);
430
431                tracing_opentelemetry::layer().with_tracer(tracer)
432            } else {
433                let endpoint = env::var("OTLP_ENDPOINT")
434                    .unwrap_or_else(|_| "http://localhost:4317".to_string());
435
436                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
437                    .with_tonic()
438                    .with_endpoint(endpoint)
439                    .build()
440                    .unwrap();
441                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
442                    .with_resource(resource)
443                    .with_sampler(sampler)
444                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
445                    .build();
446                let tracer = tracer_provider.tracer(service_name);
447
448                tracing_opentelemetry::layer().with_tracer(tracer)
449            };
450
451            // Enable Trace Contexts for tying spans together
452            opentelemetry::global::set_text_map_propagator(
453                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
454            );
455
456            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
457            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
458            trace_filter_handle = Some(FilterHandle(reload_handle));
459
460            layers.push(telemetry.with_filter(trace_env_filter).boxed());
461        }
462
463        let (nb_output, worker_guard) = get_output(config.log_file.clone());
464        if config.json_log_output {
465            // Output to file or to stderr in a newline-delimited JSON format
466            let json_layer = fmt::layer()
467                .with_file(true)
468                .with_line_number(true)
469                .json()
470                .with_writer(nb_output)
471                .with_filter(log_filter)
472                .boxed();
473            layers.push(json_layer);
474        } else {
475            // Output to file or to stderr with ANSI colors
476            let fmt_layer = fmt::layer()
477                .with_ansi(config.log_file.is_none() && stderr().is_tty())
478                .with_writer(nb_output)
479                .with_filter(log_filter)
480                .boxed();
481            layers.push(fmt_layer);
482        }
483
484        let subscriber = tracing_subscriber::registry().with(layers);
485        ::tracing::subscriber::set_global_default(subscriber)
486            .expect("unable to initialize tracing subscriber");
487
488        if config.panic_hook {
489            set_panic_hook(config.crash_on_panic);
490        }
491
492        // The guard must be returned and kept in the main fn of the app, as when it's
493        // dropped then the output gets flushed and closed. If this is dropped
494        // too early then no output will appear!
495        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);
496
497        (
498            guards,
499            TracingHandle {
500                log: log_filter_handle,
501                trace: trace_filter_handle,
502                file_output,
503                sampler,
504            },
505        )
506    }
507}
508
509// Like Sampler::TraceIdRatioBased, but can be updated at runtime
510#[derive(Debug, Clone)]
511struct SamplingFilter {
512    // Sampling filter needs to be fast, so we avoid a mutex.
513    sample_rate: Arc<AtomicF64>,
514}
515
516impl SamplingFilter {
517    fn new(sample_rate: f64) -> Self {
518        SamplingFilter {
519            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
520        }
521    }
522
523    fn clamp(sample_rate: f64) -> f64 {
524        // clamp sample rate to between 0.0001 and 1.0
525        sample_rate.clamp(0.0001, 1.0)
526    }
527
528    fn update_sampling_rate(&self, sample_rate: f64) {
529        // clamp sample rate to between 0.0001 and 1.0
530        let sample_rate = Self::clamp(sample_rate);
531        self.sample_rate.store(sample_rate, Ordering::Relaxed);
532    }
533}
534
535impl ShouldSample for SamplingFilter {
536    fn should_sample(
537        &self,
538        parent_context: Option<&Context>,
539        trace_id: TraceId,
540        name: &str,
541        span_kind: &SpanKind,
542        attributes: &[KeyValue],
543        links: &[Link],
544    ) -> SamplingResult {
545        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
546        let sampler = Sampler::TraceIdRatioBased(sample_rate);
547
548        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
549    }
550}
551
552/// Globally set a tracing subscriber suitable for testing environments
553pub fn init_for_testing() {
554    static LOGGER: Lazy<()> = Lazy::new(|| {
555        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
556            .with_env_filter(
557                EnvFilter::builder()
558                    .with_default_directive(LevelFilter::INFO.into())
559                    .from_env_lossy(),
560            )
561            .with_file(true)
562            .with_line_number(true)
563            .with_test_writer()
564            .finish();
565        ::tracing::subscriber::set_global_default(subscriber)
566            .expect("unable to initialize logging for tests");
567    });
568
569    Lazy::force(&LOGGER);
570}
571
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricType;
    use tracing::{debug, debug_span, info, trace_span, warn};

    use super::*;

    /// Initializes telemetry with a Prometheus registry and checks that only
    /// the span at or below the configured span level is measured. The test
    /// ends with a deliberate panic to exercise the panic hook.
    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // The log level stays at its INFO default while the span level is
        // raised to DEBUG, so TRACE spans are ignored.
        let _guards_and_handle = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry)
            .init();

        let pause = Duration::from_millis(100);

        info!(a = 1, "This will be INFO.");
        // The span is at DEBUG level, so it is not printed, but its latency
        // must still be recorded.
        debug_span!("yo span yo").in_scope(|| {
            // Not printed: the log level is INFO by default.
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(pause);
            warn!(a = 3, "This will be WARNING.");
        });

        // A TRACE span is above the configured span level and is not enabled.
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(pause);
        });

        // Exactly one metric family holding exactly one metric is expected.
        let families = registry.gather();
        assert_eq!(families.len(), 1);
        let family = &families[0];
        assert_eq!(family.name(), "tracing_span_latencies");
        assert_eq!(family.get_field_type(), MetricType::HISTOGRAM);

        let recorded = family.get_metric();
        assert_eq!(recorded.len(), 1);

        let labels = recorded[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // The two tests below may "race" to initialize logging; neither of them
    // is allowed to panic.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}