iota_json_rpc/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{env, net::SocketAddr, str::FromStr};
6
7use axum::{
8    body::Body,
9    routing::{get, post},
10};
11pub use balance_changes::*;
12use hyper::{
13    Method, Request,
14    header::{HeaderName, HeaderValue},
15};
16pub use iota_config::node::ServerType;
17use iota_core::traffic_controller::metrics::TrafficControllerMetrics;
18use iota_json_rpc_api::{
19    CLIENT_SDK_TYPE_HEADER, CLIENT_SDK_VERSION_HEADER, CLIENT_TARGET_API_VERSION_HEADER,
20};
21use iota_open_rpc::{Module, Project};
22use iota_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
23use jsonrpsee::{Extensions, RpcModule, types::ErrorObjectOwned};
24pub use object_changes::*;
25use prometheus::Registry;
26use tokio::runtime::Handle;
27use tokio_util::sync::CancellationToken;
28use tower_http::{
29    cors::{AllowOrigin, CorsLayer},
30    trace::TraceLayer,
31};
32use tracing::{debug, info};
33
34use crate::{
35    axum_router::{json_rpc_handler, ws::ws_json_rpc_upgrade},
36    error::Error,
37    metrics::MetricsLogger,
38    routing_layer::RpcRouter,
39};
40
41pub mod authority_state;
42pub mod axum_router;
43mod balance_changes;
44pub mod coin_api;
45pub mod error;
46pub mod governance_api;
47pub mod indexer_api;
48pub mod logger;
49mod metrics;
50pub mod move_utils;
51mod object_changes;
52pub mod read_api;
53mod routing_layer;
54pub mod transaction_builder_api;
55pub mod transaction_execution_api;
56
/// HTTP header name `app-name`; `JsonRpcServerBuilder::cors` includes it in the
/// list of headers allowed on cross-origin requests.
pub const APP_NAME_HEADER: &str = "app-name";

/// Request size limit, equal to 2_147_483_648 (the same value as `2 << 30`).
///
/// NOTE(review): presumably a byte count (2 GiB); the constant is not
/// referenced in the visible code, so confirm the unit against its consumers.
pub const MAX_REQUEST_SIZE: u32 = 2 * 1024 * 1024 * 1024;
60
/// Builder that collects JSON-RPC modules together with their RPC
/// documentation and turns them into an axum router (`to_router`) or a running
/// server (`start`).
pub struct JsonRpcServerBuilder {
    /// Aggregate module; `register_module` merges every registered module's
    /// methods into it.
    module: RpcModule<()>,
    /// Documentation project created by `iota_rpc_doc`; `register_module` adds
    /// one documentation module per registered RPC module.
    rpc_doc: Project,
    /// Clone of the Prometheus registry passed to `new`; handed to the metrics
    /// constructors in `to_router`.
    registry: Registry,
    /// Optional traffic-control policy, forwarded to `JsonRpcService::new`.
    policy_config: Option<PolicyConfig>,
    /// Optional remote firewall configuration, forwarded to
    /// `JsonRpcService::new`.
    firewall_config: Option<RemoteFirewallConfig>,
}
68
69pub fn iota_rpc_doc(version: &str) -> Project {
70    Project::new(
71        version,
72        "IOTA JSON-RPC",
73        "IOTA JSON-RPC API for interaction with IOTA full node or indexer. Make RPC calls using https://api.NETWORK.iota.cafe:443 (or https://indexer.NETWORK.iota.cafe:443 for the indexer), where NETWORK is the network you want to use (testnet, devnet, mainnet). By default, local networks use port 9000 (or 9124 for the indexer).",
74        "IOTA Foundation",
75        "https://iota.org",
76        "info@iota.org",
77        "Apache-2.0",
78        "https://raw.githubusercontent.com/iotaledger/iota/main/LICENSE",
79    )
80}
81
82impl JsonRpcServerBuilder {
83    pub fn new(
84        version: &str,
85        prometheus_registry: &Registry,
86        policy_config: Option<PolicyConfig>,
87        firewall_config: Option<RemoteFirewallConfig>,
88    ) -> Self {
89        Self {
90            module: RpcModule::new(()),
91            rpc_doc: iota_rpc_doc(version),
92            registry: prometheus_registry.clone(),
93            policy_config,
94            firewall_config,
95        }
96    }
97
98    pub fn register_module<T: IotaRpcModule>(&mut self, module: T) -> Result<(), Error> {
99        self.rpc_doc.add_module(T::rpc_doc_module());
100        Ok(self.module.merge(module.rpc())?)
101    }
102
103    fn cors() -> Result<CorsLayer, Error> {
104        let acl = match env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
105            Ok(value) => {
106                let allow_hosts = value
107                    .split(',')
108                    .map(HeaderValue::from_str)
109                    .collect::<Result<Vec<_>, _>>()?;
110                AllowOrigin::list(allow_hosts)
111            }
112            _ => AllowOrigin::any(),
113        };
114        info!(?acl);
115
116        let cors = CorsLayer::new()
117            // Allow `POST` when accessing the resource
118            .allow_methods([Method::POST])
119            // Allow requests from any origin
120            .allow_origin(acl)
121            .allow_headers([
122                hyper::header::CONTENT_TYPE,
123                HeaderName::from_static(CLIENT_SDK_TYPE_HEADER),
124                HeaderName::from_static(CLIENT_SDK_VERSION_HEADER),
125                HeaderName::from_static(CLIENT_TARGET_API_VERSION_HEADER),
126                HeaderName::from_static(APP_NAME_HEADER),
127            ]);
128        Ok(cors)
129    }
130
131    fn trace_layer() -> TraceLayer<
132        tower_http::classify::SharedClassifier<tower_http::classify::ServerErrorsAsFailures>,
133        impl tower_http::trace::MakeSpan<Body> + Clone,
134    > {
135        TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
136            let request_id = request
137                .headers()
138                .get("x-req-id")
139                .and_then(|v| v.to_str().ok())
140                .map(tracing::field::display);
141
142            tracing::info_span!("json-rpc-request", "x-req-id" = request_id)
143        })
144    }
145
146    pub async fn to_router(&self, server_type: ServerType) -> Result<axum::Router, Error> {
147        let routing = self.rpc_doc.method_routing.clone();
148
149        let disable_routing = env::var("DISABLE_BACKWARD_COMPATIBILITY")
150            .ok()
151            .and_then(|v| bool::from_str(&v).ok())
152            .unwrap_or_default();
153        info!(
154            "Compatibility method routing {}.",
155            if disable_routing {
156                "disabled"
157            } else {
158                "enabled"
159            }
160        );
161        let rpc_router = RpcRouter::new(routing, disable_routing);
162
163        let rpc_docs = self.rpc_doc.clone();
164        let mut module = self.module.clone();
165        module.register_method("rpc.discover", move |_, _, _| {
166            Result::<_, ErrorObjectOwned>::Ok(rpc_docs.clone())
167        })?;
168        let methods_names = module.method_names().collect::<Vec<_>>();
169
170        let metrics_logger = MetricsLogger::new(&self.registry, &methods_names);
171        let traffic_controller_metrics = TrafficControllerMetrics::new(&self.registry);
172
173        let middleware = tower::ServiceBuilder::new()
174            .layer(Self::trace_layer())
175            .layer(Self::cors()?);
176
177        let service = crate::axum_router::JsonRpcService::new(
178            module.into(),
179            rpc_router,
180            metrics_logger,
181            self.firewall_config.clone(),
182            self.policy_config.clone(),
183            traffic_controller_metrics,
184            Extensions::new(),
185        );
186
187        let mut router = axum::Router::new();
188
189        match server_type {
190            ServerType::WebSocket => {
191                router = router
192                    .route("/", get(ws_json_rpc_upgrade))
193                    .route("/subscribe", get(ws_json_rpc_upgrade));
194            }
195            ServerType::Http => {
196                router = router
197                    .route("/", post(json_rpc_handler))
198                    .route("/json-rpc", post(json_rpc_handler))
199                    .route("/public", post(json_rpc_handler));
200            }
201            ServerType::Both => {
202                router = router
203                    .route("/", post(json_rpc_handler))
204                    .route("/", get(ws_json_rpc_upgrade))
205                    .route("/subscribe", get(ws_json_rpc_upgrade))
206                    .route("/json-rpc", post(json_rpc_handler))
207                    .route("/public", post(json_rpc_handler));
208            }
209        }
210
211        let app = router.with_state(service).layer(middleware);
212
213        info!("Available JSON-RPC methods : {methods_names:?}");
214
215        Ok(app)
216    }
217
218    pub async fn start(
219        self,
220        listen_address: SocketAddr,
221        custom_runtime: Option<Handle>,
222        server_type: ServerType,
223        cancel: Option<CancellationToken>,
224    ) -> Result<ServerHandle, Error> {
225        let app = self.to_router(server_type).await?;
226
227        let listener = tokio::net::TcpListener::bind(listen_address)
228            .await
229            .map_err(|e| {
230                Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
231            })?;
232
233        let addr = listener.local_addr().map_err(|e| {
234            Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
235        })?;
236
237        let fut = async move {
238            axum::serve(
239                listener,
240                app.into_make_service_with_connect_info::<SocketAddr>(),
241            )
242            .await
243            .unwrap();
244            if let Some(cancel) = cancel {
245                // Signal that the server is shutting down, so other tasks can clean-up.
246                cancel.cancel();
247            }
248        };
249        let handle = if let Some(custom_runtime) = custom_runtime {
250            debug!("Spawning server with custom runtime");
251            custom_runtime.spawn(fut)
252        } else {
253            tokio::spawn(fut)
254        };
255
256        let handle = ServerHandle {
257            handle: ServerHandleInner::Axum(handle),
258        };
259        info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");
260        Ok(handle)
261    }
262}
263
/// Handle to the server task spawned by `JsonRpcServerBuilder::start`.
pub struct ServerHandle {
    /// Backend-specific task handle; awaited by `stopped`.
    handle: ServerHandleInner,
}
267
268impl ServerHandle {
269    pub async fn stopped(self) {
270        match self.handle {
271            ServerHandleInner::Axum(handle) => handle.await.unwrap(),
272        }
273    }
274}
275
/// Concrete task handle stored inside [`ServerHandle`].
enum ServerHandleInner {
    /// Join handle of the Tokio task that runs the axum server.
    Axum(tokio::task::JoinHandle<()>),
}
279
280pub trait IotaRpcModule
281where
282    Self: Sized,
283{
284    fn rpc(self) -> RpcModule<Self>;
285    fn rpc_doc_module() -> Module;
286}