1#![cfg_attr(all(feature = "flamegraph-alloc", nightly), feature(thread_local))]
5
6use std::{
7 env,
8 io::{Write, stderr},
9 path::PathBuf,
10 str::FromStr,
11 sync::{Arc, Mutex, atomic::Ordering},
12 time::Duration,
13};
14
15use atomic_float::AtomicF64;
16use crossterm::tty::IsTty;
17use once_cell::sync::Lazy;
18use opentelemetry::{
19 Context, KeyValue,
20 trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
21};
22use opentelemetry_otlp::WithExportConfig;
23use opentelemetry_sdk::{
24 Resource, runtime,
25 trace::{BatchSpanProcessor, Sampler, ShouldSample, TracerProvider},
26};
27use span_latency_prom::PrometheusSpanLatencyLayer;
28use thiserror::Error;
29use tracing::{Level, error, metadata::LevelFilter};
30use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
31use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};
32
33use crate::file_exporter::{CachedOpenFile, FileExporter};
34
35mod file_exporter;
36pub mod flamegraph;
37pub mod span_latency_prom;
38pub use flamegraph::FlameSub;
39
40struct GlobalLevelFilter {
53 max_span_level: LevelFilter,
54 max_event_level: Arc<std::sync::atomic::AtomicU8>,
58}
59
60impl GlobalLevelFilter {
61 fn load_max_event_level(&self) -> LevelFilter {
62 level_filter_from_u8(self.max_event_level.load(Ordering::Relaxed))
63 }
64
65 fn exceeds_max_level(&self, metadata: &tracing::Metadata<'_>) -> bool {
66 if metadata.is_span() {
67 LevelFilter::from_level(*metadata.level()) > self.max_span_level
68 } else {
69 let max = self.load_max_event_level();
70 !max.eq(&LevelFilter::OFF) && LevelFilter::from_level(*metadata.level()) > max
71 }
72 }
73}
74
75fn level_filter_to_u8(lf: LevelFilter) -> u8 {
76 match lf {
77 LevelFilter::OFF => 0,
78 LevelFilter::ERROR => 1,
79 LevelFilter::WARN => 2,
80 LevelFilter::INFO => 3,
81 LevelFilter::DEBUG => 4,
82 LevelFilter::TRACE => 5,
83 }
84}
85
86fn level_filter_from_u8(v: u8) -> LevelFilter {
87 match v {
88 0 => LevelFilter::OFF,
89 1 => LevelFilter::ERROR,
90 2 => LevelFilter::WARN,
91 3 => LevelFilter::INFO,
92 4 => LevelFilter::DEBUG,
93 _ => LevelFilter::TRACE,
94 }
95}
96
97impl<S: tracing::Subscriber> Layer<S> for GlobalLevelFilter {
98 fn register_callsite(
99 &self,
100 metadata: &'static tracing::Metadata<'static>,
101 ) -> tracing::subscriber::Interest {
102 if self.exceeds_max_level(metadata) {
103 tracing::subscriber::Interest::never()
104 } else {
105 tracing::subscriber::Interest::always()
113 }
114 }
115
116 fn max_level_hint(&self) -> Option<LevelFilter> {
117 let event = self.load_max_event_level();
118 Some(std::cmp::max(self.max_span_level, event))
119 }
120}
121
/// Boxed, thread-safe error type used by the filter-handle APIs.
///
/// A `Box<dyn Trait>` type defaults to a `'static` object bound, so this is the
/// same type as `Box<dyn Error + Send + Sync + 'static>`.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
124
/// Errors returned by the trace-filter operations on [`TracingHandle`].
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// The operation needs the OTLP trace filter, but OTLP tracing was not
    /// enabled when telemetry was initialised.
    #[error("OTLP protocol not enabled in the node's configuration")]
    TracingDisabled,

    /// Any other failure, e.g. unparsable filter directives or a failed reload.
    #[error("{0}")]
    Other(#[from] BoxError),
}
133
/// Configuration consumed by [`TelemetryConfig::init`].
///
/// Prefer [`TelemetryConfig::new`] over the derived `Default`. `new()` sets
/// `panic_hook: true` and `sample_rate: 1.0`, whereas `Default::default()`
/// yields `false` and `0.0` for those two fields.
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Install an OpenTelemetry tracing layer (set by `with_env` when
    /// `TRACE_FILTER` is present).
    pub enable_otlp_tracing: bool,
    /// Install the tokio-console layer (`TOKIO_CONSOLE`).
    pub tokio_console: bool,
    /// Emit log lines as JSON (`RUST_LOG_JSON`).
    pub json_log_output: bool,
    /// Prefix of a daily-rolling log file; logs go to stderr when `None`
    /// (`RUST_LOG_FILE`).
    pub log_file: Option<String>,
    /// Log filter directives used when `RUST_LOG` is unset; `init` defaults
    /// to `"info"`.
    pub log_string: Option<String>,
    /// Most verbose span level that is recorded; `init` defaults to `INFO`
    /// (`TOKIO_SPAN_LEVEL`).
    pub span_level: Option<LevelFilter>,
    /// Install a panic hook that reports panics through `tracing`.
    pub panic_hook: bool,
    /// When the panic hook is installed, exit the process with status 12
    /// after a panic (`CRASH_ON_PANIC`).
    pub crash_on_panic: bool,
    /// Registry that receives the span-latency histogram, if any.
    pub prom_registry: Option<prometheus::Registry>,
    /// Skip the span-latency layer even when `prom_registry` is set.
    pub disable_span_latency: bool,
    /// Trace sampling ratio, clamped by the sampler (`SAMPLE_RATE`).
    pub sample_rate: f64,
    /// Targets that are additionally logged at `trace` level.
    pub trace_target: Option<Vec<String>>,
    /// Install the flamegraph layer (`TRACE_FLAMEGRAPH`).
    pub enable_flamegraph: bool,
}
180
/// Owns resources that must stay alive for as long as telemetry is in use.
///
/// Dropping the guards unregisters the global [`TelemetryConfig`] (see the
/// `Drop` impl) and drops the log writer's [`WorkerGuard`] and any tracer
/// provider held here.
#[must_use]
pub struct TelemetryGuards {
    // Held only so it is dropped together with the guards; never read.
    #[expect(unused)]
    worker_guard: WorkerGuard,
    // Tracer provider retained by `init`; `None` when none was retained
    // (always the case when OTLP tracing is disabled).
    #[expect(unused)]
    provider: Option<TracerProvider>,
}
188
189impl TelemetryGuards {
190 fn new(
191 config: TelemetryConfig,
192 worker_guard: WorkerGuard,
193 provider: Option<TracerProvider>,
194 ) -> Self {
195 set_global_telemetry_config(config);
196 Self {
197 worker_guard,
198 provider,
199 }
200 }
201}
202
203impl Drop for TelemetryGuards {
204 fn drop(&mut self) {
205 clear_global_telemetry_config();
206 }
207}
208
/// Runtime handle to a reloadable [`EnvFilter`] layer.
#[derive(Clone, Debug)]
pub struct FilterHandle {
    /// Reload handle for the filter layer this handle controls.
    reload: reload::Handle<EnvFilter, Registry>,
    /// Shared global event ceiling, kept in sync with the filter by
    /// `FilterHandle::update`. It is `None` for handles that must not affect
    /// it (`TelemetryConfig::init` creates the trace-filter handle this way).
    max_event_level: Option<Arc<std::sync::atomic::AtomicU8>>,
}
216
217impl FilterHandle {
218 pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
219 let filter = EnvFilter::try_new(directives)?;
220 if let Some(ref max_level) = self.max_event_level {
221 let hint = filter.max_level_hint().unwrap_or(LevelFilter::TRACE);
222 max_level.store(level_filter_to_u8(hint), Ordering::Relaxed);
223 }
224 self.reload.reload(filter)?;
225 Ok(())
226 }
227
228 pub fn get(&self) -> Result<String, BoxError> {
229 self.reload
230 .with_current(|filter| filter.to_string())
231 .map_err(Into::into)
232 }
233}
234
/// Runtime controls for the telemetry installed by [`TelemetryConfig::init`].
pub struct TracingHandle {
    // Controls the log (fmt) filter and the global event ceiling.
    log: FilterHandle,
    // Controls the OTLP trace filter; `None` when OTLP tracing is disabled.
    trace: Option<FilterHandle>,
    // Output file used by the file span exporter (when `TRACE_FILE` was set).
    file_output: CachedOpenFile,
    // Sampler whose rate is shared with the tracer provider's copy.
    sampler: SamplingFilter,
    // Flamegraph layer handle; `Some` only when flamegraphs are enabled.
    flamegraph: Option<FlameSub>,
}
242
243impl TracingHandle {
244 pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
245 self.log.update(directives)
246 }
247
248 pub fn get_log(&self) -> Result<String, BoxError> {
249 self.log.get()
250 }
251
252 pub fn update_sampling_rate(&self, sample_rate: f64) {
253 self.sampler.update_sampling_rate(sample_rate);
254 }
255
256 pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
257 let trace_path = PathBuf::from_str(trace_file.as_ref())?;
258 self.file_output.update_path(trace_path)?;
259 Ok(())
260 }
261
262 pub fn update_trace_filter<S: AsRef<str>>(
263 &self,
264 directives: S,
265 duration: Duration,
266 ) -> Result<(), TelemetryError> {
267 if let Some(trace) = &self.trace {
268 trace.update(directives)?;
269 let trace = trace.clone();
271 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
272 tokio::spawn(async move {
273 tokio::time::sleep(duration).await;
274 if let Err(e) = trace.update(trace_filter_env) {
275 error!("failed to reset trace filter: {}", e);
276 }
277 });
278 Ok(())
279 } else {
280 Err(TelemetryError::TracingDisabled)
281 }
282 }
283
284 pub fn clear_file_output(&self) {
285 self.file_output.clear_path();
286 }
287
288 pub fn reset_trace(&self) -> Result<(), TelemetryError> {
289 if let Some(trace) = &self.trace {
290 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
291 trace.update(trace_filter_env).map_err(|e| e.into())
292 } else {
293 Err(TelemetryError::TracingDisabled)
294 }
295 }
296
297 pub fn get_flamegraph(&self) -> Option<FlameSub> {
298 self.flamegraph.clone()
299 }
300}
301
302fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
303 if let Some(logfile_prefix) = log_file {
304 let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
305 tracing_appender::non_blocking(file_appender)
306 } else {
307 tracing_appender::non_blocking(stderr())
308 }
309}
310
311fn set_panic_hook(crash_on_panic: bool) {
313 let default_panic_handler = std::panic::take_hook();
314
315 std::panic::set_hook(Box::new(move |panic| {
322 if let Some(location) = panic.location() {
324 tracing::error!(
329 message = %panic,
330 panic.file = location.file(),
331 panic.line = location.line(),
332 panic.column = location.column(),
333 );
334 } else {
335 tracing::error!(message = %panic);
336 }
337
338 default_panic_handler(panic);
339
340 let _ = std::io::stderr().flush();
342 let _ = std::io::stdout().flush();
343
344 if crash_on_panic {
345 std::process::exit(12);
347 }
348 }));
349}
350
351static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
352 Lazy::new(|| Arc::new(Mutex::new(None)));
353
354fn set_global_telemetry_config(config: TelemetryConfig) {
355 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
356 assert!(global_config.is_none());
357 *global_config = Some(config);
358}
359
360fn clear_global_telemetry_config() {
361 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
362 *global_config = None;
363}
364
365pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
366 let global_config = GLOBAL_CONFIG.lock().unwrap();
367 global_config.clone()
368}
369
370impl TelemetryConfig {
371 pub fn new() -> Self {
372 Self {
373 enable_otlp_tracing: false,
374 tokio_console: false,
375 json_log_output: false,
376 log_file: None,
377 log_string: None,
378 span_level: None,
379 panic_hook: true,
380 crash_on_panic: false,
381 prom_registry: None,
382 disable_span_latency: false,
383 sample_rate: 1.0,
384 trace_target: None,
385 enable_flamegraph: false,
386 }
387 }
388
389 pub fn with_json(mut self) -> Self {
390 self.json_log_output = true;
391 self
392 }
393
394 pub fn with_log_level(mut self, log_string: &str) -> Self {
395 self.log_string = Some(log_string.to_owned());
396 self
397 }
398
399 pub fn with_span_level(mut self, span_level: Level) -> Self {
400 self.span_level = Some(LevelFilter::from_level(span_level));
401 self
402 }
403
404 pub fn with_disable_span_latency(mut self, disable: bool) -> Self {
408 self.disable_span_latency = disable;
409 self
410 }
411
412 pub fn with_log_file(mut self, filename: &str) -> Self {
413 self.log_file = Some(filename.to_owned());
414 self
415 }
416
417 pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
418 self.prom_registry = Some(registry.clone());
419 self
420 }
421
422 pub fn with_sample_rate(mut self, rate: f64) -> Self {
423 self.sample_rate = rate;
424 self
425 }
426
427 pub fn with_trace_target(mut self, target: &str) -> Self {
428 match self.trace_target {
429 Some(ref mut v) => v.push(target.to_owned()),
430 None => self.trace_target = Some(vec![target.to_owned()]),
431 };
432
433 self
434 }
435
436 pub fn with_flamegraph(mut self) -> Self {
437 self.enable_flamegraph = true;
438 self
439 }
440
441 pub fn with_env(mut self) -> Self {
442 if env::var("CRASH_ON_PANIC").is_ok() {
443 self.crash_on_panic = true
444 }
445
446 if env::var("TRACE_FILTER").is_ok() {
447 self.enable_otlp_tracing = true
448 }
449
450 if env::var("RUST_LOG_JSON").is_ok() {
451 self.json_log_output = true;
452 }
453
454 if env::var("TOKIO_CONSOLE").is_ok() {
455 self.tokio_console = true;
456 }
457
458 if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
459 self.span_level =
460 Some(LevelFilter::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
461 }
462
463 if let Ok(filepath) = env::var("RUST_LOG_FILE") {
464 self.log_file = Some(filepath);
465 }
466
467 if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
468 self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
469 }
470
471 if env::var("TRACE_FLAMEGRAPH").is_ok() {
472 self.enable_flamegraph = true
473 }
474
475 self
476 }
477
478 pub fn init(self) -> (TelemetryGuards, TracingHandle) {
479 let config = self;
480 let config_clone = config.clone();
481
482 let mut directives = config.log_string.unwrap_or_else(|| "info".into());
488 if let Some(targets) = config.trace_target {
489 for target in targets {
490 directives.push_str(&format!(",{target}=trace"));
491 }
492 }
493 let env_filter =
494 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
495 let max_event_level_seed = env_filter.max_level_hint().unwrap_or(LevelFilter::TRACE);
496 let max_event_level = Arc::new(std::sync::atomic::AtomicU8::new(level_filter_to_u8(
497 max_event_level_seed,
498 )));
499 let (log_filter, reload_handle) = reload::Layer::new(env_filter);
500 let log_filter_handle = FilterHandle {
501 reload: reload_handle,
502 max_event_level: Some(max_event_level.clone()),
503 };
504
505 let span_level = config.span_level.unwrap_or(LevelFilter::INFO);
509 let span_filter = filter::filter_fn(move |metadata| {
510 metadata.is_span() && LevelFilter::from_level(*metadata.level()) <= span_level
511 });
512
513 let mut layers = Vec::new();
514
515 if config.tokio_console {
519 layers.push(console_subscriber::spawn().boxed());
520 }
521
522 if let Some(registry) = config.prom_registry {
523 if !config.disable_span_latency {
524 let span_lat_layer = PrometheusSpanLatencyLayer::try_new(®istry, 15)
525 .expect("Could not initialize span latency layer");
526 layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
527 }
528 }
529
530 let mut trace_filter_handle = None;
531 let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
532 let mut provider = None;
533 let sampler = SamplingFilter::new(config.sample_rate);
534 let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("iota-node".to_owned());
535
536 if config.enable_otlp_tracing {
537 let trace_file = env::var("TRACE_FILE").ok();
538
539 let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
540 "service.name",
541 service_name.clone(),
542 )];
543 if let Ok(namespace) = env::var("NAMESPACE") {
544 otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
545 }
546 if let Ok(hostname) = env::var("HOSTNAME") {
547 otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
548 }
549 if let Ok(network) = env::var("NETWORK") {
550 otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
551 }
552
553 let resource = Resource::new(otel_kv_vec);
554 let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
555
556 let telemetry = if let Some(trace_file) = trace_file {
559 let exporter =
560 FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
561 file_output = exporter.cached_open_file.clone();
562 let processor =
563 BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
564 .build();
565
566 let p = TracerProvider::builder()
567 .with_resource(resource)
568 .with_sampler(sampler)
569 .with_span_processor(processor)
570 .build();
571
572 let tracer = p.tracer(service_name);
573 provider = Some(p);
574
575 tracing_opentelemetry::layer().with_tracer(tracer)
576 } else {
577 let endpoint = env::var("OTLP_ENDPOINT")
578 .unwrap_or_else(|_| "http://localhost:4317".to_string());
579
580 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
581 .with_tonic()
582 .with_endpoint(endpoint)
583 .build()
584 .unwrap();
585 let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
586 .with_resource(resource)
587 .with_sampler(sampler)
588 .with_batch_exporter(otlp_exporter, runtime::Tokio)
589 .build();
590 let tracer = tracer_provider.tracer(service_name);
591
592 tracing_opentelemetry::layer().with_tracer(tracer)
593 };
594
595 opentelemetry::global::set_text_map_propagator(
597 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
598 );
599
600 let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
601 let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
602 trace_filter_handle = Some(FilterHandle {
606 reload: reload_handle,
607 max_event_level: None,
608 });
609
610 layers.push(telemetry.with_filter(trace_env_filter).boxed());
611 }
612
613 let (nb_output, worker_guard) = get_output(config.log_file.clone());
614 if config.json_log_output {
615 let json_layer = fmt::layer()
617 .with_file(true)
618 .with_line_number(true)
619 .json()
620 .with_writer(nb_output)
621 .with_filter(log_filter)
622 .boxed();
623 layers.push(json_layer);
624 } else {
625 let fmt_layer = fmt::layer()
627 .with_ansi(config.log_file.is_none() && stderr().is_tty())
628 .with_writer(nb_output)
629 .with_filter(log_filter)
630 .boxed();
631 layers.push(fmt_layer);
632 }
633
634 let mut flamegraph = None;
635 if config.enable_flamegraph {
636 let flamesub = FlameSub::new();
637 flamegraph = Some(flamesub.clone());
638 layers.push(flamesub.boxed());
639 }
640
641 let global_filter = GlobalLevelFilter {
653 max_span_level: span_level,
654 max_event_level,
655 };
656 let subscriber = tracing_subscriber::registry()
657 .with(layers)
658 .with(global_filter);
659 ::tracing::subscriber::set_global_default(subscriber)
660 .expect("unable to initialize tracing subscriber");
661
662 if config.panic_hook {
663 set_panic_hook(config.crash_on_panic);
664 }
665
666 let guards = TelemetryGuards::new(config_clone, worker_guard, provider);
670
671 (
672 guards,
673 TracingHandle {
674 log: log_filter_handle,
675 trace: trace_filter_handle,
676 file_output,
677 sampler,
678 flamegraph,
679 },
680 )
681 }
682}
683
/// Trace sampler whose ratio can be changed at runtime.
///
/// Clones share the same rate because the rate lives behind an `Arc`.
#[derive(Debug, Clone)]
struct SamplingFilter {
    /// Current sampling ratio. It is stored only after passing through
    /// `SamplingFilter::clamp`.
    sample_rate: Arc<AtomicF64>,
}
690
691impl SamplingFilter {
692 fn new(sample_rate: f64) -> Self {
693 SamplingFilter {
694 sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
695 }
696 }
697
698 fn clamp(sample_rate: f64) -> f64 {
699 sample_rate.clamp(0.0001, 1.0)
701 }
702
703 fn update_sampling_rate(&self, sample_rate: f64) {
704 let sample_rate = Self::clamp(sample_rate);
706 self.sample_rate.store(sample_rate, Ordering::Relaxed);
707 }
708}
709
710impl ShouldSample for SamplingFilter {
711 fn should_sample(
712 &self,
713 parent_context: Option<&Context>,
714 trace_id: TraceId,
715 name: &str,
716 span_kind: &SpanKind,
717 attributes: &[KeyValue],
718 links: &[Link],
719 ) -> SamplingResult {
720 let sample_rate = self.sample_rate.load(Ordering::Relaxed);
721 let sampler = Sampler::TraceIdRatioBased(sample_rate);
722
723 sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
724 }
725}
726
727pub fn init_for_testing() {
729 static LOGGER: Lazy<()> = Lazy::new(|| {
730 let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
731 .with_env_filter(
732 EnvFilter::builder()
733 .with_default_directive(LevelFilter::INFO.into())
734 .from_env_lossy(),
735 )
736 .with_file(true)
737 .with_line_number(true)
738 .with_test_writer()
739 .finish();
740 ::tracing::subscriber::set_global_default(subscriber)
741 .expect("unable to initialize logging for tests");
742 });
743
744 Lazy::force(&LOGGER);
745}
746
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricType;
    use tracing::{debug, debug_span, info, trace_span, warn};

    use super::*;

    /// Smoke-tests `TelemetryConfig::init` with a Prometheus registry.
    ///
    /// Only the DEBUG span (within `span_level`) should produce a latency
    /// histogram; the TRACE span should not be created. The final deliberate
    /// panic exercises the installed panic hook.
    ///
    /// NOTE(review): `#[should_panic]` also accepts a panic from `init` itself,
    /// e.g. if `init_for_testing` already installed a global subscriber in this
    /// test binary. In that case the assertions below never run.
    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        // Only the span-latency histogram is registered.
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        // A single series: the DEBUG span. The TRACE span is filtered out.
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    /// `init_for_testing` must be callable from several tests in one process.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}