iota_faucet/
server.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    borrow::Cow,
7    net::{IpAddr, SocketAddr},
8    path::PathBuf,
9    sync::Arc,
10    time::Duration,
11};
12
13use axum::{
14    BoxError, Extension, Json, Router,
15    error_handling::HandleErrorLayer,
16    extract::Path,
17    http::StatusCode,
18    response::IntoResponse,
19    routing::{get, post},
20};
21use http::Method;
22use iota_config::IOTA_CLIENT_CONFIG;
23use iota_metrics::spawn_monitored_task;
24use iota_sdk::wallet_context::WalletContext;
25use prometheus::Registry;
26use tower::{ServiceBuilder, limit::RateLimitLayer};
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{info, warn};
29use uuid::Uuid;
30
31use crate::{
32    AppState, BatchFaucetResponse, BatchStatusFaucetResponse, FaucetConfig, FaucetError,
33    FaucetRequest, FaucetResponse, RequestMetricsLayer, faucet::Faucet,
34};
35
36pub async fn start_faucet(
37    app_state: Arc<AppState>,
38    concurrency_limit: usize,
39    prometheus_registry: &Registry,
40) -> Result<(), anyhow::Error> {
41    // TODO: restrict access if needed
42    let cors = CorsLayer::new()
43        .allow_methods(vec![Method::GET, Method::POST])
44        .allow_headers(Any)
45        .allow_origin(Any);
46
47    let FaucetConfig {
48        port,
49        host_ip,
50        request_buffer_size,
51        max_request_per_second,
52        wal_retry_interval,
53        ..
54    } = app_state.config;
55
56    let app = Router::new()
57        .route("/", get(health))
58        .route("/gas", post(request_gas))
59        .route("/v1/gas", post(batch_request_gas))
60        .route("/v1/status/:task_id", get(request_status))
61        .layer(
62            ServiceBuilder::new()
63                .layer(HandleErrorLayer::new(handle_error))
64                .layer(RequestMetricsLayer::new(prometheus_registry))
65                .layer(cors)
66                .load_shed()
67                .buffer(request_buffer_size)
68                .layer(RateLimitLayer::new(
69                    max_request_per_second,
70                    Duration::from_secs(1),
71                ))
72                .concurrency_limit(concurrency_limit)
73                .layer(Extension(app_state.clone()))
74                .into_inner(),
75        );
76
77    spawn_monitored_task!(async move {
78        info!("Starting task to clear WAL.");
79        loop {
80            // Every config.wal_retry_interval (Default: 300 seconds) we try to clear the
81            // wal coins
82            tokio::time::sleep(Duration::from_secs(wal_retry_interval)).await;
83            app_state.faucet.retry_wal_coins().await.unwrap();
84        }
85    });
86
87    let addr = SocketAddr::new(IpAddr::V4(host_ip), port);
88    info!("listening on {}", addr);
89
90    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
91    axum::serve(listener, app).await?;
92    Ok(())
93}
94
95/// basic handler that responds with a static string
96async fn health() -> &'static str {
97    "OK"
98}
99
100/// handler for batch_request_gas requests
101async fn batch_request_gas(
102    Extension(state): Extension<Arc<AppState>>,
103    Json(payload): Json<FaucetRequest>,
104) -> impl IntoResponse {
105    let id = Uuid::new_v4();
106    // ID for traceability
107    info!(uuid = ?id, "Got new gas request.");
108
109    let FaucetRequest::FixedAmountRequest(request) = payload else {
110        return (
111            StatusCode::BAD_REQUEST,
112            Json(BatchFaucetResponse::from(FaucetError::Internal(
113                "Input Error.".to_string(),
114            ))),
115        );
116    };
117
118    if state.config.batch_enabled {
119        let result = spawn_monitored_task!(async move {
120            state
121                .faucet
122                .batch_send(
123                    id,
124                    request.recipient,
125                    &vec![state.config.amount; state.config.num_coins],
126                )
127                .await
128        })
129        .await
130        .unwrap();
131
132        match result {
133            Ok(v) => {
134                info!(uuid =?id, "Request is successfully served");
135                (StatusCode::ACCEPTED, Json(BatchFaucetResponse::from(v)))
136            }
137            Err(v) => {
138                warn!(uuid =?id, "Failed to request gas: {:?}", v);
139                (
140                    StatusCode::INTERNAL_SERVER_ERROR,
141                    Json(BatchFaucetResponse::from(v)),
142                )
143            }
144        }
145    } else {
146        // TODO (jian): remove this feature gate when batch has proven to be baked long
147        // enough
148        info!(uuid = ?id, "Falling back to v1 implementation");
149        let result = spawn_monitored_task!(async move {
150            state
151                .faucet
152                .send(
153                    id,
154                    request.recipient,
155                    &vec![state.config.amount; state.config.num_coins],
156                )
157                .await
158        })
159        .await
160        .unwrap();
161
162        match result {
163            Ok(_) => {
164                info!(uuid =?id, "Request is successfully served");
165                (StatusCode::ACCEPTED, Json(BatchFaucetResponse::from(id)))
166            }
167            Err(v) => {
168                warn!(uuid =?id, "Failed to request gas: {:?}", v);
169                (
170                    StatusCode::INTERNAL_SERVER_ERROR,
171                    Json(BatchFaucetResponse::from(v)),
172                )
173            }
174        }
175    }
176}
177
178/// handler for batch_get_status requests
179async fn request_status(
180    Extension(state): Extension<Arc<AppState>>,
181    Path(id): Path<String>,
182) -> impl IntoResponse {
183    match Uuid::parse_str(&id) {
184        Ok(task_id) => {
185            let result = state.faucet.get_batch_send_status(task_id).await;
186            match result {
187                Ok(v) => (
188                    StatusCode::CREATED,
189                    Json(BatchStatusFaucetResponse::from(v)),
190                ),
191                Err(v) => (
192                    StatusCode::INTERNAL_SERVER_ERROR,
193                    Json(BatchStatusFaucetResponse::from(v)),
194                ),
195            }
196        }
197        Err(e) => (
198            StatusCode::INTERNAL_SERVER_ERROR,
199            Json(BatchStatusFaucetResponse::from(FaucetError::Internal(
200                e.to_string(),
201            ))),
202        ),
203    }
204}
205
206/// handler for all the request_gas requests
207async fn request_gas(
208    Extension(state): Extension<Arc<AppState>>,
209    Json(payload): Json<FaucetRequest>,
210) -> impl IntoResponse {
211    // ID for traceability
212    let id = Uuid::new_v4();
213    info!(uuid = ?id, "Got new gas request.");
214    let result = match payload {
215        FaucetRequest::FixedAmountRequest(requests) => {
216            // We spawn a tokio task for this such that connection drop will not interrupt
217            // it and impact the recycling of coins
218            spawn_monitored_task!(async move {
219                state
220                    .faucet
221                    .send(
222                        id,
223                        requests.recipient,
224                        &vec![state.config.amount; state.config.num_coins],
225                    )
226                    .await
227            })
228            .await
229            .unwrap()
230        }
231        _ => {
232            return (
233                StatusCode::BAD_REQUEST,
234                Json(FaucetResponse::from(FaucetError::Internal(
235                    "Input Error.".to_string(),
236                ))),
237            );
238        }
239    };
240    match result {
241        Ok(v) => {
242            info!(uuid =?id, "Request is successfully served");
243            (StatusCode::CREATED, Json(FaucetResponse::from(v)))
244        }
245        Err(v) => {
246            warn!(uuid =?id, "Failed to request gas: {:?}", v);
247            (
248                StatusCode::INTERNAL_SERVER_ERROR,
249                Json(FaucetResponse::from(v)),
250            )
251        }
252    }
253}
254
255pub fn create_wallet_context(
256    timeout_secs: u64,
257    config_dir: PathBuf,
258) -> Result<WalletContext, anyhow::Error> {
259    let wallet_conf = config_dir.join(IOTA_CLIENT_CONFIG);
260    info!("Initialize wallet from config path: {:?}", wallet_conf);
261    WalletContext::new(
262        &wallet_conf,
263        Some(Duration::from_secs(timeout_secs)),
264        Some(1000),
265    )
266}
267
268async fn handle_error(error: BoxError) -> impl IntoResponse {
269    if error.is::<tower::load_shed::error::Overloaded>() {
270        return (
271            StatusCode::SERVICE_UNAVAILABLE,
272            Cow::from("service is overloaded, please try again later"),
273        );
274    }
275
276    (
277        StatusCode::INTERNAL_SERVER_ERROR,
278        Cow::from(format!("Unhandled internal error: {}", error)),
279    )
280}