import_trace/
import-trace.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::io::{self, Cursor, Read};
6
7use bytes::Buf;
8use bytes_varint::VarIntSupport;
9use clap::*;
10use opentelemetry_proto::tonic::{
11    collector::trace::v1::{ExportTraceServiceRequest, trace_service_client::TraceServiceClient},
12    common::v1::{AnyValue, any_value},
13};
14use prost::Message;
15use tonic::Request;
16
17#[derive(Parser, Debug)]
18#[command(author, version, about, long_about = None)]
19struct Args {
20    #[arg(long)]
21    trace_file: String,
22
23    #[arg(long, default_value = "http://localhost:4317")]
24    otlp_endpoint: String,
25
26    #[arg(long)]
27    service_name: Option<String>,
28
29    #[arg(long)]
30    dump_spans: bool,
31}
32
33#[tokio::main]
34async fn main() {
35    let args = Args::parse();
36    let file = std::fs::File::open(args.trace_file).unwrap();
37
38    let messages = decode_all_length_delimited::<_, ExportTraceServiceRequest>(file).unwrap();
39
40    if args.dump_spans {
41        for message in messages.iter() {
42            for span in &message.resource_spans {
43                println!("{:?}", span);
44            }
45        }
46        return;
47    }
48
49    let endpoint = format!("{}{}", args.otlp_endpoint, "/v1/traces");
50    let mut trace_exporter = TraceServiceClient::connect(endpoint).await.unwrap();
51
52    let service_name = args.service_name.unwrap_or_else(|| {
53        let timestamp = std::time::SystemTime::now()
54            .duration_since(std::time::UNIX_EPOCH)
55            .unwrap()
56            .as_secs();
57        format!("iota-node-{}", timestamp)
58    });
59
60    println!("importing trace with service name {:?}", service_name);
61
62    for mut message in messages {
63        println!(
64            "sending {} spans to otlp collector",
65            message.resource_spans.len()
66        );
67
68        // Rewrite the service name to separate the imported trace from other traces
69        for resource_span in message.resource_spans.iter_mut() {
70            if let Some(resource) = resource_span.resource.as_mut() {
71                for attr in resource.attributes.iter_mut() {
72                    if attr.key == "service.name" {
73                        attr.value = Some(AnyValue {
74                            value: Some(any_value::Value::StringValue(service_name.clone())),
75                        });
76                    }
77                }
78            }
79        }
80
81        trace_exporter.export(Request::new(message)).await.unwrap();
82    }
83    println!("all spans imported");
84}
85
86fn decode_all_length_delimited<R, M>(mut reader: R) -> io::Result<Vec<M>>
87where
88    R: Read,
89    M: Message + Default,
90{
91    let mut messages = Vec::new();
92    let mut buffer = Vec::new();
93    reader.read_to_end(&mut buffer)?;
94    let mut cursor = Cursor::new(buffer);
95
96    while cursor.has_remaining() {
97        let len = cursor.try_get_u64_varint().unwrap() as usize;
98
99        if cursor.remaining() < len {
100            return Err(io::Error::new(
101                io::ErrorKind::UnexpectedEof,
102                "Incomplete message",
103            ));
104        }
105
106        // Create a slice for just this message
107        let msg_bytes = cursor
108            .chunk()
109            .get(..len)
110            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Buffer underflow"))?;
111
112        let msg = M::decode(msg_bytes).map_err(|e| {
113            io::Error::new(io::ErrorKind::InvalidData, format!("Decode error: {}", e))
114        })?;
115        messages.push(msg);
116
117        // Advance the cursor
118        cursor.advance(len);
119    }
120
121    Ok(messages)
122}