#![cfg_attr(all(feature = "flamegraph-alloc", nightly), feature(thread_local))]

use std::{
    env,
    io::{Write, stderr},
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Mutex, atomic::Ordering},
    time::Duration,
};

use atomic_float::AtomicF64;
use crossterm::tty::IsTty;
use once_cell::sync::Lazy;
use opentelemetry::{
    Context, KeyValue,
    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
    Resource, runtime,
    trace::{BatchSpanProcessor, Sampler, ShouldSample, TracerProvider},
};
use span_latency_prom::PrometheusSpanLatencyLayer;
use thiserror::Error;
use tracing::{Level, error, metadata::LevelFilter};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};

use crate::file_exporter::{CachedOpenFile, FileExporter};

mod file_exporter;
pub mod flamegraph;
pub mod span_latency_prom;
pub use flamegraph::FlameSub;
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[derive(Debug, Error)]
pub enum TelemetryError {
    #[error("OTLP protocol not enabled in the node's configuration")]
    TracingDisabled,

    #[error("{0}")]
    Other(#[from] BoxError),
}

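/// Configuration for the logging and tracing layers installed by
/// [`TelemetryConfig::init`].
///
/// A minimal usage sketch; the crate path `telemetry_subscribers` is an
/// assumption, and the returned guard must stay alive for the whole program:
///
/// ```ignore
/// let (_guard, _handle) = telemetry_subscribers::TelemetryConfig::new()
///     .with_log_level("info")
///     .with_env() // let env vars such as RUST_LOG_JSON override the config
///     .init();
/// tracing::info!("telemetry is now initialized");
/// ```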
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Enable OTLP tracing (also enabled by setting `TRACE_FILTER`).
    pub enable_otlp_tracing: bool,
    /// Enable the tokio-console debugging layer.
    pub tokio_console: bool,
    /// Output JSON logs instead of human-readable text.
    pub json_log_output: bool,
    /// If set, write logs to daily-rolled files with this prefix instead of
    /// stderr.
    pub log_file: Option<String>,
    /// Log filter directives (e.g. "info"); `RUST_LOG` takes precedence.
    pub log_string: Option<String>,
    /// Minimum level at which spans are created. Note this is distinct from
    /// the logging level; defaults to INFO.
    pub span_level: Option<Level>,
    /// Install a panic hook that logs panics before the default handler runs.
    pub panic_hook: bool,
    /// Exit the process on panic instead of unwinding.
    pub crash_on_panic: bool,
    /// If set, span latencies are recorded as histograms in this registry.
    pub prom_registry: Option<prometheus::Registry>,
    /// Trace sampling rate in (0, 1], fed to the trace-ID-ratio sampler.
    pub sample_rate: f64,
    /// Targets for which trace-level directives are added to the log filter.
    pub trace_target: Option<Vec<String>>,
    /// Enable the flamegraph profiling layer.
    pub enable_flamegraph: bool,
}

/// Guards that keep the telemetry machinery alive. Dropping this flushes the
/// non-blocking log writer, shuts down the tracer provider, and clears the
/// global telemetry config.
#[must_use]
pub struct TelemetryGuards {
    #[expect(unused)]
    worker_guard: WorkerGuard,
    #[expect(unused)]
    provider: Option<TracerProvider>,
}

impl TelemetryGuards {
    fn new(
        config: TelemetryConfig,
        worker_guard: WorkerGuard,
        provider: Option<TracerProvider>,
    ) -> Self {
        set_global_telemetry_config(config);
        Self {
            worker_guard,
            provider,
        }
    }
}

impl Drop for TelemetryGuards {
    fn drop(&mut self) {
        clear_global_telemetry_config();
    }
}

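/// A reloadable handle to an [`EnvFilter`], allowing filter directives to be
/// changed while the program is running.
///
/// A minimal sketch, assuming a handle obtained via [`TracingHandle`]; the
/// directive string is illustrative:
///
/// ```ignore
/// handle.update("info,my_crate=debug")?;
/// assert!(handle.get()?.contains("my_crate=debug"));
/// ```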
#[derive(Clone, Debug)]
pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);

impl FilterHandle {
    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        let filter = EnvFilter::try_new(directives)?;
        self.0.reload(filter)?;
        Ok(())
    }

    pub fn get(&self) -> Result<String, BoxError> {
        self.0
            .with_current(|filter| filter.to_string())
            .map_err(Into::into)
    }
}

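/// Runtime control surface returned by [`TelemetryConfig::init`], used to
/// adjust log filters, trace filters, sampling, and trace file output.
///
/// A hedged sketch of runtime reconfiguration; the rate and path below are
/// purely illustrative:
///
/// ```ignore
/// let (_guard, handle) = TelemetryConfig::new().init();
/// handle.update_log("debug")?;              // raise log verbosity
/// handle.update_sampling_rate(0.25);        // sample ~25% of new traces
/// handle.update_trace_file("/tmp/traces")?; // redirect exported spans to a file
/// ```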
pub struct TracingHandle {
    log: FilterHandle,
    trace: Option<FilterHandle>,
    file_output: CachedOpenFile,
    sampler: SamplingFilter,
    flamegraph: Option<FlameSub>,
}

impl TracingHandle {
    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        self.log.update(directives)
    }

    pub fn get_log(&self) -> Result<String, BoxError> {
        self.log.get()
    }

    pub fn update_sampling_rate(&self, sample_rate: f64) {
        self.sampler.update_sampling_rate(sample_rate);
    }

    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
        self.file_output.update_path(trace_path)?;
        Ok(())
    }

    /// Temporarily apply the given trace filter directives, then reset the
    /// filter back to the `TRACE_FILTER` env value (or "off") once `duration`
    /// has elapsed.
    pub fn update_trace_filter<S: AsRef<str>>(
        &self,
        directives: S,
        duration: Duration,
    ) -> Result<(), TelemetryError> {
        if let Some(trace) = &self.trace {
            trace.update(directives)?;
            let trace = trace.clone();
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            tokio::spawn(async move {
                tokio::time::sleep(duration).await;
                if let Err(e) = trace.update(trace_filter_env) {
                    error!("failed to reset trace filter: {}", e);
                }
            });
            Ok(())
        } else {
            Err(TelemetryError::TracingDisabled)
        }
    }

    pub fn clear_file_output(&self) {
        self.file_output.clear_path();
    }

    pub fn reset_trace(&self) -> Result<(), TelemetryError> {
        if let Some(trace) = &self.trace {
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            trace.update(trace_filter_env).map_err(|e| e.into())
        } else {
            Err(TelemetryError::TracingDisabled)
        }
    }

    pub fn get_flamegraph(&self) -> Option<FlameSub> {
        self.flamegraph.clone()
    }
}

fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
    if let Some(logfile_prefix) = log_file {
        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
        tracing_appender::non_blocking(file_appender)
    } else {
        tracing_appender::non_blocking(stderr())
    }
}

fn set_panic_hook(crash_on_panic: bool) {
    let default_panic_handler = std::panic::take_hook();

    // Custom hook that logs the panic (with its location, if available)
    // before delegating to the default handler.
    std::panic::set_hook(Box::new(move |panic| {
        if let Some(location) = panic.location() {
            tracing::error!(
                message = %panic,
                panic.file = location.file(),
                panic.line = location.line(),
                panic.column = location.column(),
            );
        } else {
            tracing::error!(message = %panic);
        }

        default_panic_handler(panic);

        // Flush any buffered output before the process potentially exits.
        let _ = std::io::stderr().flush();
        let _ = std::io::stdout().flush();

        if crash_on_panic {
            // Kill the process with a distinctive exit code so the crash is
            // easy to spot.
            std::process::exit(12);
        }
    }));
}

static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

fn set_global_telemetry_config(config: TelemetryConfig) {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    assert!(global_config.is_none());
    *global_config = Some(config);
}

fn clear_global_telemetry_config() {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    *global_config = None;
}

pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
    let global_config = GLOBAL_CONFIG.lock().unwrap();
    global_config.clone()
}

impl TelemetryConfig {
    pub fn new() -> Self {
        Self {
            enable_otlp_tracing: false,
            tokio_console: false,
            json_log_output: false,
            log_file: None,
            log_string: None,
            span_level: None,
            panic_hook: true,
            crash_on_panic: false,
            prom_registry: None,
            sample_rate: 1.0,
            trace_target: None,
            enable_flamegraph: false,
        }
    }

    pub fn with_json(mut self) -> Self {
        self.json_log_output = true;
        self
    }

    pub fn with_log_level(mut self, log_string: &str) -> Self {
        self.log_string = Some(log_string.to_owned());
        self
    }

    pub fn with_span_level(mut self, span_level: Level) -> Self {
        self.span_level = Some(span_level);
        self
    }

    pub fn with_log_file(mut self, filename: &str) -> Self {
        self.log_file = Some(filename.to_owned());
        self
    }

    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
        self.prom_registry = Some(registry.clone());
        self
    }

    pub fn with_sample_rate(mut self, rate: f64) -> Self {
        self.sample_rate = rate;
        self
    }

    pub fn with_trace_target(mut self, target: &str) -> Self {
        match self.trace_target {
            Some(ref mut v) => v.push(target.to_owned()),
            None => self.trace_target = Some(vec![target.to_owned()]),
        };

        self
    }

    pub fn with_flamegraph(mut self) -> Self {
        self.enable_flamegraph = true;
        self
    }

    pub fn with_env(mut self) -> Self {
        if env::var("CRASH_ON_PANIC").is_ok() {
            self.crash_on_panic = true;
        }

        if env::var("TRACE_FILTER").is_ok() {
            self.enable_otlp_tracing = true;
        }

        if env::var("RUST_LOG_JSON").is_ok() {
            self.json_log_output = true;
        }

        if env::var("TOKIO_CONSOLE").is_ok() {
            self.tokio_console = true;
        }

        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
            self.span_level =
                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
        }

        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
            self.log_file = Some(filepath);
        }

        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
        }

        if env::var("TRACE_FLAMEGRAPH").is_ok() {
            self.enable_flamegraph = true;
        }

        self
    }

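    /// Install the configured layers as the global tracing subscriber,
    /// returning the [`TelemetryGuards`] (which must be kept alive, e.g.
    /// bound to a variable in `main`) and a [`TracingHandle`] for runtime
    /// reconfiguration.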
    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
        let config = self;
        let config_clone = config.clone();

        // Build the log filter directives, defaulting to "info" and adding
        // trace-level directives for any configured targets. RUST_LOG, if
        // set, takes precedence over all of this.
        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
        if let Some(targets) = config.trace_target {
            for target in targets {
                directives.push_str(&format!(",{target}=trace"));
            }
        }
        let env_filter =
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
        let log_filter_handle = FilterHandle(reload_handle);

        // Separate span-level filter, so span creation can be controlled
        // independently of log output.
        let span_level = config.span_level.unwrap_or(Level::INFO);
        let span_filter = filter::filter_fn(move |metadata| {
            metadata.is_span() && *metadata.level() <= span_level
        });

        let mut layers = Vec::new();

        if config.tokio_console {
            layers.push(console_subscriber::spawn().boxed());
        }

        if let Some(registry) = config.prom_registry {
            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
                .expect("Could not initialize span latency layer");
            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
        }

        let mut trace_filter_handle = None;
        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
        let mut provider = None;
        let sampler = SamplingFilter::new(config.sample_rate);
        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("iota-node".to_owned());

        if config.enable_otlp_tracing {
            let trace_file = env::var("TRACE_FILE").ok();

            let resource = Resource::new(vec![opentelemetry::KeyValue::new(
                "service.name",
                service_name.clone(),
            )]);
            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));

            // If TRACE_FILE is set, spans are exported to the file instead of
            // an OTLP collector endpoint.
            let telemetry = if let Some(trace_file) = trace_file {
                let exporter =
                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
                file_output = exporter.cached_open_file.clone();
                let processor =
                    BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
                        .build();

                let p = TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_span_processor(processor)
                    .build();

                let tracer = p.tracer(service_name);
                provider = Some(p);

                tracing_opentelemetry::layer().with_tracer(tracer)
            } else {
                let endpoint = env::var("OTLP_ENDPOINT")
                    .unwrap_or_else(|_| "http://localhost:4317".to_string());

                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
                    .with_tonic()
                    .with_endpoint(endpoint)
                    .build()
                    .unwrap();
                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
                    .build();
                let tracer = tracer_provider.tracer(service_name);

                tracing_opentelemetry::layer().with_tracer(tracer)
            };

            // Propagate trace context using the W3C TraceContext format.
            opentelemetry::global::set_text_map_propagator(
                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
            );

            // TRACE_FILTER must contain valid directives (e.g. "off") when
            // OTLP tracing is enabled; the filter is reloadable at runtime.
            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
            trace_filter_handle = Some(FilterHandle(reload_handle));

            layers.push(telemetry.with_filter(trace_env_filter).boxed());
        }

        let (nb_output, worker_guard) = get_output(config.log_file.clone());
        if config.json_log_output {
            let json_layer = fmt::layer()
                .with_file(true)
                .with_line_number(true)
                .json()
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(json_layer);
        } else {
            // Disable ANSI colors when writing to a file or a non-TTY stderr.
            let fmt_layer = fmt::layer()
                .with_ansi(config.log_file.is_none() && stderr().is_tty())
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(fmt_layer);
        }

        let mut flamegraph = None;
        if config.enable_flamegraph {
            let flamesub = FlameSub::new();
            flamegraph = Some(flamesub.clone());
            layers.push(flamesub.boxed());
        }

        let subscriber = tracing_subscriber::registry().with(layers);
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize tracing subscriber");

        if config.panic_hook {
            set_panic_hook(config.crash_on_panic);
        }

        // The guards must be kept alive by the caller (typically in main);
        // dropping them stops the non-blocking writer and the provider.
        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);

        (
            guards,
            TracingHandle {
                log: log_filter_handle,
                trace: trace_filter_handle,
                file_output,
                sampler,
                flamegraph,
            },
        )
    }
}

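/// A sampler that delegates to [`Sampler::TraceIdRatioBased`] with a rate
/// that can be swapped atomically at runtime via
/// [`TracingHandle::update_sampling_rate`]. For example, a rate of `0.05`
/// samples roughly 5% of new traces; in `init` it is wrapped in
/// [`Sampler::ParentBased`], so child spans inherit their parent's sampling
/// decision.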
#[derive(Debug, Clone)]
struct SamplingFilter {
    /// Sampling rate for spans, clamped to (0, 1].
    sample_rate: Arc<AtomicF64>,
}

impl SamplingFilter {
    fn new(sample_rate: f64) -> Self {
        SamplingFilter {
            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
        }
    }

    // Keep the rate within (0, 1]; the lower bound avoids a rate of zero,
    // which would disable tracing entirely.
    fn clamp(sample_rate: f64) -> f64 {
        sample_rate.clamp(0.0001, 1.0)
    }

    fn update_sampling_rate(&self, sample_rate: f64) {
        let sample_rate = Self::clamp(sample_rate);
        self.sample_rate.store(sample_rate, Ordering::Relaxed);
    }
}

impl ShouldSample for SamplingFilter {
    fn should_sample(
        &self,
        parent_context: Option<&Context>,
        trace_id: TraceId,
        name: &str,
        span_kind: &SpanKind,
        attributes: &[KeyValue],
        links: &[Link],
    ) -> SamplingResult {
        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
        let sampler = Sampler::TraceIdRatioBased(sample_rate);

        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
    }
}

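/// Convenience function for test code: installs a global fmt subscriber with
/// an INFO default (overridable via `RUST_LOG`) and the test writer. Safe to
/// call from multiple tests; only the first call takes effect.
///
/// A usage sketch:
///
/// ```ignore
/// #[test]
/// fn my_test() {
///     init_for_testing();
///     tracing::info!("this output is captured by the test harness");
/// }
/// ```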
pub fn init_for_testing() {
    static LOGGER: Lazy<()> = Lazy::new(|| {
        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::INFO.into())
                    .from_env_lossy(),
            )
            .with_file(true)
            .with_line_number(true)
            .with_test_writer()
            .finish();
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize logging for tests");
    });

    Lazy::force(&LOGGER);
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricType;
    use tracing::{debug, debug_span, info, trace_span, warn};

    use super::*;

    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Span level is set to DEBUG (distinct from the INFO log level), so
        // debug spans are created and their latencies recorded.
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        // The DEBUG log event below is filtered out at the INFO log level,
        // but the surrounding span is created and timed.
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // TRACE spans are below the configured span level, so this span is
        // never created; log events inside it still appear.
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        // Exactly one span latency histogram should have been recorded.
        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // Both tests call init_for_testing(), verifying that it can safely run
    // more than once in the same process.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}