1use std::{env, path::PathBuf, sync::Arc};
6
7use anyhow::Result;
8use clap::*;
9use ethers::providers::{Http, Middleware, Provider};
10use iota_bridge::metrics::BridgeMetrics;
11use iota_bridge_indexer::{
12 config::IndexerConfig,
13 eth_bridge_indexer::{EthDataMapper, EthSubscriptionDatasource, EthSyncDatasource},
14 iota_bridge_indexer::{IotaBridgeDataMapper, PgBridgePersistent},
15 iota_transaction_handler::handle_iota_transactions_loop,
16 iota_transaction_queries::start_iota_tx_polling_task,
17 metrics::BridgeIndexerMetrics,
18 postgres_manager::{get_connection_pool, read_iota_progress_store},
19};
20use iota_config::Config;
21use iota_data_ingestion_core::DataIngestionMetrics;
22use iota_indexer_builder::{
23 indexer_builder::{BackfillStrategy, IndexerBuilder},
24 iota_datasource::IotaCheckpointDatasource,
25};
26use iota_metrics::{
27 metered_channel::channel, spawn_logged_monitored_task, start_prometheus_server,
28};
29use iota_sdk::IotaClientBuilder;
30use tokio::task::JoinHandle;
31use tracing::info;
32
33#[derive(Parser, Clone, Debug)]
34struct Args {
35 #[arg(long, short)]
37 config_path: Option<PathBuf>,
38}
39
40#[tokio::main]
41async fn main() -> Result<()> {
42 let _guard = telemetry_subscribers::TelemetryConfig::new()
43 .with_env()
44 .init();
45
46 let args = Args::parse();
47
48 let config_path = if let Some(path) = args.config_path {
50 path
51 } else {
52 env::current_dir()
53 .expect("Couldn't get current directory")
54 .join("config.yaml")
55 };
56 let config = IndexerConfig::load(&config_path)?;
57
58 let registry_service = start_prometheus_server(
60 format!("{}:{}", config.metric_url, config.metric_port,)
61 .parse()
62 .unwrap_or_else(|err| panic!("Failed to parse metric address: {}", err)),
63 );
64 let registry = registry_service.default_registry();
65
66 iota_metrics::init_metrics(®istry);
67
68 info!(
69 "Metrics server started at {}::{}",
70 config.metric_url, config.metric_port
71 );
72 let indexer_meterics = BridgeIndexerMetrics::new(®istry);
73 let ingestion_metrics = DataIngestionMetrics::new(®istry);
74 let bridge_metrics = Arc::new(BridgeMetrics::new(®istry));
75
76 let db_url = config.db_url.clone();
77 let datastore = PgBridgePersistent::new(get_connection_pool(db_url.clone()));
78
79 let provider = Arc::new(
80 Provider::<Http>::try_from(config.eth_rpc_url.clone())?
81 .interval(std::time::Duration::from_millis(2000)),
82 );
83
84 let current_block = provider.get_block_number().await?.as_u64();
85 let subscription_end_block = u64::MAX;
86
87 let eth_subscription_datasource = EthSubscriptionDatasource::new(
90 config.eth_iota_bridge_contract_address.clone(),
91 config.eth_ws_url.clone(),
92 indexer_meterics.clone(),
93 )?;
94 let eth_subscription_indexer = IndexerBuilder::new(
95 "EthBridgeSubscriptionIndexer",
96 eth_subscription_datasource,
97 EthDataMapper {
98 metrics: indexer_meterics.clone(),
99 },
100 )
101 .with_backfill_strategy(BackfillStrategy::Disabled)
102 .build(current_block, subscription_end_block, datastore.clone());
103 let subscription_indexer_fut = spawn_logged_monitored_task!(eth_subscription_indexer.start());
104
105 let eth_sync_datasource = EthSyncDatasource::new(
107 config.eth_iota_bridge_contract_address.clone(),
108 config.eth_rpc_url.clone(),
109 indexer_meterics.clone(),
110 bridge_metrics.clone(),
111 )?;
112 let eth_sync_indexer = IndexerBuilder::new(
113 "EthBridgeSyncIndexer",
114 eth_sync_datasource,
115 EthDataMapper {
116 metrics: indexer_meterics.clone(),
117 },
118 )
119 .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
120 .disable_live_task()
121 .build(current_block, config.start_block, datastore.clone());
122 let sync_indexer_fut = spawn_logged_monitored_task!(eth_sync_indexer.start());
123
124 if let Some(iota_rpc_url) = config.iota_rpc_url.clone() {
125 start_processing_iota_checkpoints_by_querying_txns(
127 iota_rpc_url,
128 db_url.clone(),
129 indexer_meterics.clone(),
130 bridge_metrics,
131 )
132 .await?;
133 } else {
134 let iota_checkpoint_datasource = IotaCheckpointDatasource::new(
135 config.remote_store_url,
136 config.concurrency as usize,
137 config.checkpoints_path.clone().into(),
138 ingestion_metrics.clone(),
139 );
140 let indexer = IndexerBuilder::new(
141 "IotaBridgeIndexer",
142 iota_checkpoint_datasource,
143 IotaBridgeDataMapper {
144 metrics: indexer_meterics.clone(),
145 },
146 )
147 .build(
148 config
149 .resume_from_checkpoint
150 .unwrap_or(config.bridge_genesis_checkpoint),
151 config.bridge_genesis_checkpoint,
152 datastore,
153 );
154 indexer.start().await?;
155 }
156 futures::future::join_all(vec![subscription_indexer_fut, sync_indexer_fut]).await;
158
159 Ok(())
160}
161
162async fn start_processing_iota_checkpoints_by_querying_txns(
163 iota_rpc_url: String,
164 db_url: String,
165 indexer_metrics: BridgeIndexerMetrics,
166 bridge_metrics: Arc<BridgeMetrics>,
167) -> Result<Vec<JoinHandle<()>>> {
168 let pg_pool = get_connection_pool(db_url.clone());
169 let (tx, rx) = channel(
170 100,
171 &iota_metrics::get_metrics()
172 .unwrap()
173 .channel_inflight
174 .with_label_values(&["iota_transaction_processing_queue"]),
175 );
176 let mut handles = vec![];
177 let cursor =
178 read_iota_progress_store(&pg_pool).expect("Failed to read cursor from iota progress store");
179 let iota_client = IotaClientBuilder::default().build(iota_rpc_url).await?;
180 handles.push(spawn_logged_monitored_task!(
181 start_iota_tx_polling_task(iota_client, cursor, tx, bridge_metrics),
182 "start_iota_tx_polling_task"
183 ));
184 handles.push(spawn_logged_monitored_task!(
185 handle_iota_transactions_loop(pg_pool.clone(), rx, indexer_metrics.clone()),
186 "handle_iota_transactions_loop"
187 ));
188 Ok(handles)
189}