1use std::{fs, io::BufReader, net::SocketAddr, sync::Arc, time::Duration};
6
7use anyhow::{Error, Result};
8use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
9use fastcrypto::{
10 ed25519::{Ed25519KeyPair, Ed25519PublicKey},
11 traits::{KeyPair, ToFromBytes},
12};
13use iota_tls::{
14 AllowAll, ClientCertVerifier, IOTA_VALIDATOR_SERVER_NAME, SelfSignedCertificate, TlsAcceptor,
15 rustls::ServerConfig,
16};
17use tokio::signal;
18use tower::ServiceBuilder;
19use tower_http::{
20 LatencyUnit,
21 timeout::TimeoutLayer,
22 trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
23};
24use tracing::{Level, info};
25
26use crate::{
27 config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig},
28 handlers::publish_metrics,
29 histogram_relay::HistogramRelay,
30 middleware::{expect_content_length, expect_iota_proxy_header, expect_valid_public_key},
31 peers::{IotaNodeProvider, IotaPeer},
32 var,
33};
34
35pub async fn shutdown_signal(h: axum_server::Handle) {
37 let ctrl_c = async {
38 signal::ctrl_c()
39 .await
40 .expect("failed to install Ctrl+C handler");
41 };
42
43 #[cfg(unix)]
44 let terminate = async {
45 signal::unix::signal(signal::unix::SignalKind::terminate())
46 .expect("failed to install signal handler")
47 .recv()
48 .await;
49 };
50
51 #[cfg(not(unix))]
52 let terminate = std::future::pending::<()>();
53
54 tokio::select! {
55 _ = ctrl_c => {},
56 _ = terminate => {},
57 }
58
59 let grace = 30;
60 info!(
61 "signal received, starting graceful shutdown, grace period {} seconds, if needed",
62 &grace
63 );
64 h.graceful_shutdown(Some(Duration::from_secs(grace)))
65}
66
67#[derive(Clone)]
71pub struct ReqwestClient {
72 pub client: reqwest::Client,
73 pub settings: RemoteWriteConfig,
74}
75
76pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
77 ReqwestClient {
78 client: reqwest::Client::builder()
79 .user_agent(user_agent)
80 .pool_max_idle_per_host(settings.pool_max_idle_per_host)
81 .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
82 .build()
83 .expect("cannot create reqwest client"),
84 settings,
85 }
86}
87
/// Label values shared with request handlers through an axum `Extension`.
#[derive(Clone)]
pub struct Labels {
    /// Name of the network.
    pub network: String,
}
93
94pub fn app(
96 labels: Labels,
97 client: ReqwestClient,
98 relay: HistogramRelay,
99 allower: Option<IotaNodeProvider>,
100) -> Router {
101 let mut router = Router::new()
103 .route("/publish/metrics", post(publish_metrics))
104 .route_layer(DefaultBodyLimit::max(var!(
105 "MAX_BODY_SIZE",
106 1024 * 1024 * 5
107 )))
108 .route_layer(middleware::from_fn(expect_iota_proxy_header))
109 .route_layer(middleware::from_fn(expect_content_length));
110 if let Some(allower) = allower {
111 router = router
112 .route_layer(middleware::from_fn(expect_valid_public_key))
113 .layer(Extension(Arc::new(allower)));
114 }
115 router
116 .layer(TimeoutLayer::new(Duration::from_secs(var!(
120 "NODE_CLIENT_TIMEOUT",
121 20
122 ))))
123 .layer(Extension(relay))
124 .layer(Extension(labels))
125 .layer(Extension(client))
126 .layer(
127 ServiceBuilder::new().layer(
128 TraceLayer::new_for_http()
129 .on_response(
130 DefaultOnResponse::new()
131 .level(Level::INFO)
132 .latency_unit(LatencyUnit::Seconds),
133 )
134 .on_failure(
135 DefaultOnFailure::new()
136 .level(Level::ERROR)
137 .latency_unit(LatencyUnit::Seconds),
138 ),
139 ),
140 )
141}
142
143pub async fn server(
145 listener: std::net::TcpListener,
146 app: Router,
147 acceptor: Option<TlsAcceptor>,
148) -> std::io::Result<()> {
149 let handle = axum_server::Handle::new();
151 tokio::spawn(shutdown_signal(handle.clone()));
153
154 if let Some(verify_peers) = acceptor {
155 axum_server::Server::from_tcp(listener)
156 .acceptor(verify_peers)
157 .handle(handle)
158 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
159 .await
160 } else {
161 axum_server::Server::from_tcp(listener)
162 .handle(handle)
163 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
164 .await
165 }
166}
167
168pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
170
171pub fn generate_self_cert(hostname: String) -> CertKeyPair {
173 let mut rng = rand::thread_rng();
174 let keypair = Ed25519KeyPair::generate(&mut rng);
175 CertKeyPair(
176 SelfSignedCertificate::new(keypair.copy().private(), &hostname),
177 keypair.public().to_owned(),
178 )
179}
180
181fn load_certs(filename: &str) -> Vec<rustls::pki_types::CertificateDer<'static>> {
183 let certfile = fs::File::open(filename)
184 .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e));
185 let mut reader = BufReader::new(certfile);
186 rustls_pemfile::certs(&mut reader)
187 .collect::<Result<Vec<_>, _>>()
188 .unwrap()
189}
190
191fn load_private_key(filename: &str) -> rustls::pki_types::PrivateKeyDer<'static> {
193 let keyfile = fs::File::open(filename)
194 .unwrap_or_else(|e| panic!("cannot open private key file {}; {}", filename, e));
195 let mut reader = BufReader::new(keyfile);
196
197 loop {
198 match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
199 Some(rustls_pemfile::Item::Pkcs1Key(key)) => return key.into(),
200 Some(rustls_pemfile::Item::Pkcs8Key(key)) => return key.into(),
201 Some(rustls_pemfile::Item::Sec1Key(key)) => return key.into(),
202 None => break,
203 _ => {}
204 }
205 }
206
207 panic!(
208 "no keys found in {:?} (encrypted keys not supported)",
209 filename
210 );
211}
212
213fn load_static_peers(
216 static_peers: Option<StaticPeerValidationConfig>,
217) -> Result<Vec<IotaPeer>, Error> {
218 let Some(static_peers) = static_peers else {
219 return Ok(vec![]);
220 };
221 let static_keys = static_peers
222 .pub_keys
223 .into_iter()
224 .map(|spk| {
225 let peer_id = hex::decode(spk.peer_id).unwrap();
226 let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
227 let s = IotaPeer {
228 name: spk.name.clone(),
229 public_key,
230 };
231 info!(
232 "loaded static peer: {} public key: {}",
233 &s.name, &s.public_key,
234 );
235 s
236 })
237 .collect();
238 Ok(static_keys)
239}
240
241pub fn create_server_cert_default_allow(
244 hostname: String,
245) -> Result<ServerConfig, iota_tls::rustls::Error> {
246 let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
247
248 ClientCertVerifier::new(AllowAll, IOTA_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
249 vec![server_certificate.rustls_certificate()],
250 server_certificate.rustls_private_key(),
251 )
252}
253
254pub fn create_server_cert_enforce_peer(
257 dynamic_peers: DynamicPeerValidationConfig,
258 static_peers: Option<StaticPeerValidationConfig>,
259) -> Result<(ServerConfig, Option<IotaNodeProvider>), iota_tls::rustls::Error> {
260 let (Some(certificate_path), Some(private_key_path)) =
261 (dynamic_peers.certificate_file, dynamic_peers.private_key)
262 else {
263 return Err(iota_tls::rustls::Error::General(
264 "missing certs to initialize server".into(),
265 ));
266 };
267 let static_peers = load_static_peers(static_peers).map_err(|e| {
268 iota_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
269 })?;
270 let allower = IotaNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
271 allower.poll_peer_list();
272 let c = ClientCertVerifier::new(allower.clone(), IOTA_VALIDATOR_SERVER_NAME.to_string())
273 .rustls_server_config(
274 load_certs(&certificate_path),
275 load_private_key(&private_key_path),
276 )?;
277 Ok((c, Some(allower)))
278}