1pub mod admin;
6pub mod config;
7pub mod consumer;
8pub mod handlers;
9pub mod histogram_relay;
10pub mod metrics;
11pub mod middleware;
12pub mod peers;
13pub mod prom_to_mimir;
14pub mod remote_write;
15
/// Reads a setting from the process environment.
///
/// * `var!(key)` evaluates to the variable's value as a `String`, or to an
///   empty string when [`std::env::var`] returns an error (the variable is
///   unset or its value is not valid Unicode).
/// * `var!(key, default)` parses the variable's value with [`str::parse`]
///   into the type required by the surrounding expression and by `default`;
///   `default` is used when [`std::env::var`] returns an error.
///
/// # Panics
///
/// The two-argument form panics when the variable is set but its value does
/// not parse as the requested type. The panic message contains the key
/// expression as written at the call site, so the offending variable can be
/// identified from the message alone.
#[macro_export]
macro_rules! var {
    ($key:expr) => {
        ::std::env::var($key).unwrap_or_default()
    };
    ($key:expr, $default:expr) => {
        match ::std::env::var($key) {
            // `stringify!` is used instead of re-evaluating `$key`, which may
            // already have been moved into `env::var`.
            Ok(val) => val.parse::<_>().expect(concat!(
                "failed to parse environment variable ",
                stringify!($key)
            )),
            Err(_) => $default,
        }
    };
}
35
36#[cfg(test)]
37mod tests {
38 use std::{net::TcpListener, time::Duration};
39
40 use axum::{Router, http::StatusCode, routing::post};
41 use iota_tls::{ClientCertVerifier, TlsAcceptor};
42 use prometheus::{Encoder, PROTOBUF_FORMAT};
43
44 use super::*;
45 use crate::{
46 admin::{CertKeyPair, Labels},
47 config::RemoteWriteConfig,
48 histogram_relay::HistogramRelay,
49 peers::IotaNodeProvider,
50 prom_to_mimir::tests::*,
51 };
52
53 async fn run_dummy_remote_write(listener: TcpListener) {
54 async fn handler() -> StatusCode {
56 StatusCode::OK
57 }
58
59 let app = Router::new().route("/v1/push", post(handler));
61
62 listener.set_nonblocking(true).unwrap();
64 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
65 axum::serve(listener, app).await.unwrap();
66 }
67
68 async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
69 async fn handler() -> StatusCode {
74 tokio::time::sleep(Duration::from_secs(60)).await; StatusCode::OK
78 }
79
80 let app = Router::new().route("/v1/push", post(handler));
82
83 listener.set_nonblocking(true).unwrap();
85 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
86 axum::serve(listener, app).await.unwrap();
87 }
88
89 #[tokio::test]
96 async fn test_axum_acceptor() {
97 let CertKeyPair(client_priv_cert, client_pub_key) =
99 admin::generate_self_cert("iota".into());
100 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
101
102 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
104 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
105 let dummy_remote_write_url = format!(
106 "http://localhost:{}/v1/push",
107 dummy_remote_write_address.port()
108 );
109
110 let _dummy_remote_write =
111 tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
112
113 let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
115 let tls_config = ClientCertVerifier::new(
116 allower.clone(),
117 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
118 )
119 .rustls_server_config(
120 vec![server_priv_cert.rustls_certificate()],
121 server_priv_cert.rustls_private_key(),
122 )
123 .unwrap();
124
125 let client = admin::make_reqwest_client(
126 RemoteWriteConfig {
127 url: dummy_remote_write_url.to_owned(),
128 username: "bar".into(),
129 password: "foo".into(),
130 ..Default::default()
131 },
132 "dummy user agent",
133 );
134
135 let app = admin::app(
136 Labels {
137 network: "unittest-network".into(),
138 },
139 client,
140 HistogramRelay::new(),
141 Some(allower.clone()),
142 );
143
144 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
145 let server_address = listener.local_addr().unwrap();
146 let server_url = format!(
147 "https://localhost:{}/publish/metrics",
148 server_address.port()
149 );
150
151 let acceptor = TlsAcceptor::new(tls_config);
152 let _server = tokio::spawn(async move {
153 admin::server(listener, app, Some(acceptor)).await.unwrap();
154 });
155
156 let client = reqwest::Client::builder()
158 .add_root_certificate(server_priv_cert.reqwest_certificate())
159 .identity(client_priv_cert.reqwest_identity())
160 .https_only(true)
161 .build()
162 .unwrap();
163
164 client.get(&server_url).send().await.unwrap_err();
166
167 allower.get_mut().write().unwrap().insert(
170 client_pub_key.to_owned(),
171 peers::IotaPeer {
172 name: "some-node".into(),
173 public_key: client_pub_key.to_owned(),
174 },
175 );
176
177 let mf = create_metric_family(
178 "foo_metric",
179 "some help this is",
180 None,
181 vec![create_metric_counter(
182 create_labels(vec![("some", "label")]),
183 create_counter(2046.0),
184 )],
185 );
186
187 let mut buf = vec![];
188 let encoder = prometheus::ProtobufEncoder::new();
189 encoder.encode(&[mf], &mut buf).unwrap();
190
191 let res = client
192 .post(&server_url)
193 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
194 .body(buf)
195 .send()
196 .await
197 .expect("expected a successful post with a self-signed certificate");
198 let status = res.status();
199 let body = res.text().await.unwrap();
200 assert_eq!("created", body);
201 assert_eq!(status, reqwest::StatusCode::CREATED);
202 }
203
204 #[tokio::test]
206 async fn test_client_timeout() {
207 let CertKeyPair(client_priv_cert, client_pub_key) =
209 admin::generate_self_cert("iota".into());
210 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
211
212 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
214 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
215 let dummy_remote_write_url = format!(
216 "http://localhost:{}/v1/push",
217 dummy_remote_write_address.port()
218 );
219
220 let _dummy_remote_write = tokio::spawn(async move {
221 run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
222 });
223
224 let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
226 let tls_config = ClientCertVerifier::new(
227 allower.clone(),
228 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
229 )
230 .rustls_server_config(
231 vec![server_priv_cert.rustls_certificate()],
232 server_priv_cert.rustls_private_key(),
233 )
234 .unwrap();
235
236 let client = admin::make_reqwest_client(
237 RemoteWriteConfig {
238 url: dummy_remote_write_url.to_owned(),
239 username: "bar".into(),
240 password: "foo".into(),
241 ..Default::default()
242 },
243 "dummy user agent",
244 );
245
246 std::env::set_var("NODE_CLIENT_TIMEOUT", "5");
251
252 let app = admin::app(
253 Labels {
254 network: "unittest-network".into(),
255 },
256 client,
257 HistogramRelay::new(),
258 Some(allower.clone()),
259 );
260
261 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
262 let server_address = listener.local_addr().unwrap();
263 let server_url = format!(
264 "https://localhost:{}/publish/metrics",
265 server_address.port()
266 );
267
268 let acceptor = TlsAcceptor::new(tls_config);
269 let _server = tokio::spawn(async move {
270 admin::server(listener, app, Some(acceptor)).await.unwrap();
271 });
272
273 let client = reqwest::Client::builder()
275 .add_root_certificate(server_priv_cert.reqwest_certificate())
276 .identity(client_priv_cert.reqwest_identity())
277 .https_only(true)
278 .build()
279 .unwrap();
280
281 client.get(&server_url).send().await.unwrap_err();
283
284 allower.get_mut().write().unwrap().insert(
287 client_pub_key.to_owned(),
288 peers::IotaPeer {
289 name: "some-node".into(),
290 public_key: client_pub_key.to_owned(),
291 },
292 );
293
294 let mf = create_metric_family(
295 "foo_metric",
296 "some help this is",
297 None,
298 vec![create_metric_counter(
299 create_labels(vec![("some", "label")]),
300 create_counter(2046.0),
301 )],
302 );
303
304 let mut buf = vec![];
305 let encoder = prometheus::ProtobufEncoder::new();
306 encoder.encode(&[mf], &mut buf).unwrap();
307
308 let res = client
309 .post(&server_url)
310 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
311 .body(buf)
312 .send()
313 .await
314 .expect("expected a successful post with a self-signed certificate");
315 let status = res.status();
316 assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
317 std::env::remove_var("NODE_CLIENT_TIMEOUT");
319 }
320}