// telemetry_subscribers/file_exporter.rs

use std::{
    fs::OpenOptions,
    io::Write,
    path::{Path, PathBuf},
    sync::{Arc, Mutex},
};

use futures::{FutureExt, future::BoxFuture};
use opentelemetry::trace::TraceError;
use opentelemetry_proto::{
    tonic::collector::trace::v1::ExportTraceServiceRequest,
    transform::{
        common::tonic::ResourceAttributesWithSchema,
        trace::tonic::group_spans_by_resource_and_scope,
    },
};
use opentelemetry_sdk::{
    Resource,
    export::trace::{ExportResult, SpanData, SpanExporter},
};
use prost::Message;

/// A cloneable, thread-safe handle to an optional append-mode output file.
///
/// Every clone shares the same slot (`Arc<Mutex<..>>`), so a path change or
/// clear made through one handle is observed by all the others. The slot
/// stores the path next to the open file so that [`CachedOpenFile::update_path`]
/// can tell whether the requested path is already the one that is open.
#[derive(Clone)]
pub(crate) struct CachedOpenFile {
    inner: Arc<Mutex<Option<(PathBuf, std::fs::File)>>>,
}

impl std::fmt::Debug for CachedOpenFile {
    /// Prints only the type name; the shared slot is neither locked nor
    /// inspected.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachedOpenFile").finish()
    }
}

impl CachedOpenFile {
    /// Opens `path` in append mode, creating the file if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened or
    /// created.
    pub fn open_file(path: &Path) -> std::io::Result<std::fs::File> {
        OpenOptions::new().append(true).create(true).open(path)
    }

    /// Creates a handle, eagerly opening `file_path` when one is given.
    ///
    /// With `None` the handle starts empty and can be pointed at a file later
    /// through [`CachedOpenFile::update_path`].
    ///
    /// # Errors
    ///
    /// Returns the I/O error from [`CachedOpenFile::open_file`] if a path was
    /// given and it could not be opened.
    pub fn new<P: AsRef<Path>>(file_path: Option<P>) -> std::io::Result<Self> {
        let slot = file_path
            .map(|path| {
                let path = path.as_ref();
                Self::open_file(path).map(|file| (path.to_owned(), file))
            })
            .transpose()?;

        Ok(Self {
            inner: Arc::new(Mutex::new(slot)),
        })
    }

    /// Points the handle (and all of its clones) at `file_path`.
    ///
    /// If `file_path` equals the path that is currently open, nothing happens
    /// and the existing file is kept. Otherwise the new file is opened first
    /// and only then swapped in, so a failed open leaves the previous file in
    /// place.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from [`CachedOpenFile::open_file`] if the new path
    /// cannot be opened.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn update_path<P: AsRef<Path>>(&self, file_path: P) -> std::io::Result<()> {
        let requested = file_path.as_ref();
        let mut slot = self.inner.lock().unwrap();

        // Compare borrowed paths so the common "unchanged" case does not
        // allocate a `PathBuf` that is immediately thrown away.
        if matches!(&*slot, Some((current, _)) if current.as_path() == requested) {
            return Ok(());
        }

        let file = Self::open_file(requested)?;
        *slot = Some((requested.to_owned(), file));
        Ok(())
    }

    /// Drops the currently open file, if any, leaving the handle empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn clear_path(&self) {
        *self.inner.lock().unwrap() = None;
    }

    /// Runs `f` with mutable access to the open file, or with `None` when no
    /// file is configured, and returns whatever `f` returns.
    ///
    /// The mutex is held for the whole duration of `f`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn with_file(
        &self,
        f: impl FnOnce(Option<&mut std::fs::File>) -> std::io::Result<()>,
    ) -> std::io::Result<()> {
        let mut slot = self.inner.lock().unwrap();
        f(slot.as_mut().map(|(_, file)| file))
    }
}

80#[derive(Debug)]
81pub(crate) struct FileExporter {
82 pub cached_open_file: CachedOpenFile,
83 resource: ResourceAttributesWithSchema,
84}
85
86impl FileExporter {
87 pub fn new(file_path: Option<PathBuf>) -> std::io::Result<Self> {
88 Ok(Self {
89 cached_open_file: CachedOpenFile::new(file_path)?,
90 resource: ResourceAttributesWithSchema::default(),
91 })
92 }
93}
94
95impl SpanExporter for FileExporter {
96 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
97 let cached_open_file = self.cached_open_file.clone();
98 let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
99 async move {
100 cached_open_file
101 .with_file(|maybe_file| {
102 if let Some(file) = maybe_file {
103 let request = ExportTraceServiceRequest { resource_spans };
104
105 let buf = request.encode_length_delimited_to_vec();
106
107 file.write_all(&buf)
108 } else {
109 Ok(())
110 }
111 })
112 .map_err(|e| TraceError::Other(e.into()))
113 }
114 .boxed()
115 }
116}