import_trace/
import-trace.rs1use std::io::{self, Cursor, Read};
6
7use bytes::Buf;
8use bytes_varint::VarIntSupport;
9use clap::*;
10use opentelemetry_proto::tonic::{
11 collector::trace::v1::{ExportTraceServiceRequest, trace_service_client::TraceServiceClient},
12 common::v1::{AnyValue, any_value},
13};
14use prost::Message;
15use tonic::Request;
16
17#[derive(Parser, Debug)]
18#[command(author, version, about, long_about = None)]
19struct Args {
20 #[arg(long)]
21 trace_file: String,
22
23 #[arg(long, default_value = "http://localhost:4317")]
24 otlp_endpoint: String,
25
26 #[arg(long)]
27 service_name: Option<String>,
28
29 #[arg(long)]
30 dump_spans: bool,
31}
32
33#[tokio::main]
34async fn main() {
35 let args = Args::parse();
36 let file = std::fs::File::open(args.trace_file).unwrap();
37
38 let messages = decode_all_length_delimited::<_, ExportTraceServiceRequest>(file).unwrap();
39
40 if args.dump_spans {
41 for message in messages.iter() {
42 for span in &message.resource_spans {
43 println!("{:?}", span);
44 }
45 }
46 return;
47 }
48
49 let endpoint = format!("{}{}", args.otlp_endpoint, "/v1/traces");
50 let mut trace_exporter = TraceServiceClient::connect(endpoint).await.unwrap();
51
52 let service_name = args.service_name.unwrap_or_else(|| {
53 let timestamp = std::time::SystemTime::now()
54 .duration_since(std::time::UNIX_EPOCH)
55 .unwrap()
56 .as_secs();
57 format!("iota-node-{}", timestamp)
58 });
59
60 println!("importing trace with service name {:?}", service_name);
61
62 for mut message in messages {
63 println!(
64 "sending {} spans to otlp collector",
65 message.resource_spans.len()
66 );
67
68 for resource_span in message.resource_spans.iter_mut() {
70 if let Some(resource) = resource_span.resource.as_mut() {
71 for attr in resource.attributes.iter_mut() {
72 if attr.key == "service.name" {
73 attr.value = Some(AnyValue {
74 value: Some(any_value::Value::StringValue(service_name.clone())),
75 });
76 }
77 }
78 }
79 }
80
81 trace_exporter.export(Request::new(message)).await.unwrap();
82 }
83 println!("all spans imported");
84}
85
86fn decode_all_length_delimited<R, M>(mut reader: R) -> io::Result<Vec<M>>
87where
88 R: Read,
89 M: Message + Default,
90{
91 let mut messages = Vec::new();
92 let mut buffer = Vec::new();
93 reader.read_to_end(&mut buffer)?;
94 let mut cursor = Cursor::new(buffer);
95
96 while cursor.has_remaining() {
97 let len = cursor.try_get_u64_varint().unwrap() as usize;
98
99 if cursor.remaining() < len {
100 return Err(io::Error::new(
101 io::ErrorKind::UnexpectedEof,
102 "Incomplete message",
103 ));
104 }
105
106 let msg_bytes = cursor
108 .chunk()
109 .get(..len)
110 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Buffer underflow"))?;
111
112 let msg = M::decode(msg_bytes).map_err(|e| {
113 io::Error::new(io::ErrorKind::InvalidData, format!("Decode error: {}", e))
114 })?;
115 messages.push(msg);
116
117 cursor.advance(len);
119 }
120
121 Ok(messages)
122}