iota_proxy/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod admin;
6pub mod config;
7pub mod consumer;
8pub mod handlers;
9pub mod histogram_relay;
10pub mod metrics;
11pub mod middleware;
12pub mod peers;
13pub mod prom_to_mimir;
14pub mod remote_write;
15
16/// var extracts environment variables at runtime with a default fallback value
17/// if a default is not provided, the value is simply an empty string if not
18/// found This function will return the provided default if env::var cannot find
19/// the key or if the key is somehow malformed.
20#[macro_export]
21macro_rules! var {
22    ($key:expr) => {
23        match std::env::var($key) {
24            Ok(val) => val,
25            Err(_) => "".into(),
26        }
27    };
28    ($key:expr, $default:expr) => {
29        match std::env::var($key) {
30            Ok(val) => val.parse::<_>().unwrap(),
31            Err(_) => $default,
32        }
33    };
34}
35
36#[cfg(test)]
37mod tests {
38    use std::{net::TcpListener, time::Duration};
39
40    use axum::{Router, http::StatusCode, routing::post};
41    use iota_tls::{ClientCertVerifier, TlsAcceptor};
42    use prometheus::{Encoder, PROTOBUF_FORMAT};
43
44    use super::*;
45    use crate::{
46        admin::{CertKeyPair, Labels},
47        config::RemoteWriteConfig,
48        histogram_relay::HistogramRelay,
49        peers::IotaNodeProvider,
50        prom_to_mimir::tests::*,
51    };
52
53    async fn run_dummy_remote_write(listener: TcpListener) {
54        /// i accept everything, send me the trash
55        async fn handler() -> StatusCode {
56            StatusCode::OK
57        }
58
59        // build our application with a route
60        let app = Router::new().route("/v1/push", post(handler));
61
62        // run it
63        listener.set_nonblocking(true).unwrap();
64        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
65        axum::serve(listener, app).await.unwrap();
66    }
67
68    async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
69        /// i accept everything, send me the trash, but i will sleep and never
70        /// return before a timeout this is for testing slow clients and
71        /// this is the easiest way to do so without adding a special
72        /// route in the server to do so
73        async fn handler() -> StatusCode {
74            // Simulate a route that hangs while waiting for a client to send data
75            // but the server itself doesn't delay its processing
76            tokio::time::sleep(Duration::from_secs(60)).await; // A very long sleep
77            StatusCode::OK
78        }
79
80        // build our application with a route
81        let app = Router::new().route("/v1/push", post(handler));
82
83        // run it
84        listener.set_nonblocking(true).unwrap();
85        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
86        axum::serve(listener, app).await.unwrap();
87    }
88
89    /// test_axum_acceptor is a basic e2e test that creates a mock remote_write
90    /// post endpoint and has a simple iota-node client that posts data to
91    /// the proxy using the protobuf format.  The server processes this data
92    /// and sends it to the mock remote_write which accepts everything.  Future
93    /// work is to make this more robust and expand the scope of coverage,
94    /// probably moving this test elsewhere and renaming it.
95    #[tokio::test]
96    async fn test_axum_acceptor() {
97        // generate self-signed certificates
98        let CertKeyPair(client_priv_cert, client_pub_key) =
99            admin::generate_self_cert("iota".into());
100        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
101
102        // create a fake rpc server
103        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
104        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
105        let dummy_remote_write_url = format!(
106            "http://localhost:{}/v1/push",
107            dummy_remote_write_address.port()
108        );
109
110        let _dummy_remote_write =
111            tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
112
113        // init the tls config and allower
114        let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
115        let tls_config = ClientCertVerifier::new(
116            allower.clone(),
117            iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
118        )
119        .rustls_server_config(
120            vec![server_priv_cert.rustls_certificate()],
121            server_priv_cert.rustls_private_key(),
122        )
123        .unwrap();
124
125        let client = admin::make_reqwest_client(
126            RemoteWriteConfig {
127                url: dummy_remote_write_url.to_owned(),
128                username: "bar".into(),
129                password: "foo".into(),
130                ..Default::default()
131            },
132            "dummy user agent",
133        );
134
135        let app = admin::app(
136            Labels {
137                network: "unittest-network".into(),
138            },
139            client,
140            HistogramRelay::new(),
141            Some(allower.clone()),
142            None,
143        );
144
145        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
146        let server_address = listener.local_addr().unwrap();
147        let server_url = format!(
148            "https://localhost:{}/publish/metrics",
149            server_address.port()
150        );
151
152        let acceptor = TlsAcceptor::new(tls_config);
153        let _server = tokio::spawn(async move {
154            admin::server(listener, app, Some(acceptor)).await.unwrap();
155        });
156
157        // build a client
158        let client = reqwest::Client::builder()
159            .add_root_certificate(server_priv_cert.reqwest_certificate())
160            .identity(client_priv_cert.reqwest_identity())
161            .https_only(true)
162            .build()
163            .unwrap();
164
165        // Client request is rejected because it isn't in the allowlist
166        client.get(&server_url).send().await.unwrap_err();
167
168        // Insert the client's public key into the allowlist and verify the request is
169        // successful
170        allower.get_mut().write().unwrap().insert(
171            client_pub_key.to_owned(),
172            peers::AllowedPeer {
173                name: "some-node".into(),
174                public_key: client_pub_key.to_owned(),
175            },
176        );
177
178        let mf = create_metric_family(
179            "foo_metric",
180            "some help this is",
181            None,
182            vec![create_metric_counter(
183                create_labels(vec![("some", "label")]),
184                create_counter(2046.0),
185            )],
186        );
187
188        let mut buf = vec![];
189        let encoder = prometheus::ProtobufEncoder::new();
190        encoder.encode(&[mf], &mut buf).unwrap();
191
192        let res = client
193            .post(&server_url)
194            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
195            .body(buf)
196            .send()
197            .await
198            .expect("expected a successful post with a self-signed certificate");
199        let status = res.status();
200        let body = res.text().await.unwrap();
201        assert_eq!("created", body);
202        assert_eq!(status, reqwest::StatusCode::CREATED);
203    }
204
205    /// this is a long test to ensure we are timing out clients that are slow
206    #[tokio::test]
207    async fn test_client_timeout() {
208        // generate self-signed certificates
209        let CertKeyPair(client_priv_cert, client_pub_key) =
210            admin::generate_self_cert("iota".into());
211        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
212
213        // create a fake rpc server
214        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
215        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
216        let dummy_remote_write_url = format!(
217            "http://localhost:{}/v1/push",
218            dummy_remote_write_address.port()
219        );
220
221        let _dummy_remote_write = tokio::spawn(async move {
222            run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
223        });
224
225        // init the tls config and allower
226        let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
227        let tls_config = ClientCertVerifier::new(
228            allower.clone(),
229            iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
230        )
231        .rustls_server_config(
232            vec![server_priv_cert.rustls_certificate()],
233            server_priv_cert.rustls_private_key(),
234        )
235        .unwrap();
236
237        let client = admin::make_reqwest_client(
238            RemoteWriteConfig {
239                url: dummy_remote_write_url.to_owned(),
240                username: "bar".into(),
241                password: "foo".into(),
242                ..Default::default()
243            },
244            "dummy user agent",
245        );
246
247        let timeout_secs = Some(2u64);
248
249        let app = admin::app(
250            Labels {
251                network: "unittest-network".into(),
252            },
253            client,
254            HistogramRelay::new(),
255            Some(allower.clone()),
256            timeout_secs,
257        );
258
259        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
260        let server_address = listener.local_addr().unwrap();
261        let server_url = format!(
262            "https://localhost:{}/publish/metrics",
263            server_address.port()
264        );
265
266        let acceptor = TlsAcceptor::new(tls_config);
267        let _server = tokio::spawn(async move {
268            admin::server(listener, app, Some(acceptor)).await.unwrap();
269        });
270
271        // build a client
272        let client = reqwest::Client::builder()
273            .add_root_certificate(server_priv_cert.reqwest_certificate())
274            .identity(client_priv_cert.reqwest_identity())
275            .https_only(true)
276            .build()
277            .unwrap();
278
279        // Client request is rejected because it isn't in the allowlist
280        client.get(&server_url).send().await.unwrap_err();
281
282        // Insert the client's public key into the allowlist and verify the request is
283        // successful
284        allower.get_mut().write().unwrap().insert(
285            client_pub_key.to_owned(),
286            peers::AllowedPeer {
287                name: "some-node".into(),
288                public_key: client_pub_key.to_owned(),
289            },
290        );
291
292        let mf = create_metric_family(
293            "foo_metric",
294            "some help this is",
295            None,
296            vec![create_metric_counter(
297                create_labels(vec![("some", "label")]),
298                create_counter(2046.0),
299            )],
300        );
301
302        let mut buf = vec![];
303        let encoder = prometheus::ProtobufEncoder::new();
304        encoder.encode(&[mf], &mut buf).unwrap();
305
306        let res = client
307            .post(&server_url)
308            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
309            .body(buf)
310            .send()
311            .await
312            .expect("expected a successful post with a self-signed certificate");
313        let status = res.status();
314        assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
315    }
316}