iota_json_rpc/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{env, net::SocketAddr, str::FromStr};
6
7use axum::{
8    body::Body,
9    routing::{get, post},
10};
11pub use balance_changes::*;
12use hyper::{
13    Method, Request,
14    header::{HeaderName, HeaderValue},
15};
16pub use iota_config::node::ServerType;
17use iota_core::traffic_controller::metrics::TrafficControllerMetrics;
18use iota_json_rpc_api::{
19    CLIENT_SDK_TYPE_HEADER, CLIENT_SDK_VERSION_HEADER, CLIENT_TARGET_API_VERSION_HEADER,
20};
21use iota_open_rpc::{Module, Project};
22use iota_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
23use jsonrpsee::{Extensions, RpcModule, types::ErrorObjectOwned};
24pub use object_changes::*;
25use prometheus::Registry;
26use tokio::runtime::Handle;
27use tokio_util::sync::CancellationToken;
28use tower_http::{
29    cors::{AllowOrigin, CorsLayer},
30    trace::TraceLayer,
31};
32use tracing::{debug, info};
33
34use crate::{
35    axum_router::{json_rpc_handler, ws::ws_json_rpc_upgrade},
36    error::Error,
37    metrics::MetricsLogger,
38    routing_layer::RpcRouter,
39};
40
41pub mod authority_state;
42pub mod axum_router;
43mod balance_changes;
44pub mod bridge_api;
45pub mod coin_api;
46pub mod error;
47pub mod governance_api;
48pub mod indexer_api;
49pub mod logger;
50mod metrics;
51pub mod move_utils;
52mod object_changes;
53pub mod read_api;
54mod routing_layer;
55pub mod transaction_builder_api;
56pub mod transaction_execution_api;
57
58pub const APP_NAME_HEADER: &str = "app-name";
59
60pub const MAX_REQUEST_SIZE: u32 = 2 << 30;
61
62pub struct JsonRpcServerBuilder {
63    module: RpcModule<()>,
64    rpc_doc: Project,
65    registry: Registry,
66    policy_config: Option<PolicyConfig>,
67    firewall_config: Option<RemoteFirewallConfig>,
68}
69
70pub fn iota_rpc_doc(version: &str) -> Project {
71    Project::new(
72        version,
73        "IOTA JSON-RPC",
74        "IOTA JSON-RPC API for interaction with IOTA full node or indexer. Make RPC calls using https://api.NETWORK.iota.cafe:443 (or https://indexer.NETWORK.iota.cafe:443 for the indexer), where NETWORK is the network you want to use (testnet, devnet, mainnet). By default, local networks use port 9000 (or 9124 for the indexer).",
75        "IOTA Foundation",
76        "https://iota.org",
77        "info@iota.org",
78        "Apache-2.0",
79        "https://raw.githubusercontent.com/iotaledger/iota/main/LICENSE",
80    )
81}
82
83impl JsonRpcServerBuilder {
84    pub fn new(
85        version: &str,
86        prometheus_registry: &Registry,
87        policy_config: Option<PolicyConfig>,
88        firewall_config: Option<RemoteFirewallConfig>,
89    ) -> Self {
90        Self {
91            module: RpcModule::new(()),
92            rpc_doc: iota_rpc_doc(version),
93            registry: prometheus_registry.clone(),
94            policy_config,
95            firewall_config,
96        }
97    }
98
99    pub fn register_module<T: IotaRpcModule>(&mut self, module: T) -> Result<(), Error> {
100        self.rpc_doc.add_module(T::rpc_doc_module());
101        Ok(self.module.merge(module.rpc())?)
102    }
103
104    fn cors() -> Result<CorsLayer, Error> {
105        let acl = match env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
106            Ok(value) => {
107                let allow_hosts = value
108                    .split(',')
109                    .map(HeaderValue::from_str)
110                    .collect::<Result<Vec<_>, _>>()?;
111                AllowOrigin::list(allow_hosts)
112            }
113            _ => AllowOrigin::any(),
114        };
115        info!(?acl);
116
117        let cors = CorsLayer::new()
118            // Allow `POST` when accessing the resource
119            .allow_methods([Method::POST])
120            // Allow requests from any origin
121            .allow_origin(acl)
122            .allow_headers([
123                hyper::header::CONTENT_TYPE,
124                HeaderName::from_static(CLIENT_SDK_TYPE_HEADER),
125                HeaderName::from_static(CLIENT_SDK_VERSION_HEADER),
126                HeaderName::from_static(CLIENT_TARGET_API_VERSION_HEADER),
127                HeaderName::from_static(APP_NAME_HEADER),
128            ]);
129        Ok(cors)
130    }
131
132    fn trace_layer() -> TraceLayer<
133        tower_http::classify::SharedClassifier<tower_http::classify::ServerErrorsAsFailures>,
134        impl tower_http::trace::MakeSpan<Body> + Clone,
135    > {
136        TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
137            let request_id = request
138                .headers()
139                .get("x-req-id")
140                .and_then(|v| v.to_str().ok())
141                .map(tracing::field::display);
142
143            tracing::info_span!("json-rpc-request", "x-req-id" = request_id)
144        })
145    }
146
147    pub async fn to_router(&self, server_type: ServerType) -> Result<axum::Router, Error> {
148        let routing = self.rpc_doc.method_routing.clone();
149
150        let disable_routing = env::var("DISABLE_BACKWARD_COMPATIBILITY")
151            .ok()
152            .and_then(|v| bool::from_str(&v).ok())
153            .unwrap_or_default();
154        info!(
155            "Compatibility method routing {}.",
156            if disable_routing {
157                "disabled"
158            } else {
159                "enabled"
160            }
161        );
162        let rpc_router = RpcRouter::new(routing, disable_routing);
163
164        let rpc_docs = self.rpc_doc.clone();
165        let mut module = self.module.clone();
166        module.register_method("rpc.discover", move |_, _, _| {
167            Result::<_, ErrorObjectOwned>::Ok(rpc_docs.clone())
168        })?;
169        let methods_names = module.method_names().collect::<Vec<_>>();
170
171        let metrics_logger = MetricsLogger::new(&self.registry, &methods_names);
172        let traffic_controller_metrics = TrafficControllerMetrics::new(&self.registry);
173
174        let middleware = tower::ServiceBuilder::new()
175            .layer(Self::trace_layer())
176            .layer(Self::cors()?);
177
178        let service = crate::axum_router::JsonRpcService::new(
179            module.into(),
180            rpc_router,
181            metrics_logger,
182            self.firewall_config.clone(),
183            self.policy_config.clone(),
184            traffic_controller_metrics,
185            Extensions::new(),
186        );
187
188        let mut router = axum::Router::new();
189
190        match server_type {
191            ServerType::WebSocket => {
192                router = router
193                    .route("/", get(ws_json_rpc_upgrade))
194                    .route("/subscribe", get(ws_json_rpc_upgrade));
195            }
196            ServerType::Http => {
197                router = router
198                    .route("/", post(json_rpc_handler))
199                    .route("/json-rpc", post(json_rpc_handler))
200                    .route("/public", post(json_rpc_handler));
201            }
202            ServerType::Both => {
203                router = router
204                    .route("/", post(json_rpc_handler))
205                    .route("/", get(ws_json_rpc_upgrade))
206                    .route("/subscribe", get(ws_json_rpc_upgrade))
207                    .route("/json-rpc", post(json_rpc_handler))
208                    .route("/public", post(json_rpc_handler));
209            }
210        }
211
212        let app = router.with_state(service).layer(middleware);
213
214        info!("Available JSON-RPC methods : {methods_names:?}");
215
216        Ok(app)
217    }
218
219    pub async fn start(
220        self,
221        listen_address: SocketAddr,
222        custom_runtime: Option<Handle>,
223        server_type: ServerType,
224        cancel: Option<CancellationToken>,
225    ) -> Result<ServerHandle, Error> {
226        let app = self.to_router(server_type).await?;
227
228        let listener = tokio::net::TcpListener::bind(listen_address)
229            .await
230            .map_err(|e| {
231                Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
232            })?;
233
234        let addr = listener.local_addr().map_err(|e| {
235            Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
236        })?;
237
238        let fut = async move {
239            axum::serve(
240                listener,
241                app.into_make_service_with_connect_info::<SocketAddr>(),
242            )
243            .await
244            .unwrap();
245            if let Some(cancel) = cancel {
246                // Signal that the server is shutting down, so other tasks can clean-up.
247                cancel.cancel();
248            }
249        };
250        let handle = if let Some(custom_runtime) = custom_runtime {
251            debug!("Spawning server with custom runtime");
252            custom_runtime.spawn(fut)
253        } else {
254            tokio::spawn(fut)
255        };
256
257        let handle = ServerHandle {
258            handle: ServerHandleInner::Axum(handle),
259        };
260        info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");
261        Ok(handle)
262    }
263}
264
265pub struct ServerHandle {
266    handle: ServerHandleInner,
267}
268
269impl ServerHandle {
270    pub async fn stopped(self) {
271        match self.handle {
272            ServerHandleInner::Axum(handle) => handle.await.unwrap(),
273        }
274    }
275}
276
277enum ServerHandleInner {
278    Axum(tokio::task::JoinHandle<()>),
279}
280
281pub trait IotaRpcModule
282where
283    Self: Sized,
284{
285    fn rpc(self) -> RpcModule<Self>;
286    fn rpc_doc_module() -> Module;
287}