telemetry_subscribers/
file_exporter.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    fs::OpenOptions,
7    io::Write,
8    path::{Path, PathBuf},
9    sync::{Arc, Mutex},
10};
11
12use futures::{FutureExt, future::BoxFuture};
13use opentelemetry::trace::TraceError;
14use opentelemetry_proto::{
15    tonic::collector::trace::v1::ExportTraceServiceRequest,
16    transform::{
17        common::tonic::ResourceAttributesWithSchema,
18        trace::tonic::group_spans_by_resource_and_scope,
19    },
20};
21use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
22use prost::Message;
23
/// A cloneable handle to an optional, append-mode output file.
///
/// All clones share the same slot (`Arc<Mutex<..>>`), so a path change or a
/// clear made through one clone is observed by every other clone.
#[derive(Clone)]
pub(crate) struct CachedOpenFile {
    /// `None` when no file is configured; otherwise the path the file was
    /// opened from, paired with the open handle.
    inner: Arc<Mutex<Option<(PathBuf, std::fs::File)>>>,
}

impl std::fmt::Debug for CachedOpenFile {
    /// Prints only the type name; the path and file handle are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachedOpenFile").finish()
    }
}

impl CachedOpenFile {
    /// Opens `path` for appending, creating the file if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened.
    pub fn open_file(path: &Path) -> std::io::Result<std::fs::File> {
        OpenOptions::new().append(true).create(true).open(path)
    }

    /// Creates a handle, eagerly opening `file_path` when one is given.
    ///
    /// With `None` the handle starts empty and [`Self::with_file`] will hand
    /// `None` to its callback until [`Self::update_path`] is called.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening `file_path`.
    pub fn new<P: AsRef<Path>>(file_path: Option<P>) -> std::io::Result<Self> {
        let slot = file_path
            .map(|path| {
                let path = path.as_ref();
                Self::open_file(path).map(|file| (path.to_owned(), file))
            })
            .transpose()?;
        Ok(Self {
            inner: Arc::new(Mutex::new(slot)),
        })
    }

    /// Locks the shared slot, recovering the guard if the mutex is poisoned.
    ///
    /// The slot is only ever replaced or taken as a whole, so it is still a
    /// valid `Option` after a panic in another thread. Recovering here keeps
    /// one panicking callback from turning every later call into a panic.
    fn lock_slot(&self) -> std::sync::MutexGuard<'_, Option<(PathBuf, std::fs::File)>> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Switches the output to `file_path`.
    ///
    /// If the handle already holds a file opened from an equal path, nothing
    /// is reopened. Otherwise the new file is opened first and only then
    /// replaces the old one, so a failed open leaves the previous file in
    /// place.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening `file_path`.
    pub fn update_path<P: AsRef<Path>>(&self, file_path: P) -> std::io::Result<()> {
        let file_path = file_path.as_ref();
        let mut slot = self.lock_slot();

        // Compare borrowed paths so the no-op case does not allocate.
        let unchanged = matches!(&*slot, Some((current, _)) if current.as_path() == file_path);
        if unchanged {
            return Ok(());
        }

        let file = Self::open_file(file_path)?;
        *slot = Some((file_path.to_owned(), file));
        Ok(())
    }

    /// Drops the current file (if any), leaving the handle empty.
    pub fn clear_path(&self) {
        self.lock_slot().take();
    }

    /// Runs `f` with mutable access to the current file, or `None` if the
    /// handle is empty, and returns whatever `f` returns.
    ///
    /// The lock is held for the whole call to `f`.
    fn with_file(
        &self,
        f: impl FnOnce(Option<&mut std::fs::File>) -> std::io::Result<()>,
    ) -> std::io::Result<()> {
        let mut slot = self.lock_slot();
        f(slot.as_mut().map(|(_, file)| file))
    }
}
79
80#[derive(Debug)]
81pub(crate) struct FileExporter {
82    pub cached_open_file: CachedOpenFile,
83    resource: ResourceAttributesWithSchema,
84}
85
86impl FileExporter {
87    pub fn new(file_path: Option<PathBuf>) -> std::io::Result<Self> {
88        Ok(Self {
89            cached_open_file: CachedOpenFile::new(file_path)?,
90            resource: ResourceAttributesWithSchema::default(),
91        })
92    }
93}
94
95impl SpanExporter for FileExporter {
96    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
97        let cached_open_file = self.cached_open_file.clone();
98        let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
99        async move {
100            cached_open_file
101                .with_file(|maybe_file| {
102                    if let Some(file) = maybe_file {
103                        let request = ExportTraceServiceRequest { resource_spans };
104
105                        let buf = request.encode_length_delimited_to_vec();
106
107                        file.write_all(&buf)
108                    } else {
109                        Ok(())
110                    }
111                })
112                .map_err(|e| TraceError::Other(e.into()))
113        }
114        .boxed()
115    }
116}