1pub mod admin;
6pub mod config;
7pub mod consumer;
8pub mod handlers;
9pub mod histogram_relay;
10pub mod metrics;
11pub mod middleware;
12pub mod peers;
13pub mod prom_to_mimir;
14pub mod remote_write;
15
16#[macro_export]
21macro_rules! var {
22 ($key:expr) => {
23 match std::env::var($key) {
24 Ok(val) => val,
25 Err(_) => "".into(),
26 }
27 };
28 ($key:expr, $default:expr) => {
29 match std::env::var($key) {
30 Ok(val) => val.parse::<_>().unwrap(),
31 Err(_) => $default,
32 }
33 };
34}
35
36#[cfg(test)]
37mod tests {
38 use std::{net::TcpListener, time::Duration};
39
40 use axum::{Router, http::StatusCode, routing::post};
41 use iota_tls::{ClientCertVerifier, TlsAcceptor};
42 use prometheus::{Encoder, PROTOBUF_FORMAT};
43
44 use super::*;
45 use crate::{
46 admin::{CertKeyPair, Labels},
47 config::RemoteWriteConfig,
48 histogram_relay::HistogramRelay,
49 peers::IotaNodeProvider,
50 prom_to_mimir::tests::*,
51 };
52
53 async fn run_dummy_remote_write(listener: TcpListener) {
54 async fn handler() -> StatusCode {
56 StatusCode::OK
57 }
58
59 let app = Router::new().route("/v1/push", post(handler));
61
62 listener.set_nonblocking(true).unwrap();
64 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
65 axum::serve(listener, app).await.unwrap();
66 }
67
68 async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
69 async fn handler() -> StatusCode {
74 tokio::time::sleep(Duration::from_secs(60)).await; StatusCode::OK
78 }
79
80 let app = Router::new().route("/v1/push", post(handler));
82
83 listener.set_nonblocking(true).unwrap();
85 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
86 axum::serve(listener, app).await.unwrap();
87 }
88
89 #[tokio::test]
96 async fn test_axum_acceptor() {
97 let CertKeyPair(client_priv_cert, client_pub_key) =
99 admin::generate_self_cert("iota".into());
100 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
101
102 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
104 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
105 let dummy_remote_write_url = format!(
106 "http://localhost:{}/v1/push",
107 dummy_remote_write_address.port()
108 );
109
110 let _dummy_remote_write =
111 tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
112
113 let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
115 let tls_config = ClientCertVerifier::new(
116 allower.clone(),
117 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
118 )
119 .rustls_server_config(
120 vec![server_priv_cert.rustls_certificate()],
121 server_priv_cert.rustls_private_key(),
122 )
123 .unwrap();
124
125 let client = admin::make_reqwest_client(
126 RemoteWriteConfig {
127 url: dummy_remote_write_url.to_owned(),
128 username: "bar".into(),
129 password: "foo".into(),
130 ..Default::default()
131 },
132 "dummy user agent",
133 );
134
135 let app = admin::app(
136 Labels {
137 network: "unittest-network".into(),
138 },
139 client,
140 HistogramRelay::new(),
141 Some(allower.clone()),
142 None,
143 );
144
145 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
146 let server_address = listener.local_addr().unwrap();
147 let server_url = format!(
148 "https://localhost:{}/publish/metrics",
149 server_address.port()
150 );
151
152 let acceptor = TlsAcceptor::new(tls_config);
153 let _server = tokio::spawn(async move {
154 admin::server(listener, app, Some(acceptor)).await.unwrap();
155 });
156
157 let client = reqwest::Client::builder()
159 .add_root_certificate(server_priv_cert.reqwest_certificate())
160 .identity(client_priv_cert.reqwest_identity())
161 .https_only(true)
162 .build()
163 .unwrap();
164
165 client.get(&server_url).send().await.unwrap_err();
167
168 allower.get_mut().write().unwrap().insert(
171 client_pub_key.to_owned(),
172 peers::AllowedPeer {
173 name: "some-node".into(),
174 public_key: client_pub_key.to_owned(),
175 },
176 );
177
178 let mf = create_metric_family(
179 "foo_metric",
180 "some help this is",
181 None,
182 vec![create_metric_counter(
183 create_labels(vec![("some", "label")]),
184 create_counter(2046.0),
185 )],
186 );
187
188 let mut buf = vec![];
189 let encoder = prometheus::ProtobufEncoder::new();
190 encoder.encode(&[mf], &mut buf).unwrap();
191
192 let res = client
193 .post(&server_url)
194 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
195 .body(buf)
196 .send()
197 .await
198 .expect("expected a successful post with a self-signed certificate");
199 let status = res.status();
200 let body = res.text().await.unwrap();
201 assert_eq!("created", body);
202 assert_eq!(status, reqwest::StatusCode::CREATED);
203 }
204
205 #[tokio::test]
207 async fn test_client_timeout() {
208 let CertKeyPair(client_priv_cert, client_pub_key) =
210 admin::generate_self_cert("iota".into());
211 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
212
213 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
215 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
216 let dummy_remote_write_url = format!(
217 "http://localhost:{}/v1/push",
218 dummy_remote_write_address.port()
219 );
220
221 let _dummy_remote_write = tokio::spawn(async move {
222 run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
223 });
224
225 let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
227 let tls_config = ClientCertVerifier::new(
228 allower.clone(),
229 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
230 )
231 .rustls_server_config(
232 vec![server_priv_cert.rustls_certificate()],
233 server_priv_cert.rustls_private_key(),
234 )
235 .unwrap();
236
237 let client = admin::make_reqwest_client(
238 RemoteWriteConfig {
239 url: dummy_remote_write_url.to_owned(),
240 username: "bar".into(),
241 password: "foo".into(),
242 ..Default::default()
243 },
244 "dummy user agent",
245 );
246
247 let timeout_secs = Some(2u64);
248
249 let app = admin::app(
250 Labels {
251 network: "unittest-network".into(),
252 },
253 client,
254 HistogramRelay::new(),
255 Some(allower.clone()),
256 timeout_secs,
257 );
258
259 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
260 let server_address = listener.local_addr().unwrap();
261 let server_url = format!(
262 "https://localhost:{}/publish/metrics",
263 server_address.port()
264 );
265
266 let acceptor = TlsAcceptor::new(tls_config);
267 let _server = tokio::spawn(async move {
268 admin::server(listener, app, Some(acceptor)).await.unwrap();
269 });
270
271 let client = reqwest::Client::builder()
273 .add_root_certificate(server_priv_cert.reqwest_certificate())
274 .identity(client_priv_cert.reqwest_identity())
275 .https_only(true)
276 .build()
277 .unwrap();
278
279 client.get(&server_url).send().await.unwrap_err();
281
282 allower.get_mut().write().unwrap().insert(
285 client_pub_key.to_owned(),
286 peers::AllowedPeer {
287 name: "some-node".into(),
288 public_key: client_pub_key.to_owned(),
289 },
290 );
291
292 let mf = create_metric_family(
293 "foo_metric",
294 "some help this is",
295 None,
296 vec![create_metric_counter(
297 create_labels(vec![("some", "label")]),
298 create_counter(2046.0),
299 )],
300 );
301
302 let mut buf = vec![];
303 let encoder = prometheus::ProtobufEncoder::new();
304 encoder.encode(&[mf], &mut buf).unwrap();
305
306 let res = client
307 .post(&server_url)
308 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
309 .body(buf)
310 .send()
311 .await
312 .expect("expected a successful post with a self-signed certificate");
313 let status = res.status();
314 assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
315 }
316}