iota_proxy/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod admin;
6pub mod config;
7pub mod consumer;
8pub mod handlers;
9pub mod histogram_relay;
10pub mod metrics;
11pub mod middleware;
12pub mod peers;
13pub mod prom_to_mimir;
14pub mod remote_write;
15
/// Reads an environment variable at runtime, with a fallback.
///
/// * `var!(key)` evaluates to the variable's value as a `String`, or to an
///   empty `String` when [`std::env::var`] returns an error (the variable is
///   unset or its value is not valid Unicode).
/// * `var!(key, default)` parses the variable's value with `str::parse` into
///   the type inferred for the expression (normally the type of `default`),
///   or evaluates to `default` when [`std::env::var`] returns an error.
///   `default` is only evaluated in that fallback case.
///
/// # Panics
///
/// The two-argument form panics when the variable is set but its value cannot
/// be parsed into the target type. The panic message names the key
/// expression, the offending value and the parse error, so a misconfiguration
/// is reported loudly instead of being silently replaced by the default.
#[macro_export]
macro_rules! var {
    ($key:expr) => {
        std::env::var($key).unwrap_or_default()
    };
    ($key:expr, $default:expr) => {
        match std::env::var($key) {
            Ok(raw) => match raw.parse::<_>() {
                Ok(parsed) => parsed,
                // `stringify!` reports the key as written at the call site
                // without evaluating `$key` a second time.
                Err(err) => panic!(
                    "environment variable {} has unparsable value {:?}: {:?}",
                    stringify!($key),
                    raw,
                    err
                ),
            },
            Err(_) => $default,
        }
    };
}
35
36#[cfg(test)]
37mod tests {
38    use std::{net::TcpListener, time::Duration};
39
40    use axum::{Router, http::StatusCode, routing::post};
41    use iota_tls::{ClientCertVerifier, TlsAcceptor};
42    use prometheus::{Encoder, PROTOBUF_FORMAT};
43
44    use super::*;
45    use crate::{
46        admin::{CertKeyPair, Labels},
47        config::RemoteWriteConfig,
48        histogram_relay::HistogramRelay,
49        peers::IotaNodeProvider,
50        prom_to_mimir::tests::*,
51    };
52
53    async fn run_dummy_remote_write(listener: TcpListener) {
54        /// i accept everything, send me the trash
55        async fn handler() -> StatusCode {
56            StatusCode::OK
57        }
58
59        // build our application with a route
60        let app = Router::new().route("/v1/push", post(handler));
61
62        // run it
63        listener.set_nonblocking(true).unwrap();
64        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
65        axum::serve(listener, app).await.unwrap();
66    }
67
68    async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
69        /// i accept everything, send me the trash, but i will sleep and never
70        /// return before a timeout this is for testing slow clients and
71        /// this is the easiest way to do so without adding a special
72        /// route in the server to do so
73        async fn handler() -> StatusCode {
74            // Simulate a route that hangs while waiting for a client to send data
75            // but the server itself doesn't delay its processing
76            tokio::time::sleep(Duration::from_secs(60)).await; // A very long sleep
77            StatusCode::OK
78        }
79
80        // build our application with a route
81        let app = Router::new().route("/v1/push", post(handler));
82
83        // run it
84        listener.set_nonblocking(true).unwrap();
85        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
86        axum::serve(listener, app).await.unwrap();
87    }
88
89    /// test_axum_acceptor is a basic e2e test that creates a mock remote_write
90    /// post endpoint and has a simple iota-node client that posts data to
91    /// the proxy using the protobuf format.  The server processes this data
92    /// and sends it to the mock remote_write which accepts everything.  Future
93    /// work is to make this more robust and expand the scope of coverage,
94    /// probably moving this test elsewhere and renaming it.
95    #[tokio::test]
96    async fn test_axum_acceptor() {
97        // generate self-signed certificates
98        let CertKeyPair(client_priv_cert, client_pub_key) =
99            admin::generate_self_cert("iota".into());
100        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
101
102        // create a fake rpc server
103        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
104        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
105        let dummy_remote_write_url = format!(
106            "http://localhost:{}/v1/push",
107            dummy_remote_write_address.port()
108        );
109
110        let _dummy_remote_write =
111            tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
112
113        // init the tls config and allower
114        let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
115        let tls_config = ClientCertVerifier::new(
116            allower.clone(),
117            iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
118        )
119        .rustls_server_config(
120            vec![server_priv_cert.rustls_certificate()],
121            server_priv_cert.rustls_private_key(),
122        )
123        .unwrap();
124
125        let client = admin::make_reqwest_client(
126            RemoteWriteConfig {
127                url: dummy_remote_write_url.to_owned(),
128                username: "bar".into(),
129                password: "foo".into(),
130                ..Default::default()
131            },
132            "dummy user agent",
133        );
134
135        let app = admin::app(
136            Labels {
137                network: "unittest-network".into(),
138            },
139            client,
140            HistogramRelay::new(),
141            Some(allower.clone()),
142        );
143
144        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
145        let server_address = listener.local_addr().unwrap();
146        let server_url = format!(
147            "https://localhost:{}/publish/metrics",
148            server_address.port()
149        );
150
151        let acceptor = TlsAcceptor::new(tls_config);
152        let _server = tokio::spawn(async move {
153            admin::server(listener, app, Some(acceptor)).await.unwrap();
154        });
155
156        // build a client
157        let client = reqwest::Client::builder()
158            .add_root_certificate(server_priv_cert.reqwest_certificate())
159            .identity(client_priv_cert.reqwest_identity())
160            .https_only(true)
161            .build()
162            .unwrap();
163
164        // Client request is rejected because it isn't in the allowlist
165        client.get(&server_url).send().await.unwrap_err();
166
167        // Insert the client's public key into the allowlist and verify the request is
168        // successful
169        allower.get_mut().write().unwrap().insert(
170            client_pub_key.to_owned(),
171            peers::IotaPeer {
172                name: "some-node".into(),
173                public_key: client_pub_key.to_owned(),
174            },
175        );
176
177        let mf = create_metric_family(
178            "foo_metric",
179            "some help this is",
180            None,
181            vec![create_metric_counter(
182                create_labels(vec![("some", "label")]),
183                create_counter(2046.0),
184            )],
185        );
186
187        let mut buf = vec![];
188        let encoder = prometheus::ProtobufEncoder::new();
189        encoder.encode(&[mf], &mut buf).unwrap();
190
191        let res = client
192            .post(&server_url)
193            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
194            .body(buf)
195            .send()
196            .await
197            .expect("expected a successful post with a self-signed certificate");
198        let status = res.status();
199        let body = res.text().await.unwrap();
200        assert_eq!("created", body);
201        assert_eq!(status, reqwest::StatusCode::CREATED);
202    }
203
204    /// this is a long test to ensure we are timing out clients that are slow
205    #[tokio::test]
206    async fn test_client_timeout() {
207        // generate self-signed certificates
208        let CertKeyPair(client_priv_cert, client_pub_key) =
209            admin::generate_self_cert("iota".into());
210        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
211
212        // create a fake rpc server
213        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
214        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
215        let dummy_remote_write_url = format!(
216            "http://localhost:{}/v1/push",
217            dummy_remote_write_address.port()
218        );
219
220        let _dummy_remote_write = tokio::spawn(async move {
221            run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
222        });
223
224        // init the tls config and allower
225        let mut allower = IotaNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
226        let tls_config = ClientCertVerifier::new(
227            allower.clone(),
228            iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
229        )
230        .rustls_server_config(
231            vec![server_priv_cert.rustls_certificate()],
232            server_priv_cert.rustls_private_key(),
233        )
234        .unwrap();
235
236        let client = admin::make_reqwest_client(
237            RemoteWriteConfig {
238                url: dummy_remote_write_url.to_owned(),
239                username: "bar".into(),
240                password: "foo".into(),
241                ..Default::default()
242            },
243            "dummy user agent",
244        );
245
246        // this will affect other tests if they are run in parallel, but we only have
247        // two tests, so it shouldn't be an issue (yet) even still, the other
248        // tests complete very fast so those tests would also need to slow down by
249        // orders and orders to be bothered by this env var
250        std::env::set_var("NODE_CLIENT_TIMEOUT", "5");
251
252        let app = admin::app(
253            Labels {
254                network: "unittest-network".into(),
255            },
256            client,
257            HistogramRelay::new(),
258            Some(allower.clone()),
259        );
260
261        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
262        let server_address = listener.local_addr().unwrap();
263        let server_url = format!(
264            "https://localhost:{}/publish/metrics",
265            server_address.port()
266        );
267
268        let acceptor = TlsAcceptor::new(tls_config);
269        let _server = tokio::spawn(async move {
270            admin::server(listener, app, Some(acceptor)).await.unwrap();
271        });
272
273        // build a client
274        let client = reqwest::Client::builder()
275            .add_root_certificate(server_priv_cert.reqwest_certificate())
276            .identity(client_priv_cert.reqwest_identity())
277            .https_only(true)
278            .build()
279            .unwrap();
280
281        // Client request is rejected because it isn't in the allowlist
282        client.get(&server_url).send().await.unwrap_err();
283
284        // Insert the client's public key into the allowlist and verify the request is
285        // successful
286        allower.get_mut().write().unwrap().insert(
287            client_pub_key.to_owned(),
288            peers::IotaPeer {
289                name: "some-node".into(),
290                public_key: client_pub_key.to_owned(),
291            },
292        );
293
294        let mf = create_metric_family(
295            "foo_metric",
296            "some help this is",
297            None,
298            vec![create_metric_counter(
299                create_labels(vec![("some", "label")]),
300                create_counter(2046.0),
301            )],
302        );
303
304        let mut buf = vec![];
305        let encoder = prometheus::ProtobufEncoder::new();
306        encoder.encode(&[mf], &mut buf).unwrap();
307
308        let res = client
309            .post(&server_url)
310            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
311            .body(buf)
312            .send()
313            .await
314            .expect("expected a successful post with a self-signed certificate");
315        let status = res.status();
316        assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
317        // Clean up the environment variable
318        std::env::remove_var("NODE_CLIENT_TIMEOUT");
319    }
320}