iota_indexer/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#![recursion_limit = "256"]
6
7use std::{net::SocketAddr, path::PathBuf, time::Duration};
8
9use anyhow::{Result, anyhow};
10use clap::{Args, Parser};
11use errors::IndexerError;
12use iota_json_rpc::{JsonRpcServerBuilder, ServerHandle, ServerType};
13use iota_json_rpc_api::CLIENT_SDK_TYPE_HEADER;
14use iota_metrics::spawn_monitored_task;
15use iota_names::config::IotaNamesConfig;
16use iota_types::base_types::{IotaAddress, ObjectID};
17use jsonrpsee::http_client::{HeaderMap, HeaderValue, HttpClient, HttpClientBuilder};
18use metrics::IndexerMetrics;
19use prometheus::Registry;
20use secrecy::{ExposeSecret, Secret};
21use system_package_task::SystemPackageTask;
22use tokio::runtime::Handle;
23use tokio_util::sync::CancellationToken;
24use tracing::warn;
25use url::Url;
26
27use crate::{
28    apis::{
29        CoinReadApi, ExtendedApi, GovernanceReadApi, IndexerApi, MoveUtilsApi, ReadApi,
30        TransactionBuilderApi, WriteApi,
31    },
32    indexer_reader::IndexerReader,
33};
34
35pub mod apis;
36pub mod db;
37pub mod errors;
38pub mod handlers;
39pub mod indexer;
40pub mod indexer_reader;
41pub mod metrics;
42pub mod models;
43pub mod processors;
44pub mod schema;
45pub mod store;
46pub mod system_package_task;
47pub mod test_utils;
48pub mod types;
49
50#[derive(Parser, Clone, Debug)]
51#[command(
52    name = "IOTA indexer",
53    about = "An off-fullnode service serving data from IOTA protocol"
54)]
55pub struct IndexerConfig {
56    #[arg(long)]
57    pub db_url: Option<Secret<String>>,
58    #[arg(long)]
59    pub db_user_name: Option<String>,
60    #[arg(long)]
61    pub db_password: Option<Secret<String>>,
62    #[arg(long)]
63    pub db_host: Option<String>,
64    #[arg(long)]
65    pub db_port: Option<u16>,
66    #[arg(long)]
67    pub db_name: Option<String>,
68    #[arg(long, default_value = "http://0.0.0.0:9000", global = true)]
69    pub rpc_client_url: String,
70    #[arg(long, default_value = Some("http://0.0.0.0:9000/api/v1"), global = true)]
71    pub remote_store_url: Option<String>,
72    #[arg(long, default_value = "0.0.0.0", global = true)]
73    pub client_metric_host: String,
74    #[arg(long, default_value = "9184", global = true)]
75    pub client_metric_port: u16,
76    #[arg(long, default_value = "0.0.0.0", global = true)]
77    pub rpc_server_url: String,
78    #[arg(long, default_value = "9000", global = true)]
79    pub rpc_server_port: u16,
80    #[arg(long)]
81    pub reset_db: bool,
82    #[arg(long)]
83    pub fullnode_sync_worker: bool,
84    #[arg(long)]
85    pub rpc_server_worker: bool,
86    #[arg(long)]
87    pub data_ingestion_path: Option<PathBuf>,
88    #[arg(long)]
89    pub analytical_worker: bool,
90    #[command(flatten)]
91    pub iota_names_options: IotaNamesOptions,
92}
93
94impl IndexerConfig {
95    /// returns connection url without the db name
96    pub fn base_connection_url(&self) -> Result<String, anyhow::Error> {
97        let url_secret = self.get_db_url()?;
98        let url_str = url_secret.expose_secret();
99        let url = Url::parse(url_str).expect("Failed to parse URL");
100        Ok(format!(
101            "{}://{}:{}@{}:{}/",
102            url.scheme(),
103            url.username(),
104            url.password().unwrap_or_default(),
105            url.host_str().unwrap_or_default(),
106            url.port().unwrap_or_default()
107        ))
108    }
109
110    pub fn get_db_url(&self) -> Result<Secret<String>, anyhow::Error> {
111        match (
112            &self.db_url,
113            &self.db_user_name,
114            &self.db_password,
115            &self.db_host,
116            &self.db_port,
117            &self.db_name,
118        ) {
119            (Some(db_url), _, _, _, _, _) => Ok(db_url.clone()),
120            (
121                None,
122                Some(db_user_name),
123                Some(db_password),
124                Some(db_host),
125                Some(db_port),
126                Some(db_name),
127            ) => Ok(secrecy::Secret::new(format!(
128                "postgres://{}:{}@{}:{}/{}",
129                db_user_name,
130                db_password.expose_secret(),
131                db_host,
132                db_port,
133                db_name
134            ))),
135            _ => Err(anyhow!(
136                "Invalid db connection config, either db_url or (db_user_name, db_password, db_host, db_port, db_name) must be provided"
137            )),
138        }
139    }
140}
141
142impl Default for IndexerConfig {
143    fn default() -> Self {
144        Self {
145            db_url: Some(secrecy::Secret::new(
146                "postgres://postgres:postgrespw@localhost:5432/iota_indexer".to_string(),
147            )),
148            db_user_name: None,
149            db_password: None,
150            db_host: None,
151            db_port: None,
152            db_name: None,
153            rpc_client_url: "http://127.0.0.1:9000".to_string(),
154            remote_store_url: Some("http://127.0.0.1:9000/api/v1".to_string()),
155            client_metric_host: "0.0.0.0".to_string(),
156            client_metric_port: 9184,
157            rpc_server_url: "0.0.0.0".to_string(),
158            rpc_server_port: 9000,
159            reset_db: false,
160            fullnode_sync_worker: true,
161            rpc_server_worker: true,
162            data_ingestion_path: None,
163            analytical_worker: false,
164            iota_names_options: IotaNamesOptions::default(),
165        }
166    }
167}
168
169#[derive(Args, Debug, Clone)]
170pub struct IotaNamesOptions {
171    #[arg(default_value_t = IotaNamesConfig::default().package_address)]
172    #[arg(long = "iota-names-package-address")]
173    pub package_address: IotaAddress,
174    #[arg(default_value_t = IotaNamesConfig::default().object_id)]
175    #[arg(long = "iota-names-object-id")]
176    pub object_id: ObjectID,
177    #[arg(default_value_t = IotaNamesConfig::default().payments_package_address)]
178    #[arg(long = "iota-names-payments-package-address")]
179    pub payments_package_address: IotaAddress,
180    #[arg(default_value_t = IotaNamesConfig::default().registry_id)]
181    #[arg(long = "iota-names-registry-id")]
182    pub registry_id: ObjectID,
183    #[arg(default_value_t = IotaNamesConfig::default().reverse_registry_id)]
184    #[arg(long = "iota-names-reverse-registry-id")]
185    pub reverse_registry_id: ObjectID,
186}
187
188impl From<IotaNamesOptions> for IotaNamesConfig {
189    fn from(options: IotaNamesOptions) -> Self {
190        let IotaNamesOptions {
191            package_address,
192            object_id,
193            payments_package_address,
194            registry_id,
195            reverse_registry_id,
196        } = options;
197        Self {
198            package_address,
199            object_id,
200            payments_package_address,
201            registry_id,
202            reverse_registry_id,
203        }
204    }
205}
206
207impl From<IotaNamesConfig> for IotaNamesOptions {
208    fn from(config: IotaNamesConfig) -> Self {
209        let IotaNamesConfig {
210            package_address,
211            object_id,
212            payments_package_address,
213            registry_id,
214            reverse_registry_id,
215        } = config;
216        Self {
217            package_address,
218            object_id,
219            payments_package_address,
220            registry_id,
221            reverse_registry_id,
222        }
223    }
224}
225
226impl Default for IotaNamesOptions {
227    fn default() -> Self {
228        IotaNamesConfig::default().into()
229    }
230}
231
232pub async fn build_json_rpc_server(
233    prometheus_registry: &Registry,
234    reader: IndexerReader,
235    config: &IndexerConfig,
236    custom_runtime: Option<Handle>,
237) -> Result<ServerHandle, IndexerError> {
238    let mut builder =
239        JsonRpcServerBuilder::new(env!("CARGO_PKG_VERSION"), prometheus_registry, None, None);
240    let http_client = crate::get_http_client(config.rpc_client_url.as_str())?;
241
242    builder.register_module(WriteApi::new(http_client.clone()))?;
243    builder.register_module(IndexerApi::new(
244        reader.clone(),
245        config.iota_names_options.clone().into(),
246    ))?;
247    builder.register_module(TransactionBuilderApi::new(reader.clone()))?;
248    builder.register_module(MoveUtilsApi::new(reader.clone()))?;
249    builder.register_module(GovernanceReadApi::new(reader.clone()))?;
250    builder.register_module(ReadApi::new(reader.clone()))?;
251    builder.register_module(CoinReadApi::new(reader.clone())?)?;
252    builder.register_module(ExtendedApi::new(reader.clone()))?;
253
254    let default_socket_addr: SocketAddr = SocketAddr::new(
255        // unwrap() here is safe b/c the address is a static config.
256        config.rpc_server_url.as_str().parse().unwrap(),
257        config.rpc_server_port,
258    );
259
260    let cancel = CancellationToken::new();
261    let system_package_task =
262        SystemPackageTask::new(reader.clone(), cancel.clone(), Duration::from_secs(10));
263
264    tracing::info!("Starting system package task");
265    spawn_monitored_task!(async move { system_package_task.run().await });
266
267    Ok(builder
268        .start(
269            default_socket_addr,
270            custom_runtime,
271            ServerType::Http,
272            Some(cancel),
273        )
274        .await?)
275}
276
277fn get_http_client(rpc_client_url: &str) -> Result<HttpClient, IndexerError> {
278    let mut headers = HeaderMap::new();
279    headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("indexer"));
280
281    HttpClientBuilder::default()
282        .max_request_size(2 << 30)
283        .set_headers(headers.clone())
284        .build(rpc_client_url)
285        .map_err(|e| {
286            warn!("Failed to get new Http client with error: {:?}", e);
287            IndexerError::HttpClientInit(format!(
288                "Failed to initialize fullnode RPC client with error: {:?}",
289                e
290            ))
291        })
292}