iota_proxy/
admin.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{fs, io::BufReader, net::SocketAddr, sync::Arc, time::Duration};
6
7use anyhow::{Error, Result};
8use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
9use fastcrypto::{
10    ed25519::{Ed25519KeyPair, Ed25519PublicKey},
11    traits::{KeyPair, ToFromBytes},
12};
13use iota_tls::{
14    AllowAll, ClientCertVerifier, IOTA_VALIDATOR_SERVER_NAME, SelfSignedCertificate, TlsAcceptor,
15    rustls::ServerConfig,
16};
17use tokio::signal;
18use tower::ServiceBuilder;
19use tower_http::{
20    LatencyUnit,
21    timeout::TimeoutLayer,
22    trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
23};
24use tracing::{Level, info};
25
26use crate::{
27    config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig},
28    handlers::publish_metrics,
29    histogram_relay::HistogramRelay,
30    middleware::{expect_content_length, expect_iota_proxy_header, expect_valid_public_key},
31    peers::{IotaNodeProvider, IotaPeer},
32    var,
33};
34
35/// Configure our graceful shutdown scenarios
36pub async fn shutdown_signal(h: axum_server::Handle) {
37    let ctrl_c = async {
38        signal::ctrl_c()
39            .await
40            .expect("failed to install Ctrl+C handler");
41    };
42
43    #[cfg(unix)]
44    let terminate = async {
45        signal::unix::signal(signal::unix::SignalKind::terminate())
46            .expect("failed to install signal handler")
47            .recv()
48            .await;
49    };
50
51    #[cfg(not(unix))]
52    let terminate = std::future::pending::<()>();
53
54    tokio::select! {
55        _ = ctrl_c => {},
56        _ = terminate => {},
57    }
58
59    let grace = 30;
60    info!(
61        "signal received, starting graceful shutdown, grace period {} seconds, if needed",
62        &grace
63    );
64    h.graceful_shutdown(Some(Duration::from_secs(grace)))
65}
66
/// Reqwest client holds the global client for remote_push api calls.
/// It also holds the remote-write settings (which carry the username and
/// password). The client has an underlying connection pool; see the reqwest
/// documentation for details.
///
/// The type is `Clone` so it can be handed to request handlers.
#[derive(Clone)]
pub struct ReqwestClient {
    /// The shared HTTP client used to issue the remote-write requests.
    pub client: reqwest::Client,
    /// The remote-write configuration this client was built from.
    pub settings: RemoteWriteConfig,
}
75
76pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
77    ReqwestClient {
78        client: reqwest::Client::builder()
79            .user_agent(user_agent)
80            .pool_max_idle_per_host(settings.pool_max_idle_per_host)
81            .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
82            .build()
83            .expect("cannot create reqwest client"),
84        settings,
85    }
86}
87
/// Ad-hoc labels that are injected according to our configuration.
///
/// `Debug` is derived so the labels can be inspected in logs and test
/// failures; `Clone` lets the value be shared with request handlers.
#[derive(Clone, Debug)]
pub struct Labels {
    /// Name of the network these metrics are labelled with.
    pub network: String,
}
93
94/// App will configure our routes. This fn is also used to instrument our tests
95pub fn app(
96    labels: Labels,
97    client: ReqwestClient,
98    relay: HistogramRelay,
99    allower: Option<IotaNodeProvider>,
100) -> Router {
101    // build our application with a route and our sender mpsc
102    let mut router = Router::new()
103        .route("/publish/metrics", post(publish_metrics))
104        .route_layer(DefaultBodyLimit::max(var!(
105            "MAX_BODY_SIZE",
106            1024 * 1024 * 5
107        )))
108        .route_layer(middleware::from_fn(expect_iota_proxy_header))
109        .route_layer(middleware::from_fn(expect_content_length));
110    if let Some(allower) = allower {
111        router = router
112            .route_layer(middleware::from_fn(expect_valid_public_key))
113            .layer(Extension(Arc::new(allower)));
114    }
115    router
116        // Enforce on all routes.
117        // If the request does not complete within the specified timeout it will be aborted
118        // and a 408 Request Timeout response will be sent.
119        .layer(TimeoutLayer::new(Duration::from_secs(var!(
120            "NODE_CLIENT_TIMEOUT",
121            20
122        ))))
123        .layer(Extension(relay))
124        .layer(Extension(labels))
125        .layer(Extension(client))
126        .layer(
127            ServiceBuilder::new().layer(
128                TraceLayer::new_for_http()
129                    .on_response(
130                        DefaultOnResponse::new()
131                            .level(Level::INFO)
132                            .latency_unit(LatencyUnit::Seconds),
133                    )
134                    .on_failure(
135                        DefaultOnFailure::new()
136                            .level(Level::ERROR)
137                            .latency_unit(LatencyUnit::Seconds),
138                    ),
139            ),
140        )
141}
142
143/// Server creates our http/https server
144pub async fn server(
145    listener: std::net::TcpListener,
146    app: Router,
147    acceptor: Option<TlsAcceptor>,
148) -> std::io::Result<()> {
149    // setup our graceful shutdown
150    let handle = axum_server::Handle::new();
151    // Spawn a task to gracefully shutdown server.
152    tokio::spawn(shutdown_signal(handle.clone()));
153
154    if let Some(verify_peers) = acceptor {
155        axum_server::Server::from_tcp(listener)
156            .acceptor(verify_peers)
157            .handle(handle)
158            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
159            .await
160    } else {
161        axum_server::Server::from_tcp(listener)
162            .handle(handle)
163            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
164            .await
165    }
166}
167
/// CertKeyPair wraps a self signed certificate and the corresponding public
/// key.
///
/// Field `0` is the self-signed certificate and field `1` is the Ed25519
/// public key of the keypair the certificate was created from.
pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
170
171/// Generate server certs for use with peer verification
172pub fn generate_self_cert(hostname: String) -> CertKeyPair {
173    let mut rng = rand::thread_rng();
174    let keypair = Ed25519KeyPair::generate(&mut rng);
175    CertKeyPair(
176        SelfSignedCertificate::new(keypair.copy().private(), &hostname),
177        keypair.public().to_owned(),
178    )
179}
180
181/// Load a certificate for use by the listening service
182fn load_certs(filename: &str) -> Vec<rustls::pki_types::CertificateDer<'static>> {
183    let certfile = fs::File::open(filename)
184        .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e));
185    let mut reader = BufReader::new(certfile);
186    rustls_pemfile::certs(&mut reader)
187        .collect::<Result<Vec<_>, _>>()
188        .unwrap()
189}
190
191/// Load a private key
192fn load_private_key(filename: &str) -> rustls::pki_types::PrivateKeyDer<'static> {
193    let keyfile = fs::File::open(filename)
194        .unwrap_or_else(|e| panic!("cannot open private key file {}; {}", filename, e));
195    let mut reader = BufReader::new(keyfile);
196
197    loop {
198        match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
199            Some(rustls_pemfile::Item::Pkcs1Key(key)) => return key.into(),
200            Some(rustls_pemfile::Item::Pkcs8Key(key)) => return key.into(),
201            Some(rustls_pemfile::Item::Sec1Key(key)) => return key.into(),
202            None => break,
203            _ => {}
204        }
205    }
206
207    panic!(
208        "no keys found in {:?} (encrypted keys not supported)",
209        filename
210    );
211}
212
213/// load the static keys we'll use to allow external non-validator nodes to push
214/// metrics
215fn load_static_peers(
216    static_peers: Option<StaticPeerValidationConfig>,
217) -> Result<Vec<IotaPeer>, Error> {
218    let Some(static_peers) = static_peers else {
219        return Ok(vec![]);
220    };
221    let static_keys = static_peers
222        .pub_keys
223        .into_iter()
224        .map(|spk| {
225            let peer_id = hex::decode(spk.peer_id).unwrap();
226            let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
227            let s = IotaPeer {
228                name: spk.name.clone(),
229                public_key,
230            };
231            info!(
232                "loaded static peer: {} public key: {}",
233                &s.name, &s.public_key,
234            );
235            s
236        })
237        .collect();
238    Ok(static_keys)
239}
240
241/// Default allow mode for server, we don't verify clients, everything is
242/// accepted
243pub fn create_server_cert_default_allow(
244    hostname: String,
245) -> Result<ServerConfig, iota_tls::rustls::Error> {
246    let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
247
248    ClientCertVerifier::new(AllowAll, IOTA_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
249        vec![server_certificate.rustls_certificate()],
250        server_certificate.rustls_private_key(),
251    )
252}
253
254/// Verify clients against iota blockchain, clients that are not found in
255/// iota_getValidators will be rejected
256pub fn create_server_cert_enforce_peer(
257    dynamic_peers: DynamicPeerValidationConfig,
258    static_peers: Option<StaticPeerValidationConfig>,
259) -> Result<(ServerConfig, Option<IotaNodeProvider>), iota_tls::rustls::Error> {
260    let (Some(certificate_path), Some(private_key_path)) =
261        (dynamic_peers.certificate_file, dynamic_peers.private_key)
262    else {
263        return Err(iota_tls::rustls::Error::General(
264            "missing certs to initialize server".into(),
265        ));
266    };
267    let static_peers = load_static_peers(static_peers).map_err(|e| {
268        iota_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
269    })?;
270    let allower = IotaNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
271    allower.poll_peer_list();
272    let c = ClientCertVerifier::new(allower.clone(), IOTA_VALIDATOR_SERVER_NAME.to_string())
273        .rustls_server_config(
274            load_certs(&certificate_path),
275            load_private_key(&private_key_path),
276        )?;
277    Ok((c, Some(allower)))
278}