//! Telemetry subscriber setup for node binaries: log output (stderr or file,
//! text or JSON), optional OTLP tracing, span-latency Prometheus metrics, and
//! a panic hook.

use std::{
    env,
    io::{Write, stderr},
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Mutex, atomic::Ordering},
    time::Duration,
};

use atomic_float::AtomicF64;
use crossterm::tty::IsTty;
use once_cell::sync::Lazy;
use opentelemetry::{
    Context, KeyValue,
    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
    Resource, runtime,
    trace::{BatchSpanProcessor, Sampler, ShouldSample, TracerProvider},
};
use span_latency_prom::PrometheusSpanLatencyLayer;
use thiserror::Error;
use tracing::{Level, error, metadata::LevelFilter};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};

use crate::file_exporter::{CachedOpenFile, FileExporter};

mod file_exporter;
pub mod span_latency_prom;

/// Convenient boxed error type used throughout this crate.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[derive(Debug, Error)]
pub enum TelemetryError {
    #[error("OTLP protocol not enabled in the node's configuration")]
    TracingDisabled,

    #[error("{0}")]
    Other(#[from] BoxError),
}

/// Configuration for the telemetry subscriber.
///
/// Note: the derived `Default` differs from [`TelemetryConfig::new`], which
/// enables the panic hook and sets `sample_rate` to 1.0.
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Enable OTLP tracing, exported over gRPC or to the file named by
    /// `TRACE_FILE`.
    pub enable_otlp_tracing: bool,
    /// Enable the Tokio console (requires the `tokio-console` feature).
    pub tokio_console: bool,
    /// Output JSON logs instead of human-readable text.
    pub json_log_output: bool,
    /// If set, write logs to daily-rotated files with this prefix instead of
    /// stderr.
    pub log_file: Option<String>,
    /// Log level directives; defaults to "info".
    pub log_string: Option<String>,
    /// Minimum level at which spans are created; defaults to INFO.
    pub span_level: Option<Level>,
    /// Install a panic hook that logs panics through `tracing`.
    pub panic_hook: bool,
    /// Exit the process after logging a panic.
    pub crash_on_panic: bool,
    /// Registry for the span-latency histogram metrics.
    pub prom_registry: Option<prometheus::Registry>,
    /// Trace sampling rate; clamped to the range [0.0001, 1.0].
    pub sample_rate: f64,
    /// Additional targets whose spans are enabled at the `trace` level.
    pub trace_target: Option<Vec<String>>,
}

/// Guards keeping the telemetry machinery alive; hold onto this for the
/// lifetime of the program. Dropping it flushes the non-blocking log writer
/// and clears the global telemetry config.
#[must_use]
pub struct TelemetryGuards {
    #[expect(unused)]
    worker_guard: WorkerGuard,
    #[expect(unused)]
    provider: Option<TracerProvider>,
}

impl TelemetryGuards {
    fn new(
        config: TelemetryConfig,
        worker_guard: WorkerGuard,
        provider: Option<TracerProvider>,
    ) -> Self {
        set_global_telemetry_config(config);
        Self {
            worker_guard,
            provider,
        }
    }
}

impl Drop for TelemetryGuards {
    fn drop(&mut self) {
        clear_global_telemetry_config();
    }
}

/// Handle for changing an `EnvFilter`'s directives at runtime.
#[derive(Clone, Debug)]
pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);

impl FilterHandle {
    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        let filter = EnvFilter::try_new(directives)?;
        self.0.reload(filter)?;
        Ok(())
    }

    pub fn get(&self) -> Result<String, BoxError> {
        self.0
            .with_current(|filter| filter.to_string())
            .map_err(Into::into)
    }
}
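
// Example (sketch): `FilterHandle::update` takes standard `EnvFilter`
// directives, so a service could raise verbosity for one crate at runtime:
//
//     filter_handle.update("info,my_crate=debug")?;
//
// (`my_crate` is a placeholder target, not something defined here.)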

/// Runtime control over logging and tracing: filter directives, trace-file
/// output, and the sampling rate.
pub struct TracingHandle {
    log: FilterHandle,
    trace: Option<FilterHandle>,
    file_output: CachedOpenFile,
    sampler: SamplingFilter,
}

impl TracingHandle {
    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        self.log.update(directives)
    }

    pub fn get_log(&self) -> Result<String, BoxError> {
        self.log.get()
    }

    pub fn update_sampling_rate(&self, sample_rate: f64) {
        self.sampler.update_sampling_rate(sample_rate);
    }

    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
        self.file_output.update_path(trace_path)?;
        Ok(())
    }

    /// Set the trace filter for `duration`, after which it reverts to the
    /// `TRACE_FILTER` env var (or "off" if unset). Requires a Tokio runtime.
    pub fn update_trace_filter<S: AsRef<str>>(
        &self,
        directives: S,
        duration: Duration,
    ) -> Result<(), TelemetryError> {
        if let Some(trace) = &self.trace {
            trace.update(directives)?;
            // Reset the filter once the requested window has elapsed.
            let trace = trace.clone();
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            tokio::spawn(async move {
                tokio::time::sleep(duration).await;
                if let Err(e) = trace.update(trace_filter_env) {
                    error!("failed to reset trace filter: {}", e);
                }
            });
            Ok(())
        } else {
            Err(TelemetryError::TracingDisabled)
        }
    }

    pub fn clear_file_output(&self) {
        self.file_output.clear_path();
    }

    pub fn reset_trace(&self) -> Result<(), TelemetryError> {
        if let Some(trace) = &self.trace {
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            trace.update(trace_filter_env).map_err(|e| e.into())
        } else {
            Err(TelemetryError::TracingDisabled)
        }
    }
}
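
// Example (sketch): temporarily capture detailed traces, assuming `handle` is
// the `TracingHandle` returned by `TelemetryConfig::init` and a Tokio runtime
// is running (the reset timer is spawned with `tokio::spawn`):
//
//     handle.update_trace_file("/tmp/traces")?;          // hypothetical path
//     handle.update_trace_filter("my_crate=trace", Duration::from_secs(30))?;
//     // ... 30s later the filter reverts to $TRACE_FILTER (or "off").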

fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
    if let Some(logfile_prefix) = log_file {
        // Daily-rotated log files in the current directory.
        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
        tracing_appender::non_blocking(file_appender)
    } else {
        tracing_appender::non_blocking(stderr())
    }
}

fn set_panic_hook(crash_on_panic: bool) {
    let default_panic_handler = std::panic::take_hook();

    // Custom hook that logs the panic through tracing (so it reaches log
    // files and any exporters) before the default handler prints to stderr.
    std::panic::set_hook(Box::new(move |panic| {
        // If known, log the source location of the panic.
        if let Some(location) = panic.location() {
            tracing::error!(
                message = %panic,
                panic.file = location.file(),
                panic.line = location.line(),
                panic.column = location.column(),
            );
        } else {
            tracing::error!(message = %panic);
        }

        default_panic_handler(panic);

        // Flush so the panic output is not lost.
        let _ = std::io::stderr().flush();
        let _ = std::io::stdout().flush();

        // Exit with a distinctive code so supervisors can tell panics apart
        // from other failures.
        if crash_on_panic {
            std::process::exit(12);
        }
    }));
}

static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

fn set_global_telemetry_config(config: TelemetryConfig) {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    assert!(global_config.is_none());
    *global_config = Some(config);
}

fn clear_global_telemetry_config() {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    *global_config = None;
}

pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
    let global_config = GLOBAL_CONFIG.lock().unwrap();
    global_config.clone()
}

impl TelemetryConfig {
    pub fn new() -> Self {
        Self {
            enable_otlp_tracing: false,
            tokio_console: false,
            json_log_output: false,
            log_file: None,
            log_string: None,
            span_level: None,
            panic_hook: true,
            crash_on_panic: false,
            prom_registry: None,
            sample_rate: 1.0,
            trace_target: None,
        }
    }

    pub fn with_json(mut self) -> Self {
        self.json_log_output = true;
        self
    }

    pub fn with_log_level(mut self, log_string: &str) -> Self {
        self.log_string = Some(log_string.to_owned());
        self
    }

    pub fn with_span_level(mut self, span_level: Level) -> Self {
        self.span_level = Some(span_level);
        self
    }

    pub fn with_log_file(mut self, filename: &str) -> Self {
        self.log_file = Some(filename.to_owned());
        self
    }

    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
        self.prom_registry = Some(registry.clone());
        self
    }

    pub fn with_sample_rate(mut self, rate: f64) -> Self {
        self.sample_rate = rate;
        self
    }

    pub fn with_trace_target(mut self, target: &str) -> Self {
        match self.trace_target {
            Some(ref mut v) => v.push(target.to_owned()),
            None => self.trace_target = Some(vec![target.to_owned()]),
        };

        self
    }

    /// Apply settings from the environment: `CRASH_ON_PANIC`, `TRACE_FILTER`,
    /// `RUST_LOG_JSON`, `TOKIO_CONSOLE`, `TOKIO_SPAN_LEVEL`, `RUST_LOG_FILE`,
    /// and `SAMPLE_RATE`.
    pub fn with_env(mut self) -> Self {
        if env::var("CRASH_ON_PANIC").is_ok() {
            self.crash_on_panic = true
        }

        if env::var("TRACE_FILTER").is_ok() {
            self.enable_otlp_tracing = true
        }

        if env::var("RUST_LOG_JSON").is_ok() {
            self.json_log_output = true;
        }

        if env::var("TOKIO_CONSOLE").is_ok() {
            self.tokio_console = true;
        }

        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
            self.span_level =
                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
        }

        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
            self.log_file = Some(filepath);
        }

        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
        }

        self
    }
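
    // Example (sketch): with `with_env`, deployment can flip these switches
    // without code changes, e.g.
    //
    //     RUST_LOG=info,my_crate=debug RUST_LOG_JSON=1 SAMPLE_RATE=0.05 ./my-node
    //
    // (`my-node` is a placeholder binary name; `RUST_LOG` itself is read later
    // by `init` via `EnvFilter::try_from_default_env`.)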

    /// Install the subscriber globally and return the guards plus a
    /// [`TracingHandle`] for runtime adjustments.
    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
        let config = self;
        let config_clone = config.clone();

        // Log filter: RUST_LOG wins; otherwise use the configured directives,
        // with any trace targets forced to the `trace` level.
        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
        if let Some(targets) = config.trace_target {
            for target in targets {
                directives.push_str(&format!(",{}=trace", target));
            }
        }
        let env_filter =
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
        let log_filter_handle = FilterHandle(reload_handle);

        // Separate span filter so span creation can be limited independently
        // of log lines.
        let span_level = config.span_level.unwrap_or(Level::INFO);
        let span_filter = filter::filter_fn(move |metadata| {
            metadata.is_span() && *metadata.level() <= span_level
        });

        let mut layers = Vec::new();

        #[cfg(feature = "tokio-console")]
        if config.tokio_console {
            layers.push(console_subscriber::spawn().boxed());
        }

        if let Some(registry) = config.prom_registry {
            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
                .expect("Could not initialize span latency layer");
            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
        }

        let mut trace_filter_handle = None;
        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
        let mut provider = None;
        let sampler = SamplingFilter::new(config.sample_rate);
        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("iota-node".to_owned());

        if config.enable_otlp_tracing {
            let trace_file = env::var("TRACE_FILE").ok();

            let resource = Resource::new(vec![opentelemetry::KeyValue::new(
                "service.name",
                service_name.clone(),
            )]);
            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));

            // Export to a file if TRACE_FILE is set, otherwise to an OTLP
            // collector over gRPC.
            let telemetry = if let Some(trace_file) = trace_file {
                let exporter =
                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
                file_output = exporter.cached_open_file.clone();
                let processor =
                    BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
                        .build();

                let p = TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_span_processor(processor)
                    .build();

                let tracer = p.tracer(service_name);
                provider = Some(p);

                tracing_opentelemetry::layer().with_tracer(tracer)
            } else {
                let endpoint = env::var("OTLP_ENDPOINT")
                    .unwrap_or_else(|_| "http://localhost:4317".to_string());

                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
                    .with_tonic()
                    .with_endpoint(endpoint)
                    .build()
                    .unwrap();
                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
                    .build();
                let tracer = tracer_provider.tracer(service_name);

                tracing_opentelemetry::layer().with_tracer(tracer)
            };

            // Propagate trace context via the W3C TraceContext format.
            opentelemetry::global::set_text_map_propagator(
                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
            );

            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER")
                .expect("TRACE_FILTER must be set when OTLP tracing is enabled");
            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
            trace_filter_handle = Some(FilterHandle(reload_handle));

            layers.push(telemetry.with_filter(trace_env_filter).boxed());
        }

        let (nb_output, worker_guard) = get_output(config.log_file.clone());
        if config.json_log_output {
            // Machine-readable JSON logs, with file/line for each event.
            let json_layer = fmt::layer()
                .with_file(true)
                .with_line_number(true)
                .json()
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(json_layer);
        } else {
            // Human-readable logs; ANSI colors only when writing to a TTY.
            let fmt_layer = fmt::layer()
                .with_ansi(config.log_file.is_none() && stderr().is_tty())
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(fmt_layer);
        }

        let subscriber = tracing_subscriber::registry().with(layers);
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize tracing subscriber");

        if config.panic_hook {
            set_panic_hook(config.crash_on_panic);
        }

        // The guards must outlive this function so the worker threads and
        // tracer provider keep running.
        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);

        (
            guards,
            TracingHandle {
                log: log_filter_handle,
                trace: trace_filter_handle,
                file_output,
                sampler,
            },
        )
    }
}
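
// Example (sketch): typical startup wiring, assuming a `main` in the consuming
// binary and a Prometheus registry of its own:
//
//     let registry = prometheus::Registry::new();
//     let (_guard, tracing_handle) = TelemetryConfig::new()
//         .with_env()
//         .with_prom_registry(&registry)
//         .init();
//     // Keep `_guard` alive for the whole program so logs keep flushing.
//     tracing_handle.update_log("debug")?;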

/// Sampler that applies a dynamically adjustable sampling rate.
#[derive(Debug, Clone)]
struct SamplingFilter {
    /// Current sampling rate; shared so it can be updated at runtime.
    sample_rate: Arc<AtomicF64>,
}

impl SamplingFilter {
    fn new(sample_rate: f64) -> Self {
        SamplingFilter {
            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
        }
    }

    /// Keep the rate within (0, 1]; a rate of exactly 0 would disable
    /// sampling entirely.
    fn clamp(sample_rate: f64) -> f64 {
        sample_rate.clamp(0.0001, 1.0)
    }

    fn update_sampling_rate(&self, sample_rate: f64) {
        let sample_rate = Self::clamp(sample_rate);
        self.sample_rate.store(sample_rate, Ordering::Relaxed);
    }
}

impl ShouldSample for SamplingFilter {
    fn should_sample(
        &self,
        parent_context: Option<&Context>,
        trace_id: TraceId,
        name: &str,
        span_kind: &SpanKind,
        attributes: &[KeyValue],
        links: &[Link],
    ) -> SamplingResult {
        // Delegate to a ratio sampler built from the current rate; since
        // `init` wraps this in `Sampler::ParentBased`, child spans follow
        // their parent's sampling decision.
        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
        let sampler = Sampler::TraceIdRatioBased(sample_rate);

        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
    }
}

/// Globally set a basic subscriber for tests, with INFO-level defaults that
/// `RUST_LOG` can override. Safe to call from multiple tests; the `Lazy`
/// ensures initialization happens only once.
pub fn init_for_testing() {
    static LOGGER: Lazy<()> = Lazy::new(|| {
        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::INFO.into())
                    .from_env_lossy(),
            )
            .with_file(true)
            .with_line_number(true)
            .with_test_writer()
            .finish();
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize logging for tests");
    });

    Lazy::force(&LOGGER);
}
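
// Example (sketch): call at the top of each test that wants log output; the
// internal `Lazy` makes repeated calls safe, as `testing_logger_1/2` below show.
//
//     #[test]
//     fn my_test() {
//         init_for_testing();
//         tracing::info!("visible when RUST_LOG allows it");
//     }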

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricType;
    use tracing::{debug, debug_span, info, trace_span, warn};

    use super::*;

    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Spans are limited to DEBUG and above, so the TRACE span below
        // should not be created (and thus not recorded in the histogram).
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        // Only the DEBUG span should show up in the latency histogram.
        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // Both tests call `init_for_testing`; together they verify that repeated
    // global initialization does not panic.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}