1use std::{env, net::SocketAddr, str::FromStr};
6
7use axum::{
8 body::Body,
9 routing::{get, post},
10};
11pub use balance_changes::*;
12use hyper::{
13 Method, Request,
14 header::{HeaderName, HeaderValue},
15};
16pub use iota_config::node::ServerType;
17use iota_core::traffic_controller::metrics::TrafficControllerMetrics;
18use iota_json_rpc_api::{
19 CLIENT_SDK_TYPE_HEADER, CLIENT_SDK_VERSION_HEADER, CLIENT_TARGET_API_VERSION_HEADER,
20};
21use iota_open_rpc::{Module, Project};
22use iota_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
23use jsonrpsee::{Extensions, RpcModule, types::ErrorObjectOwned};
24pub use object_changes::*;
25use prometheus::Registry;
26use tokio::runtime::Handle;
27use tokio_util::sync::CancellationToken;
28use tower_http::{
29 cors::{AllowOrigin, CorsLayer},
30 trace::TraceLayer,
31};
32use tracing::{debug, info};
33
34use crate::{
35 axum_router::{json_rpc_handler, ws::ws_json_rpc_upgrade},
36 error::Error,
37 metrics::MetricsLogger,
38 routing_layer::RpcRouter,
39};
40
41pub mod authority_state;
42pub mod axum_router;
43mod balance_changes;
44pub mod bridge_api;
45pub mod coin_api;
46pub mod error;
47pub mod governance_api;
48pub mod indexer_api;
49pub mod logger;
50mod metrics;
51pub mod move_utils;
52mod object_changes;
53pub mod read_api;
54mod routing_layer;
55pub mod transaction_builder_api;
56pub mod transaction_execution_api;
57
/// Name of the HTTP header clients may use to identify the calling
/// application. It is whitelisted in the CORS `allow_headers` list and must
/// stay lowercase because it is turned into a `HeaderName` via `from_static`.
pub const APP_NAME_HEADER: &str = "app-name";

/// Upper bound on the size of a single request: 2^31 = 2_147_483_648.
/// NOTE(review): presumably expressed in bytes (2 GiB) — confirm at use sites.
pub const MAX_REQUEST_SIZE: u32 = 1 << 31;
61
62pub struct JsonRpcServerBuilder {
63 module: RpcModule<()>,
64 rpc_doc: Project,
65 registry: Registry,
66 policy_config: Option<PolicyConfig>,
67 firewall_config: Option<RemoteFirewallConfig>,
68}
69
70pub fn iota_rpc_doc(version: &str) -> Project {
71 Project::new(
72 version,
73 "IOTA JSON-RPC",
74 "IOTA JSON-RPC API for interaction with IOTA full node or indexer. Make RPC calls using https://api.NETWORK.iota.cafe:443 (or https://indexer.NETWORK.iota.cafe:443 for the indexer), where NETWORK is the network you want to use (testnet, devnet, mainnet). By default, local networks use port 9000 (or 9124 for the indexer).",
75 "IOTA Foundation",
76 "https://iota.org",
77 "info@iota.org",
78 "Apache-2.0",
79 "https://raw.githubusercontent.com/iotaledger/iota/main/LICENSE",
80 )
81}
82
83impl JsonRpcServerBuilder {
84 pub fn new(
85 version: &str,
86 prometheus_registry: &Registry,
87 policy_config: Option<PolicyConfig>,
88 firewall_config: Option<RemoteFirewallConfig>,
89 ) -> Self {
90 Self {
91 module: RpcModule::new(()),
92 rpc_doc: iota_rpc_doc(version),
93 registry: prometheus_registry.clone(),
94 policy_config,
95 firewall_config,
96 }
97 }
98
99 pub fn register_module<T: IotaRpcModule>(&mut self, module: T) -> Result<(), Error> {
100 self.rpc_doc.add_module(T::rpc_doc_module());
101 Ok(self.module.merge(module.rpc())?)
102 }
103
104 fn cors() -> Result<CorsLayer, Error> {
105 let acl = match env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
106 Ok(value) => {
107 let allow_hosts = value
108 .split(',')
109 .map(HeaderValue::from_str)
110 .collect::<Result<Vec<_>, _>>()?;
111 AllowOrigin::list(allow_hosts)
112 }
113 _ => AllowOrigin::any(),
114 };
115 info!(?acl);
116
117 let cors = CorsLayer::new()
118 .allow_methods([Method::POST])
120 .allow_origin(acl)
122 .allow_headers([
123 hyper::header::CONTENT_TYPE,
124 HeaderName::from_static(CLIENT_SDK_TYPE_HEADER),
125 HeaderName::from_static(CLIENT_SDK_VERSION_HEADER),
126 HeaderName::from_static(CLIENT_TARGET_API_VERSION_HEADER),
127 HeaderName::from_static(APP_NAME_HEADER),
128 ]);
129 Ok(cors)
130 }
131
132 fn trace_layer() -> TraceLayer<
133 tower_http::classify::SharedClassifier<tower_http::classify::ServerErrorsAsFailures>,
134 impl tower_http::trace::MakeSpan<Body> + Clone,
135 > {
136 TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
137 let request_id = request
138 .headers()
139 .get("x-req-id")
140 .and_then(|v| v.to_str().ok())
141 .map(tracing::field::display);
142
143 tracing::info_span!("json-rpc-request", "x-req-id" = request_id)
144 })
145 }
146
147 pub async fn to_router(&self, server_type: ServerType) -> Result<axum::Router, Error> {
148 let routing = self.rpc_doc.method_routing.clone();
149
150 let disable_routing = env::var("DISABLE_BACKWARD_COMPATIBILITY")
151 .ok()
152 .and_then(|v| bool::from_str(&v).ok())
153 .unwrap_or_default();
154 info!(
155 "Compatibility method routing {}.",
156 if disable_routing {
157 "disabled"
158 } else {
159 "enabled"
160 }
161 );
162 let rpc_router = RpcRouter::new(routing, disable_routing);
163
164 let rpc_docs = self.rpc_doc.clone();
165 let mut module = self.module.clone();
166 module.register_method("rpc.discover", move |_, _, _| {
167 Result::<_, ErrorObjectOwned>::Ok(rpc_docs.clone())
168 })?;
169 let methods_names = module.method_names().collect::<Vec<_>>();
170
171 let metrics_logger = MetricsLogger::new(&self.registry, &methods_names);
172 let traffic_controller_metrics = TrafficControllerMetrics::new(&self.registry);
173
174 let middleware = tower::ServiceBuilder::new()
175 .layer(Self::trace_layer())
176 .layer(Self::cors()?);
177
178 let service = crate::axum_router::JsonRpcService::new(
179 module.into(),
180 rpc_router,
181 metrics_logger,
182 self.firewall_config.clone(),
183 self.policy_config.clone(),
184 traffic_controller_metrics,
185 Extensions::new(),
186 );
187
188 let mut router = axum::Router::new();
189
190 match server_type {
191 ServerType::WebSocket => {
192 router = router
193 .route("/", get(ws_json_rpc_upgrade))
194 .route("/subscribe", get(ws_json_rpc_upgrade));
195 }
196 ServerType::Http => {
197 router = router
198 .route("/", post(json_rpc_handler))
199 .route("/json-rpc", post(json_rpc_handler))
200 .route("/public", post(json_rpc_handler));
201 }
202 ServerType::Both => {
203 router = router
204 .route("/", post(json_rpc_handler))
205 .route("/", get(ws_json_rpc_upgrade))
206 .route("/subscribe", get(ws_json_rpc_upgrade))
207 .route("/json-rpc", post(json_rpc_handler))
208 .route("/public", post(json_rpc_handler));
209 }
210 }
211
212 let app = router.with_state(service).layer(middleware);
213
214 info!("Available JSON-RPC methods : {methods_names:?}");
215
216 Ok(app)
217 }
218
219 pub async fn start(
220 self,
221 listen_address: SocketAddr,
222 custom_runtime: Option<Handle>,
223 server_type: ServerType,
224 cancel: Option<CancellationToken>,
225 ) -> Result<ServerHandle, Error> {
226 let app = self.to_router(server_type).await?;
227
228 let listener = tokio::net::TcpListener::bind(listen_address)
229 .await
230 .map_err(|e| {
231 Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
232 })?;
233
234 let addr = listener.local_addr().map_err(|e| {
235 Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
236 })?;
237
238 let fut = async move {
239 axum::serve(
240 listener,
241 app.into_make_service_with_connect_info::<SocketAddr>(),
242 )
243 .await
244 .unwrap();
245 if let Some(cancel) = cancel {
246 cancel.cancel();
248 }
249 };
250 let handle = if let Some(custom_runtime) = custom_runtime {
251 debug!("Spawning server with custom runtime");
252 custom_runtime.spawn(fut)
253 } else {
254 tokio::spawn(fut)
255 };
256
257 let handle = ServerHandle {
258 handle: ServerHandleInner::Axum(handle),
259 };
260 info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");
261 Ok(handle)
262 }
263}
264
265pub struct ServerHandle {
266 handle: ServerHandleInner,
267}
268
269impl ServerHandle {
270 pub async fn stopped(self) {
271 match self.handle {
272 ServerHandleInner::Axum(handle) => handle.await.unwrap(),
273 }
274 }
275}
276
277enum ServerHandleInner {
278 Axum(tokio::task::JoinHandle<()>),
279}
280
281pub trait IotaRpcModule
282where
283 Self: Sized,
284{
285 fn rpc(self) -> RpcModule<Self>;
286 fn rpc_doc_module() -> Module;
287}