bridge_indexer/
main.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{env, path::PathBuf, sync::Arc};
6
7use anyhow::Result;
8use clap::*;
9use ethers::providers::{Http, Middleware, Provider};
10use iota_bridge::metrics::BridgeMetrics;
11use iota_bridge_indexer::{
12    config::IndexerConfig,
13    eth_bridge_indexer::{EthDataMapper, EthSubscriptionDatasource, EthSyncDatasource},
14    iota_bridge_indexer::{IotaBridgeDataMapper, PgBridgePersistent},
15    iota_transaction_handler::handle_iota_transactions_loop,
16    iota_transaction_queries::start_iota_tx_polling_task,
17    metrics::BridgeIndexerMetrics,
18    postgres_manager::{get_connection_pool, read_iota_progress_store},
19};
20use iota_config::Config;
21use iota_data_ingestion_core::DataIngestionMetrics;
22use iota_indexer_builder::{
23    indexer_builder::{BackfillStrategy, IndexerBuilder},
24    iota_datasource::IotaCheckpointDatasource,
25};
26use iota_metrics::{
27    metered_channel::channel, spawn_logged_monitored_task, start_prometheus_server,
28};
29use iota_sdk::IotaClientBuilder;
30use tokio::task::JoinHandle;
31use tracing::info;
32
33#[derive(Parser, Clone, Debug)]
34struct Args {
35    /// Path to a yaml config
36    #[arg(long, short)]
37    config_path: Option<PathBuf>,
38}
39
40#[tokio::main]
41async fn main() -> Result<()> {
42    let _guard = telemetry_subscribers::TelemetryConfig::new()
43        .with_env()
44        .init();
45
46    let args = Args::parse();
47
48    // load config
49    let config_path = if let Some(path) = args.config_path {
50        path
51    } else {
52        env::current_dir()
53            .expect("Couldn't get current directory")
54            .join("config.yaml")
55    };
56    let config = IndexerConfig::load(&config_path)?;
57
58    // Init metrics server
59    let registry_service = start_prometheus_server(
60        format!("{}:{}", config.metric_url, config.metric_port,)
61            .parse()
62            .unwrap_or_else(|err| panic!("Failed to parse metric address: {}", err)),
63    );
64    let registry = registry_service.default_registry();
65
66    iota_metrics::init_metrics(&registry);
67
68    info!(
69        "Metrics server started at {}::{}",
70        config.metric_url, config.metric_port
71    );
72    let indexer_meterics = BridgeIndexerMetrics::new(&registry);
73    let ingestion_metrics = DataIngestionMetrics::new(&registry);
74    let bridge_metrics = Arc::new(BridgeMetrics::new(&registry));
75
76    let db_url = config.db_url.clone();
77    let datastore = PgBridgePersistent::new(get_connection_pool(db_url.clone()));
78
79    let provider = Arc::new(
80        Provider::<Http>::try_from(config.eth_rpc_url.clone())?
81            .interval(std::time::Duration::from_millis(2000)),
82    );
83
84    let current_block = provider.get_block_number().await?.as_u64();
85    let subscription_end_block = u64::MAX;
86
87    // Start the eth subscription indexer
88
89    let eth_subscription_datasource = EthSubscriptionDatasource::new(
90        config.eth_iota_bridge_contract_address.clone(),
91        config.eth_ws_url.clone(),
92        indexer_meterics.clone(),
93    )?;
94    let eth_subscription_indexer = IndexerBuilder::new(
95        "EthBridgeSubscriptionIndexer",
96        eth_subscription_datasource,
97        EthDataMapper {
98            metrics: indexer_meterics.clone(),
99        },
100    )
101    .with_backfill_strategy(BackfillStrategy::Disabled)
102    .build(current_block, subscription_end_block, datastore.clone());
103    let subscription_indexer_fut = spawn_logged_monitored_task!(eth_subscription_indexer.start());
104
105    // Start the eth sync indexer
106    let eth_sync_datasource = EthSyncDatasource::new(
107        config.eth_iota_bridge_contract_address.clone(),
108        config.eth_rpc_url.clone(),
109        indexer_meterics.clone(),
110        bridge_metrics.clone(),
111    )?;
112    let eth_sync_indexer = IndexerBuilder::new(
113        "EthBridgeSyncIndexer",
114        eth_sync_datasource,
115        EthDataMapper {
116            metrics: indexer_meterics.clone(),
117        },
118    )
119    .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
120    .disable_live_task()
121    .build(current_block, config.start_block, datastore.clone());
122    let sync_indexer_fut = spawn_logged_monitored_task!(eth_sync_indexer.start());
123
124    if let Some(iota_rpc_url) = config.iota_rpc_url.clone() {
125        // Todo: impl datasource for iota RPC datasource
126        start_processing_iota_checkpoints_by_querying_txns(
127            iota_rpc_url,
128            db_url.clone(),
129            indexer_meterics.clone(),
130            bridge_metrics,
131        )
132        .await?;
133    } else {
134        let iota_checkpoint_datasource = IotaCheckpointDatasource::new(
135            config.remote_store_url,
136            config.concurrency as usize,
137            config.checkpoints_path.clone().into(),
138            ingestion_metrics.clone(),
139        );
140        let indexer = IndexerBuilder::new(
141            "IotaBridgeIndexer",
142            iota_checkpoint_datasource,
143            IotaBridgeDataMapper {
144                metrics: indexer_meterics.clone(),
145            },
146        )
147        .build(
148            config
149                .resume_from_checkpoint
150                .unwrap_or(config.bridge_genesis_checkpoint),
151            config.bridge_genesis_checkpoint,
152            datastore,
153        );
154        indexer.start().await?;
155    }
156    // We are not waiting for the iota tasks to finish here, which is ok.
157    futures::future::join_all(vec![subscription_indexer_fut, sync_indexer_fut]).await;
158
159    Ok(())
160}
161
162async fn start_processing_iota_checkpoints_by_querying_txns(
163    iota_rpc_url: String,
164    db_url: String,
165    indexer_metrics: BridgeIndexerMetrics,
166    bridge_metrics: Arc<BridgeMetrics>,
167) -> Result<Vec<JoinHandle<()>>> {
168    let pg_pool = get_connection_pool(db_url.clone());
169    let (tx, rx) = channel(
170        100,
171        &iota_metrics::get_metrics()
172            .unwrap()
173            .channel_inflight
174            .with_label_values(&["iota_transaction_processing_queue"]),
175    );
176    let mut handles = vec![];
177    let cursor =
178        read_iota_progress_store(&pg_pool).expect("Failed to read cursor from iota progress store");
179    let iota_client = IotaClientBuilder::default().build(iota_rpc_url).await?;
180    handles.push(spawn_logged_monitored_task!(
181        start_iota_tx_polling_task(iota_client, cursor, tx, bridge_metrics),
182        "start_iota_tx_polling_task"
183    ));
184    handles.push(spawn_logged_monitored_task!(
185        handle_iota_transactions_loop(pg_pool.clone(), rx, indexer_metrics.clone()),
186        "handle_iota_transactions_loop"
187    ));
188    Ok(handles)
189}