use std::{
    env,
    io::{Write, stderr},
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Mutex, atomic::Ordering},
    time::Duration,
};

use atomic_float::AtomicF64;
use crossterm::tty::IsTty;
use once_cell::sync::Lazy;
use opentelemetry::{
    Context, KeyValue,
    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
    Resource, runtime,
    trace::{BatchSpanProcessor, Sampler, ShouldSample, TracerProvider},
};
use span_latency_prom::PrometheusSpanLatencyLayer;
use thiserror::Error;
use tracing::{Level, error, metadata::LevelFilter};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};

use crate::file_exporter::{CachedOpenFile, FileExporter};

mod file_exporter;
pub mod span_latency_prom;

pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

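/// Errors returned by the telemetry API, e.g. when a tracing operation is
/// requested but OTLP tracing was not enabled in the configuration.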
#[derive(Debug, Error)]
pub enum TelemetryError {
    #[error("OTLP protocol not enabled in the node's configuration")]
    TracingDisabled,

    #[error("{0}")]
    Other(#[from] BoxError),
}

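/// Configuration for the telemetry subscriber. Fields can be set directly,
/// via the `with_*` builder methods, or from environment variables using
/// [`TelemetryConfig::with_env`].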
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Enables export of tracing spans via OTLP, or to a file when TRACE_FILE is set.
    pub enable_otlp_tracing: bool,
    /// Enables the tokio-console subscriber layer.
    pub tokio_console: bool,
    /// Write log output as JSON.
    pub json_log_output: bool,
    /// If set, write logs to a daily-rotated file with this prefix instead of stderr.
    pub log_file: Option<String>,
    /// Initial log filter directives; overridden by the RUST_LOG env var if set.
    pub log_string: Option<String>,
    /// Minimum level at which spans are created (defaults to INFO).
    pub span_level: Option<Level>,
    /// Install a panic hook that logs panics before the default handler runs.
    pub panic_hook: bool,
    /// Exit the process (with code 12) after logging a panic.
    pub crash_on_panic: bool,
    /// Registry used to export span latency histograms to Prometheus.
    pub prom_registry: Option<prometheus::Registry>,
    /// Fraction of traces to sample and export; clamped to [0.0001, 1.0].
    pub sample_rate: f64,
    /// Extra targets to trace at TRACE level, appended to the filter as `target=trace`.
    pub trace_target: Option<Vec<String>>,
}

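/// Guards returned by [`TelemetryConfig::init`]. They keep the background
/// logging worker and the OTLP tracer provider alive; dropping them flushes
/// buffered output and clears the global telemetry config.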
#[must_use]
pub struct TelemetryGuards {
    #[expect(unused)]
    worker_guard: WorkerGuard,
    #[expect(unused)]
    provider: Option<TracerProvider>,
}

impl TelemetryGuards {
    fn new(
        config: TelemetryConfig,
        worker_guard: WorkerGuard,
        provider: Option<TracerProvider>,
    ) -> Self {
        set_global_telemetry_config(config);
        Self {
            worker_guard,
            provider,
        }
    }
}

impl Drop for TelemetryGuards {
    fn drop(&mut self) {
        clear_global_telemetry_config();
    }
}

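/// Handle to a reloadable `EnvFilter`, used to change filter directives while
/// the process is running.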
#[derive(Clone, Debug)]
pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);

impl FilterHandle {
    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        let filter = EnvFilter::try_new(directives)?;
        self.0.reload(filter)?;
        Ok(())
    }

    pub fn get(&self) -> Result<String, BoxError> {
        self.0
            .with_current(|filter| filter.to_string())
            .map_err(Into::into)
    }
}

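/// Runtime handle for adjusting logging and tracing after initialization: log
/// filter directives, the trace filter, the trace output file, and the
/// sampling rate.
///
/// A minimal usage sketch (the filter directive below is illustrative):
///
/// ```ignore
/// let (_guard, handle) = TelemetryConfig::new().with_env().init();
/// // Raise verbosity for one module at runtime.
/// handle.update_log("info,my_crate::consensus=debug")?;
/// ```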
pub struct TracingHandle {
    log: FilterHandle,
    trace: Option<FilterHandle>,
    file_output: CachedOpenFile,
    sampler: SamplingFilter,
}

impl TracingHandle {
    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        self.log.update(directives)
    }

    pub fn get_log(&self) -> Result<String, BoxError> {
        self.log.get()
    }

    pub fn update_sampling_rate(&self, sample_rate: f64) {
        self.sampler.update_sampling_rate(sample_rate);
    }

    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
        self.file_output.update_path(trace_path)?;
        Ok(())
    }

    pub fn update_trace_filter<S: AsRef<str>>(
        &self,
        directives: S,
        duration: Duration,
    ) -> Result<(), TelemetryError> {
        if let Some(trace) = &self.trace {
            trace.update(directives)?;
            // Spawn a task that resets the filter back to TRACE_FILTER (or
            // "off") once `duration` has elapsed.
            let trace = trace.clone();
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            tokio::spawn(async move {
                tokio::time::sleep(duration).await;
                if let Err(e) = trace.update(trace_filter_env) {
                    error!("failed to reset trace filter: {}", e);
                }
            });
            Ok(())
        } else {
            Err(TelemetryError::TracingDisabled)
        }
    }

    pub fn clear_file_output(&self) {
        self.file_output.clear_path();
    }

    pub fn reset_trace(&self) -> Result<(), TelemetryError> {
        if let Some(trace) = &self.trace {
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            trace.update(trace_filter_env).map_err(|e| e.into())
        } else {
            Err(TelemetryError::TracingDisabled)
        }
    }
}

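/// Returns a non-blocking writer: a daily-rotated file when `log_file` is set,
/// otherwise stderr.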
fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
    if let Some(logfile_prefix) = log_file {
        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
        tracing_appender::non_blocking(file_appender)
    } else {
        tracing_appender::non_blocking(stderr())
    }
}

fn set_panic_hook(crash_on_panic: bool) {
    let default_panic_handler = std::panic::take_hook();

    // Record the panic as a tracing event at ERROR level (including the
    // current span, if any), then delegate to the previously installed hook.
    std::panic::set_hook(Box::new(move |panic| {
        if let Some(location) = panic.location() {
            tracing::error!(
                message = %panic,
                panic.file = location.file(),
                panic.line = location.line(),
                panic.column = location.column(),
            );
        } else {
            tracing::error!(message = %panic);
        }

        default_panic_handler(panic);

        // Best-effort flush; nothing more can be done if it fails mid-panic.
        let _ = std::io::stderr().flush();
        let _ = std::io::stdout().flush();

        if crash_on_panic {
            std::process::exit(12);
        }
    }));
}

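// Process-wide copy of the active telemetry config, set when `TelemetryGuards`
// is created and cleared when it is dropped.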
static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

fn set_global_telemetry_config(config: TelemetryConfig) {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    assert!(global_config.is_none());
    *global_config = Some(config);
}

fn clear_global_telemetry_config() {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    *global_config = None;
}

pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
    let global_config = GLOBAL_CONFIG.lock().unwrap();
    global_config.clone()
}

impl TelemetryConfig {
    pub fn new() -> Self {
        Self {
            enable_otlp_tracing: false,
            tokio_console: false,
            json_log_output: false,
            log_file: None,
            log_string: None,
            span_level: None,
            panic_hook: true,
            crash_on_panic: false,
            prom_registry: None,
            sample_rate: 1.0,
            trace_target: None,
        }
    }

    pub fn with_json(mut self) -> Self {
        self.json_log_output = true;
        self
    }

    pub fn with_log_level(mut self, log_string: &str) -> Self {
        self.log_string = Some(log_string.to_owned());
        self
    }

    pub fn with_span_level(mut self, span_level: Level) -> Self {
        self.span_level = Some(span_level);
        self
    }

    pub fn with_log_file(mut self, filename: &str) -> Self {
        self.log_file = Some(filename.to_owned());
        self
    }

    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
        self.prom_registry = Some(registry.clone());
        self
    }

    pub fn with_sample_rate(mut self, rate: f64) -> Self {
        self.sample_rate = rate;
        self
    }

    pub fn with_trace_target(mut self, target: &str) -> Self {
        match self.trace_target {
            Some(ref mut v) => v.push(target.to_owned()),
            None => self.trace_target = Some(vec![target.to_owned()]),
        };

        self
    }

    pub fn with_env(mut self) -> Self {
        if env::var("CRASH_ON_PANIC").is_ok() {
            self.crash_on_panic = true
        }

        if env::var("TRACE_FILTER").is_ok() {
            self.enable_otlp_tracing = true
        }

        if env::var("RUST_LOG_JSON").is_ok() {
            self.json_log_output = true;
        }

        if env::var("TOKIO_CONSOLE").is_ok() {
            self.tokio_console = true;
        }

        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
            self.span_level =
                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
        }

        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
            self.log_file = Some(filepath);
        }

        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
        }

        self
    }

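    /// Installs the global tracing subscriber built from this config and, if
    /// `enable_otlp_tracing` is set, an OTLP tracing pipeline. Returns the
    /// guards (which must be kept alive) and a [`TracingHandle`] for runtime
    /// adjustments.
    ///
    /// A minimal sketch of typical usage (the filter string is illustrative):
    ///
    /// ```ignore
    /// let (_guard, _handle) = TelemetryConfig::new()
    ///     .with_log_level("info")
    ///     .with_env()
    ///     .init();
    /// tracing::info!("telemetry initialized");
    /// ```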
    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
        let config = self;
        let config_clone = config.clone();

        // Log filter: the RUST_LOG env var takes precedence over the
        // configured log_string and trace targets.
        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
        if let Some(targets) = config.trace_target {
            for target in targets {
                directives.push_str(&format!(",{target}=trace"));
            }
        }
        let env_filter =
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
        let log_filter_handle = FilterHandle(reload_handle);

        // Separate filter for span creation, so spans can be enabled at a
        // different level than log lines.
        let span_level = config.span_level.unwrap_or(Level::INFO);
        let span_filter = filter::filter_fn(move |metadata| {
            metadata.is_span() && *metadata.level() <= span_level
        });

        let mut layers = Vec::new();

        if config.tokio_console {
            layers.push(console_subscriber::spawn().boxed());
        }

        if let Some(registry) = config.prom_registry {
            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
                .expect("Could not initialize span latency layer");
            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
        }

        let mut trace_filter_handle = None;
        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
        let mut provider = None;
        let sampler = SamplingFilter::new(config.sample_rate);
        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("iota-node".to_owned());

        if config.enable_otlp_tracing {
            let trace_file = env::var("TRACE_FILE").ok();

            let resource = Resource::new(vec![opentelemetry::KeyValue::new(
                "service.name",
                service_name.clone(),
            )]);
            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));

            // If TRACE_FILE is set, spans are exported to that file instead of
            // an OTLP endpoint.
            let telemetry = if let Some(trace_file) = trace_file {
                let exporter =
                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
                file_output = exporter.cached_open_file.clone();
                let processor =
                    BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
                        .build();

                let p = TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_span_processor(processor)
                    .build();

                let tracer = p.tracer(service_name);
                provider = Some(p);

                tracing_opentelemetry::layer().with_tracer(tracer)
            } else {
                let endpoint = env::var("OTLP_ENDPOINT")
                    .unwrap_or_else(|_| "http://localhost:4317".to_string());

                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
                    .with_tonic()
                    .with_endpoint(endpoint)
                    .build()
                    .unwrap();
                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
                    .build();
                let tracer = tracer_provider.tracer(service_name);

                tracing_opentelemetry::layer().with_tracer(tracer)
            };

            // Enable propagation of the W3C trace context across service
            // boundaries.
            opentelemetry::global::set_text_map_propagator(
                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
            );

            // TRACE_FILTER must be set to a valid filter when OTLP tracing is
            // enabled; this unwrap panics otherwise.
            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
            trace_filter_handle = Some(FilterHandle(reload_handle));

            layers.push(telemetry.with_filter(trace_env_filter).boxed());
        }

        let (nb_output, worker_guard) = get_output(config.log_file.clone());
        if config.json_log_output {
            let json_layer = fmt::layer()
                .with_file(true)
                .with_line_number(true)
                .json()
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(json_layer);
        } else {
            let fmt_layer = fmt::layer()
                .with_ansi(config.log_file.is_none() && stderr().is_tty())
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(fmt_layer);
        }

        let subscriber = tracing_subscriber::registry().with(layers);
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize tracing subscriber");

        if config.panic_hook {
            set_panic_hook(config.crash_on_panic);
        }

        // The guards must be kept alive by the caller; dropping them shuts
        // down the background logging worker.
        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);

        (
            guards,
            TracingHandle {
                log: log_filter_handle,
                trace: trace_filter_handle,
                file_output,
                sampler,
            },
        )
    }
}

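/// A sampler wrapping `Sampler::TraceIdRatioBased`, with a sampling rate that
/// can be changed at runtime through an atomic f64.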
#[derive(Debug, Clone)]
struct SamplingFilter {
    // Sampling rate, kept in the range [0.0001, 1.0].
    sample_rate: Arc<AtomicF64>,
}

impl SamplingFilter {
    fn new(sample_rate: f64) -> Self {
        SamplingFilter {
            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
        }
    }

    fn clamp(sample_rate: f64) -> f64 {
        // A rate of exactly 0 would suppress all traces; keep it strictly positive.
        sample_rate.clamp(0.0001, 1.0)
    }

    fn update_sampling_rate(&self, sample_rate: f64) {
        // Clamp the new rate before storing it.
        let sample_rate = Self::clamp(sample_rate);
        self.sample_rate.store(sample_rate, Ordering::Relaxed);
    }
}

impl ShouldSample for SamplingFilter {
    fn should_sample(
        &self,
        parent_context: Option<&Context>,
        trace_id: TraceId,
        name: &str,
        span_kind: &SpanKind,
        attributes: &[KeyValue],
        links: &[Link],
    ) -> SamplingResult {
        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
        let sampler = Sampler::TraceIdRatioBased(sample_rate);

        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
    }
}

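/// Globally registers a basic text-mode tracing subscriber for use in tests.
/// Safe to call from multiple tests; only the first call installs the
/// subscriber.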
pub fn init_for_testing() {
    static LOGGER: Lazy<()> = Lazy::new(|| {
        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::INFO.into())
                    .from_env_lossy(),
            )
            .with_file(true)
            .with_line_number(true)
            .with_test_writer()
            .finish();
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize logging for tests");
    });

    Lazy::force(&LOGGER);
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricType;
    use tracing::{debug, debug_span, info, trace_span, warn};

    use super::*;

    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Spans are created at INFO by default; raise the span level to DEBUG
        // so the debug span below is recorded in the latency histogram.
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        // Exactly one metric family is expected: the span latency histogram,
        // labelled with the span name.
        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}