1use std::{env, net::SocketAddr, str::FromStr};
6
7use axum::{
8 body::Body,
9 routing::{get, post},
10};
11pub use balance_changes::*;
12use hyper::{
13 Method, Request,
14 header::{HeaderName, HeaderValue},
15};
16pub use iota_config::node::ServerType;
17use iota_core::traffic_controller::metrics::TrafficControllerMetrics;
18use iota_json_rpc_api::{
19 CLIENT_SDK_TYPE_HEADER, CLIENT_SDK_VERSION_HEADER, CLIENT_TARGET_API_VERSION_HEADER,
20};
21use iota_open_rpc::{Module, Project};
22use iota_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
23use jsonrpsee::{Extensions, RpcModule, types::ErrorObjectOwned};
24pub use object_changes::*;
25use prometheus::Registry;
26use tokio::runtime::Handle;
27use tokio_util::sync::CancellationToken;
28use tower_http::{
29 cors::{AllowOrigin, CorsLayer},
30 trace::TraceLayer,
31};
32use tracing::{debug, info};
33
34use crate::{
35 axum_router::{json_rpc_handler, ws::ws_json_rpc_upgrade},
36 error::Error,
37 metrics::MetricsLogger,
38 routing_layer::RpcRouter,
39};
40
41pub mod authority_state;
42pub mod axum_router;
43mod balance_changes;
44pub mod coin_api;
45pub mod error;
46pub mod governance_api;
47pub mod indexer_api;
48pub mod logger;
49mod metrics;
50pub mod move_utils;
51mod object_changes;
52pub mod read_api;
53mod routing_layer;
54pub mod transaction_builder_api;
55pub mod transaction_execution_api;
56
/// Name of the `app-name` HTTP header.
///
/// It is one of the request headers accepted by the server's CORS policy.
pub const APP_NAME_HEADER: &str = "app-name";

/// Upper bound for a request size: 2^31 (2_147_483_648).
///
/// NOTE(review): presumably expressed in bytes; the constant is not used in
/// this part of the file, so confirm against the call sites.
pub const MAX_REQUEST_SIZE: u32 = 1 << 31;
60
61pub struct JsonRpcServerBuilder {
62 module: RpcModule<()>,
63 rpc_doc: Project,
64 registry: Registry,
65 policy_config: Option<PolicyConfig>,
66 firewall_config: Option<RemoteFirewallConfig>,
67}
68
69pub fn iota_rpc_doc(version: &str) -> Project {
70 Project::new(
71 version,
72 "IOTA JSON-RPC",
73 "IOTA JSON-RPC API for interaction with IOTA full node or indexer. Make RPC calls using https://api.NETWORK.iota.cafe:443 (or https://indexer.NETWORK.iota.cafe:443 for the indexer), where NETWORK is the network you want to use (testnet, devnet, mainnet). By default, local networks use port 9000 (or 9124 for the indexer).",
74 "IOTA Foundation",
75 "https://iota.org",
76 "info@iota.org",
77 "Apache-2.0",
78 "https://raw.githubusercontent.com/iotaledger/iota/main/LICENSE",
79 )
80}
81
82impl JsonRpcServerBuilder {
83 pub fn new(
84 version: &str,
85 prometheus_registry: &Registry,
86 policy_config: Option<PolicyConfig>,
87 firewall_config: Option<RemoteFirewallConfig>,
88 ) -> Self {
89 Self {
90 module: RpcModule::new(()),
91 rpc_doc: iota_rpc_doc(version),
92 registry: prometheus_registry.clone(),
93 policy_config,
94 firewall_config,
95 }
96 }
97
98 pub fn register_module<T: IotaRpcModule>(&mut self, module: T) -> Result<(), Error> {
99 self.rpc_doc.add_module(T::rpc_doc_module());
100 Ok(self.module.merge(module.rpc())?)
101 }
102
103 fn cors() -> Result<CorsLayer, Error> {
104 let acl = match env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
105 Ok(value) => {
106 let allow_hosts = value
107 .split(',')
108 .map(HeaderValue::from_str)
109 .collect::<Result<Vec<_>, _>>()?;
110 AllowOrigin::list(allow_hosts)
111 }
112 _ => AllowOrigin::any(),
113 };
114 info!(?acl);
115
116 let cors = CorsLayer::new()
117 .allow_methods([Method::POST])
119 .allow_origin(acl)
121 .allow_headers([
122 hyper::header::CONTENT_TYPE,
123 HeaderName::from_static(CLIENT_SDK_TYPE_HEADER),
124 HeaderName::from_static(CLIENT_SDK_VERSION_HEADER),
125 HeaderName::from_static(CLIENT_TARGET_API_VERSION_HEADER),
126 HeaderName::from_static(APP_NAME_HEADER),
127 ]);
128 Ok(cors)
129 }
130
131 fn trace_layer() -> TraceLayer<
132 tower_http::classify::SharedClassifier<tower_http::classify::ServerErrorsAsFailures>,
133 impl tower_http::trace::MakeSpan<Body> + Clone,
134 > {
135 TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
136 let request_id = request
137 .headers()
138 .get("x-req-id")
139 .and_then(|v| v.to_str().ok())
140 .map(tracing::field::display);
141
142 tracing::info_span!("json-rpc-request", "x-req-id" = request_id)
143 })
144 }
145
146 pub async fn to_router(&self, server_type: ServerType) -> Result<axum::Router, Error> {
147 let routing = self.rpc_doc.method_routing.clone();
148
149 let disable_routing = env::var("DISABLE_BACKWARD_COMPATIBILITY")
150 .ok()
151 .and_then(|v| bool::from_str(&v).ok())
152 .unwrap_or_default();
153 info!(
154 "Compatibility method routing {}.",
155 if disable_routing {
156 "disabled"
157 } else {
158 "enabled"
159 }
160 );
161 let rpc_router = RpcRouter::new(routing, disable_routing);
162
163 let rpc_docs = self.rpc_doc.clone();
164 let mut module = self.module.clone();
165 module.register_method("rpc.discover", move |_, _, _| {
166 Result::<_, ErrorObjectOwned>::Ok(rpc_docs.clone())
167 })?;
168 let methods_names = module.method_names().collect::<Vec<_>>();
169
170 let metrics_logger = MetricsLogger::new(&self.registry, &methods_names);
171 let traffic_controller_metrics = TrafficControllerMetrics::new(&self.registry);
172
173 let middleware = tower::ServiceBuilder::new()
174 .layer(Self::trace_layer())
175 .layer(Self::cors()?);
176
177 let service = crate::axum_router::JsonRpcService::new(
178 module.into(),
179 rpc_router,
180 metrics_logger,
181 self.firewall_config.clone(),
182 self.policy_config.clone(),
183 traffic_controller_metrics,
184 Extensions::new(),
185 );
186
187 let mut router = axum::Router::new();
188
189 match server_type {
190 ServerType::WebSocket => {
191 router = router
192 .route("/", get(ws_json_rpc_upgrade))
193 .route("/subscribe", get(ws_json_rpc_upgrade));
194 }
195 ServerType::Http => {
196 router = router
197 .route("/", post(json_rpc_handler))
198 .route("/json-rpc", post(json_rpc_handler))
199 .route("/public", post(json_rpc_handler));
200 }
201 ServerType::Both => {
202 router = router
203 .route("/", post(json_rpc_handler))
204 .route("/", get(ws_json_rpc_upgrade))
205 .route("/subscribe", get(ws_json_rpc_upgrade))
206 .route("/json-rpc", post(json_rpc_handler))
207 .route("/public", post(json_rpc_handler));
208 }
209 }
210
211 let app = router.with_state(service).layer(middleware);
212
213 info!("Available JSON-RPC methods : {methods_names:?}");
214
215 Ok(app)
216 }
217
218 pub async fn start(
219 self,
220 listen_address: SocketAddr,
221 custom_runtime: Option<Handle>,
222 server_type: ServerType,
223 cancel: Option<CancellationToken>,
224 ) -> Result<ServerHandle, Error> {
225 let app = self.to_router(server_type).await?;
226
227 let listener = tokio::net::TcpListener::bind(listen_address)
228 .await
229 .map_err(|e| {
230 Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
231 })?;
232
233 let addr = listener.local_addr().map_err(|e| {
234 Error::Unexpected(format!("invalid listen address {listen_address}: {e}"))
235 })?;
236
237 let fut = async move {
238 axum::serve(
239 listener,
240 app.into_make_service_with_connect_info::<SocketAddr>(),
241 )
242 .await
243 .unwrap();
244 if let Some(cancel) = cancel {
245 cancel.cancel();
247 }
248 };
249 let handle = if let Some(custom_runtime) = custom_runtime {
250 debug!("Spawning server with custom runtime");
251 custom_runtime.spawn(fut)
252 } else {
253 tokio::spawn(fut)
254 };
255
256 let handle = ServerHandle {
257 handle: ServerHandleInner::Axum(handle),
258 };
259 info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");
260 Ok(handle)
261 }
262}
263
264pub struct ServerHandle {
265 handle: ServerHandleInner,
266}
267
268impl ServerHandle {
269 pub async fn stopped(self) {
270 match self.handle {
271 ServerHandleInner::Axum(handle) => handle.await.unwrap(),
272 }
273 }
274}
275
276enum ServerHandleInner {
277 Axum(tokio::task::JoinHandle<()>),
278}
279
280pub trait IotaRpcModule
281where
282 Self: Sized,
283{
284 fn rpc(self) -> RpcModule<Self>;
285 fn rpc_doc_module() -> Module;
286}