import_trace/
import-trace.rs1use std::io::{self, Cursor, Read};
6
7use bytes::Buf;
8use bytes_varint::VarIntSupport;
9use clap::*;
10use opentelemetry_proto::tonic::{
11 collector::trace::v1::{ExportTraceServiceRequest, trace_service_client::TraceServiceClient},
12 common::v1::{AnyValue, KeyValue, any_value},
13};
14use prost::Message;
15use tonic::Request;
16
17#[derive(Parser, Debug)]
18#[command(author, version, about, long_about = None)]
19struct Args {
20 #[arg(long)]
21 trace_file: String,
22
23 #[arg(long, default_value = "http://localhost:4317")]
24 otlp_endpoint: String,
25
26 #[arg(long)]
27 service_name: Option<String>,
28
29 #[arg(long)]
30 dump_spans: bool,
31}
32
33#[tokio::main]
34async fn main() {
35 let args = Args::parse();
36 let file = std::fs::File::open(args.trace_file).unwrap();
37
38 let messages = decode_all_length_delimited::<_, ExportTraceServiceRequest>(file).unwrap();
39
40 if args.dump_spans {
41 for message in messages.iter() {
42 for span in &message.resource_spans {
43 println!("{span:#?}");
44 }
45 }
46 return;
47 }
48
49 let endpoint = format!("{}{}", args.otlp_endpoint, "/v1/traces");
50 let mut trace_exporter = TraceServiceClient::connect(endpoint).await.unwrap();
51
52 let service_name = args.service_name.unwrap_or_else(|| {
53 let timestamp = std::time::SystemTime::now()
54 .duration_since(std::time::UNIX_EPOCH)
55 .unwrap()
56 .as_secs();
57 format!("iota-node-{timestamp}")
58 });
59
60 println!("importing trace with service name {service_name:?}");
61
62 for mut message in messages {
63 let mut span_count = 0;
64
65 for resource_span in message.resource_spans.iter_mut() {
67 for scope_span in resource_span.scope_spans.iter() {
68 span_count += scope_span.spans.len();
69 }
70
71 if let Some(resource) = resource_span.resource.as_mut() {
72 let mut service_name_found = false;
73 for attr in resource.attributes.iter_mut() {
74 if attr.key == "service.name" {
75 service_name_found = true;
76 attr.value = Some(AnyValue {
77 value: Some(any_value::Value::StringValue(service_name.clone())),
78 });
79 }
80 }
81 if !service_name_found {
82 resource.attributes.push(KeyValue {
83 key: "service.name".to_string(),
84 value: Some(AnyValue {
85 value: Some(any_value::Value::StringValue(service_name.clone())),
86 }),
87 });
88 }
89 }
90 }
91
92 println!("sending {span_count} spans to otlp collector");
93 trace_exporter.export(Request::new(message)).await.unwrap();
94 }
95 println!("all spans imported");
96}
97
98fn decode_all_length_delimited<R, M>(mut reader: R) -> io::Result<Vec<M>>
99where
100 R: Read,
101 M: Message + Default,
102{
103 let mut messages = Vec::new();
104 let mut buffer = Vec::new();
105 reader.read_to_end(&mut buffer)?;
106 let mut cursor = Cursor::new(buffer);
107
108 while cursor.has_remaining() {
109 let len = cursor.try_get_u64_varint().unwrap() as usize;
110
111 if cursor.remaining() < len {
112 return Err(io::Error::new(
113 io::ErrorKind::UnexpectedEof,
114 "Incomplete message",
115 ));
116 }
117
118 let msg_bytes = cursor
120 .chunk()
121 .get(..len)
122 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Buffer underflow"))?;
123
124 let msg = M::decode(msg_bytes).map_err(|e| {
125 io::Error::new(io::ErrorKind::InvalidData, format!("Decode error: {e}"))
126 })?;
127 messages.push(msg);
128
129 cursor.advance(len);
131 }
132
133 Ok(messages)
134}