1#![recursion_limit = "256"]
6
7use std::{net::SocketAddr, path::PathBuf, time::Duration};
8
9use anyhow::{Result, anyhow};
10use clap::{Args, Parser};
11use errors::IndexerError;
12use iota_json_rpc::{JsonRpcServerBuilder, ServerHandle, ServerType};
13use iota_json_rpc_api::CLIENT_SDK_TYPE_HEADER;
14use iota_metrics::spawn_monitored_task;
15use iota_names::config::IotaNamesConfig;
16use iota_types::base_types::{IotaAddress, ObjectID};
17use jsonrpsee::http_client::{HeaderMap, HeaderValue, HttpClient, HttpClientBuilder};
18use metrics::IndexerMetrics;
19use prometheus::Registry;
20use secrecy::{ExposeSecret, Secret};
21use system_package_task::SystemPackageTask;
22use tokio::runtime::Handle;
23use tokio_util::sync::CancellationToken;
24use tracing::warn;
25use url::Url;
26
27use crate::{
28 apis::{
29 CoinReadApi, ExtendedApi, GovernanceReadApi, IndexerApi, MoveUtilsApi, ReadApi,
30 TransactionBuilderApi, WriteApi,
31 },
32 indexer_reader::IndexerReader,
33};
34
35pub mod apis;
36pub mod db;
37pub mod errors;
38pub mod handlers;
39pub mod indexer;
40pub mod indexer_reader;
41pub mod metrics;
42pub mod models;
43pub mod processors;
44pub mod schema;
45pub mod store;
46pub mod system_package_task;
47pub mod test_utils;
48pub mod types;
49
50#[derive(Parser, Clone, Debug)]
51#[command(
52 name = "IOTA indexer",
53 about = "An off-fullnode service serving data from IOTA protocol"
54)]
55pub struct IndexerConfig {
56 #[arg(long)]
57 pub db_url: Option<Secret<String>>,
58 #[arg(long)]
59 pub db_user_name: Option<String>,
60 #[arg(long)]
61 pub db_password: Option<Secret<String>>,
62 #[arg(long)]
63 pub db_host: Option<String>,
64 #[arg(long)]
65 pub db_port: Option<u16>,
66 #[arg(long)]
67 pub db_name: Option<String>,
68 #[arg(long, default_value = "http://0.0.0.0:9000", global = true)]
69 pub rpc_client_url: String,
70 #[arg(long, default_value = Some("http://0.0.0.0:9000/api/v1"), global = true)]
71 pub remote_store_url: Option<String>,
72 #[arg(long, default_value = "0.0.0.0", global = true)]
73 pub client_metric_host: String,
74 #[arg(long, default_value = "9184", global = true)]
75 pub client_metric_port: u16,
76 #[arg(long, default_value = "0.0.0.0", global = true)]
77 pub rpc_server_url: String,
78 #[arg(long, default_value = "9000", global = true)]
79 pub rpc_server_port: u16,
80 #[arg(long)]
81 pub reset_db: bool,
82 #[arg(long)]
83 pub fullnode_sync_worker: bool,
84 #[arg(long)]
85 pub rpc_server_worker: bool,
86 #[arg(long)]
87 pub data_ingestion_path: Option<PathBuf>,
88 #[arg(long)]
89 pub analytical_worker: bool,
90 #[command(flatten)]
91 pub iota_names_options: IotaNamesOptions,
92}
93
94impl IndexerConfig {
95 pub fn base_connection_url(&self) -> Result<String, anyhow::Error> {
97 let url_secret = self.get_db_url()?;
98 let url_str = url_secret.expose_secret();
99 let url = Url::parse(url_str).expect("Failed to parse URL");
100 Ok(format!(
101 "{}://{}:{}@{}:{}/",
102 url.scheme(),
103 url.username(),
104 url.password().unwrap_or_default(),
105 url.host_str().unwrap_or_default(),
106 url.port().unwrap_or_default()
107 ))
108 }
109
110 pub fn get_db_url(&self) -> Result<Secret<String>, anyhow::Error> {
111 match (
112 &self.db_url,
113 &self.db_user_name,
114 &self.db_password,
115 &self.db_host,
116 &self.db_port,
117 &self.db_name,
118 ) {
119 (Some(db_url), _, _, _, _, _) => Ok(db_url.clone()),
120 (
121 None,
122 Some(db_user_name),
123 Some(db_password),
124 Some(db_host),
125 Some(db_port),
126 Some(db_name),
127 ) => Ok(secrecy::Secret::new(format!(
128 "postgres://{}:{}@{}:{}/{}",
129 db_user_name,
130 db_password.expose_secret(),
131 db_host,
132 db_port,
133 db_name
134 ))),
135 _ => Err(anyhow!(
136 "Invalid db connection config, either db_url or (db_user_name, db_password, db_host, db_port, db_name) must be provided"
137 )),
138 }
139 }
140}
141
142impl Default for IndexerConfig {
143 fn default() -> Self {
144 Self {
145 db_url: Some(secrecy::Secret::new(
146 "postgres://postgres:postgrespw@localhost:5432/iota_indexer".to_string(),
147 )),
148 db_user_name: None,
149 db_password: None,
150 db_host: None,
151 db_port: None,
152 db_name: None,
153 rpc_client_url: "http://127.0.0.1:9000".to_string(),
154 remote_store_url: Some("http://127.0.0.1:9000/api/v1".to_string()),
155 client_metric_host: "0.0.0.0".to_string(),
156 client_metric_port: 9184,
157 rpc_server_url: "0.0.0.0".to_string(),
158 rpc_server_port: 9000,
159 reset_db: false,
160 fullnode_sync_worker: true,
161 rpc_server_worker: true,
162 data_ingestion_path: None,
163 analytical_worker: false,
164 iota_names_options: IotaNamesOptions::default(),
165 }
166 }
167}
168
169#[derive(Args, Debug, Clone)]
170pub struct IotaNamesOptions {
171 #[arg(default_value_t = IotaNamesConfig::default().package_address)]
172 #[arg(long = "iota-names-package-address")]
173 pub package_address: IotaAddress,
174 #[arg(default_value_t = IotaNamesConfig::default().object_id)]
175 #[arg(long = "iota-names-object-id")]
176 pub object_id: ObjectID,
177 #[arg(default_value_t = IotaNamesConfig::default().payments_package_address)]
178 #[arg(long = "iota-names-payments-package-address")]
179 pub payments_package_address: IotaAddress,
180 #[arg(default_value_t = IotaNamesConfig::default().registry_id)]
181 #[arg(long = "iota-names-registry-id")]
182 pub registry_id: ObjectID,
183 #[arg(default_value_t = IotaNamesConfig::default().reverse_registry_id)]
184 #[arg(long = "iota-names-reverse-registry-id")]
185 pub reverse_registry_id: ObjectID,
186}
187
188impl From<IotaNamesOptions> for IotaNamesConfig {
189 fn from(options: IotaNamesOptions) -> Self {
190 let IotaNamesOptions {
191 package_address,
192 object_id,
193 payments_package_address,
194 registry_id,
195 reverse_registry_id,
196 } = options;
197 Self {
198 package_address,
199 object_id,
200 payments_package_address,
201 registry_id,
202 reverse_registry_id,
203 }
204 }
205}
206
207impl From<IotaNamesConfig> for IotaNamesOptions {
208 fn from(config: IotaNamesConfig) -> Self {
209 let IotaNamesConfig {
210 package_address,
211 object_id,
212 payments_package_address,
213 registry_id,
214 reverse_registry_id,
215 } = config;
216 Self {
217 package_address,
218 object_id,
219 payments_package_address,
220 registry_id,
221 reverse_registry_id,
222 }
223 }
224}
225
226impl Default for IotaNamesOptions {
227 fn default() -> Self {
228 IotaNamesConfig::default().into()
229 }
230}
231
232pub async fn build_json_rpc_server(
233 prometheus_registry: &Registry,
234 reader: IndexerReader,
235 config: &IndexerConfig,
236 custom_runtime: Option<Handle>,
237) -> Result<ServerHandle, IndexerError> {
238 let mut builder =
239 JsonRpcServerBuilder::new(env!("CARGO_PKG_VERSION"), prometheus_registry, None, None);
240 let http_client = crate::get_http_client(config.rpc_client_url.as_str())?;
241
242 builder.register_module(WriteApi::new(http_client.clone()))?;
243 builder.register_module(IndexerApi::new(
244 reader.clone(),
245 config.iota_names_options.clone().into(),
246 ))?;
247 builder.register_module(TransactionBuilderApi::new(reader.clone()))?;
248 builder.register_module(MoveUtilsApi::new(reader.clone()))?;
249 builder.register_module(GovernanceReadApi::new(reader.clone()))?;
250 builder.register_module(ReadApi::new(reader.clone()))?;
251 builder.register_module(CoinReadApi::new(reader.clone())?)?;
252 builder.register_module(ExtendedApi::new(reader.clone()))?;
253
254 let default_socket_addr: SocketAddr = SocketAddr::new(
255 config.rpc_server_url.as_str().parse().unwrap(),
257 config.rpc_server_port,
258 );
259
260 let cancel = CancellationToken::new();
261 let system_package_task =
262 SystemPackageTask::new(reader.clone(), cancel.clone(), Duration::from_secs(10));
263
264 tracing::info!("Starting system package task");
265 spawn_monitored_task!(async move { system_package_task.run().await });
266
267 Ok(builder
268 .start(
269 default_socket_addr,
270 custom_runtime,
271 ServerType::Http,
272 Some(cancel),
273 )
274 .await?)
275}
276
277fn get_http_client(rpc_client_url: &str) -> Result<HttpClient, IndexerError> {
278 let mut headers = HeaderMap::new();
279 headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("indexer"));
280
281 HttpClientBuilder::default()
282 .max_request_size(2 << 30)
283 .set_headers(headers.clone())
284 .build(rpc_client_url)
285 .map_err(|e| {
286 warn!("Failed to get new Http client with error: {:?}", e);
287 IndexerError::HttpClientInit(format!(
288 "Failed to initialize fullnode RPC client with error: {:?}",
289 e
290 ))
291 })
292}