1use std::{
6 borrow::Cow,
7 net::{IpAddr, SocketAddr},
8 path::PathBuf,
9 sync::Arc,
10 time::Duration,
11};
12
13use axum::{
14 BoxError, Extension, Json, Router,
15 error_handling::HandleErrorLayer,
16 extract::Path,
17 http::StatusCode,
18 response::IntoResponse,
19 routing::{get, post},
20};
21use http::Method;
22use iota_config::IOTA_CLIENT_CONFIG;
23use iota_metrics::spawn_monitored_task;
24use iota_sdk::wallet_context::WalletContext;
25use prometheus::Registry;
26use tower::{ServiceBuilder, limit::RateLimitLayer};
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{info, warn};
29use uuid::Uuid;
30
31use crate::{
32 AppState, BatchFaucetResponse, BatchStatusFaucetResponse, FaucetConfig, FaucetError,
33 FaucetRequest, FaucetResponse, RequestMetricsLayer, faucet::Faucet,
34};
35
36pub async fn start_faucet(
37 app_state: Arc<AppState>,
38 concurrency_limit: usize,
39 prometheus_registry: &Registry,
40) -> Result<(), anyhow::Error> {
41 let cors = CorsLayer::new()
43 .allow_methods(vec![Method::GET, Method::POST])
44 .allow_headers(Any)
45 .allow_origin(Any);
46
47 let FaucetConfig {
48 port,
49 host_ip,
50 request_buffer_size,
51 max_request_per_second,
52 wal_retry_interval,
53 ..
54 } = app_state.config;
55
56 let app = Router::new()
57 .route("/", get(health))
58 .route("/gas", post(request_gas))
59 .route("/v1/gas", post(batch_request_gas))
60 .route("/v1/status/{task_id}", get(request_status))
61 .layer(
62 ServiceBuilder::new()
63 .layer(HandleErrorLayer::new(handle_error))
64 .layer(RequestMetricsLayer::new(prometheus_registry))
65 .layer(cors)
66 .load_shed()
67 .buffer(request_buffer_size)
68 .layer(RateLimitLayer::new(
69 max_request_per_second,
70 Duration::from_secs(1),
71 ))
72 .concurrency_limit(concurrency_limit)
73 .layer(Extension(app_state.clone()))
74 .into_inner(),
75 );
76
77 spawn_monitored_task!(async move {
78 info!("Starting task to clear WAL.");
79 loop {
80 tokio::time::sleep(Duration::from_secs(wal_retry_interval)).await;
83 app_state.faucet.retry_wal_coins().await.unwrap();
84 }
85 });
86
87 let addr = SocketAddr::new(IpAddr::V4(host_ip), port);
88 info!("listening on {addr}");
89
90 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
91 axum::serve(listener, app).await?;
92 Ok(())
93}
94
95async fn health() -> &'static str {
97 "OK"
98}
99
100async fn batch_request_gas(
102 Extension(state): Extension<Arc<AppState>>,
103 Json(payload): Json<FaucetRequest>,
104) -> impl IntoResponse {
105 let id = Uuid::new_v4();
106 info!(uuid = ?id, "Got new gas request.");
108
109 let FaucetRequest::FixedAmountRequest(request) = payload else {
110 return (
111 StatusCode::BAD_REQUEST,
112 Json(BatchFaucetResponse::from(FaucetError::Internal(
113 "Input Error.".to_string(),
114 ))),
115 );
116 };
117
118 if state.config.batch_enabled {
119 let result = spawn_monitored_task!(async move {
120 state
121 .faucet
122 .batch_send(
123 id,
124 request.recipient,
125 &vec![state.config.amount; state.config.num_coins],
126 )
127 .await
128 })
129 .await
130 .unwrap();
131
132 match result {
133 Ok(v) => {
134 info!(uuid =?id, "Request is successfully served");
135 (StatusCode::ACCEPTED, Json(BatchFaucetResponse::from(v)))
136 }
137 Err(v) => {
138 warn!(uuid =?id, "failed to request gas: {v:?}");
139 (
140 StatusCode::INTERNAL_SERVER_ERROR,
141 Json(BatchFaucetResponse::from(v)),
142 )
143 }
144 }
145 } else {
146 info!(uuid = ?id, "Falling back to v1 implementation");
149 let result = spawn_monitored_task!(async move {
150 state
151 .faucet
152 .send(
153 id,
154 request.recipient,
155 &vec![state.config.amount; state.config.num_coins],
156 )
157 .await
158 })
159 .await
160 .unwrap();
161
162 match result {
163 Ok(_) => {
164 info!(uuid =?id, "Request is successfully served");
165 (StatusCode::ACCEPTED, Json(BatchFaucetResponse::from(id)))
166 }
167 Err(v) => {
168 warn!(uuid =?id, "failed to request gas: {v:?}");
169 (
170 StatusCode::INTERNAL_SERVER_ERROR,
171 Json(BatchFaucetResponse::from(v)),
172 )
173 }
174 }
175 }
176}
177
178async fn request_status(
180 Extension(state): Extension<Arc<AppState>>,
181 Path(id): Path<String>,
182) -> impl IntoResponse {
183 match Uuid::parse_str(&id) {
184 Ok(task_id) => {
185 let result = state.faucet.get_batch_send_status(task_id).await;
186 match result {
187 Ok(v) => (
188 StatusCode::CREATED,
189 Json(BatchStatusFaucetResponse::from(v)),
190 ),
191 Err(v) => (
192 StatusCode::INTERNAL_SERVER_ERROR,
193 Json(BatchStatusFaucetResponse::from(v)),
194 ),
195 }
196 }
197 Err(e) => (
198 StatusCode::INTERNAL_SERVER_ERROR,
199 Json(BatchStatusFaucetResponse::from(FaucetError::Internal(
200 e.to_string(),
201 ))),
202 ),
203 }
204}
205
206async fn request_gas(
208 Extension(state): Extension<Arc<AppState>>,
209 Json(payload): Json<FaucetRequest>,
210) -> impl IntoResponse {
211 let id = Uuid::new_v4();
213 info!(uuid = ?id, "Got new gas request.");
214 let result = match payload {
215 FaucetRequest::FixedAmountRequest(requests) => {
216 spawn_monitored_task!(async move {
219 state
220 .faucet
221 .send(
222 id,
223 requests.recipient,
224 &vec![state.config.amount; state.config.num_coins],
225 )
226 .await
227 })
228 .await
229 .unwrap()
230 }
231 _ => {
232 return (
233 StatusCode::BAD_REQUEST,
234 Json(FaucetResponse::from(FaucetError::Internal(
235 "Input Error.".to_string(),
236 ))),
237 );
238 }
239 };
240 match result {
241 Ok(v) => {
242 info!(uuid =?id, "Request is successfully served");
243 (StatusCode::CREATED, Json(FaucetResponse::from(v)))
244 }
245 Err(v) => {
246 warn!(uuid =?id, "failed to request gas: {v:?}");
247 (
248 StatusCode::INTERNAL_SERVER_ERROR,
249 Json(FaucetResponse::from(v)),
250 )
251 }
252 }
253}
254
255pub fn create_wallet_context(
256 timeout_secs: u64,
257 config_dir: PathBuf,
258) -> Result<WalletContext, anyhow::Error> {
259 let wallet_conf = config_dir.join(IOTA_CLIENT_CONFIG);
260 info!("Initialize wallet from config path: {wallet_conf:?}");
261 WalletContext::new(
262 &wallet_conf,
263 Some(Duration::from_secs(timeout_secs)),
264 Some(1000),
265 )
266}
267
268async fn handle_error(error: BoxError) -> impl IntoResponse {
269 if error.is::<tower::load_shed::error::Overloaded>() {
270 return (
271 StatusCode::SERVICE_UNAVAILABLE,
272 Cow::from("service is overloaded, please try again later"),
273 );
274 }
275
276 (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Cow::from(format!("Unhandled internal error: {error}")),
279 )
280}