iota_bridge_indexer/
eth_bridge_indexer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet},
7    str::FromStr,
8    sync::Arc,
9    time::Duration,
10};
11
12use anyhow::Error;
13use async_trait::async_trait;
14use ethers::{
15    prelude::Transaction,
16    providers::{Http, Middleware, Provider, StreamExt, Ws},
17    types::{Address as EthAddress, Block, Filter, H256},
18};
19use iota_bridge::{
20    abi::{EthBridgeEvent, EthIotaBridgeEvents},
21    error::BridgeError,
22    eth_client::EthClient,
23    metered_eth_provider::MeteredEthHttpProvider,
24    metrics::BridgeMetrics,
25    retry_with_max_elapsed_time,
26    types::{EthEvent, RawEthLog},
27};
28use iota_indexer_builder::indexer_builder::{DataMapper, DataSender, Datasource};
29use iota_metrics::spawn_monitored_task;
30use tokio::task::JoinHandle;
31use tracing::info;
32
33use crate::{
34    BridgeDataSource, ProcessedTxnData, TokenTransfer, TokenTransferData, TokenTransferStatus,
35    metrics::BridgeIndexerMetrics,
36};
37
/// One unit of raw Ethereum data handed to the indexer: a bridge contract log
/// together with the block and the transaction fetched for that log.
type RawEthData = (RawEthLog, Block<H256>, Transaction);
39
40pub struct EthSubscriptionDatasource {
41    bridge_address: EthAddress,
42    eth_ws_url: String,
43    indexer_metrics: BridgeIndexerMetrics,
44}
45
46impl EthSubscriptionDatasource {
47    pub fn new(
48        eth_iota_bridge_contract_address: String,
49        eth_ws_url: String,
50        indexer_metrics: BridgeIndexerMetrics,
51    ) -> Result<Self, anyhow::Error> {
52        let bridge_address = EthAddress::from_str(&eth_iota_bridge_contract_address)?;
53        Ok(Self {
54            bridge_address,
55            eth_ws_url,
56            indexer_metrics,
57        })
58    }
59}
60#[async_trait]
61impl Datasource<RawEthData> for EthSubscriptionDatasource {
62    async fn start_data_retrieval(
63        &self,
64        starting_checkpoint: u64,
65        target_checkpoint: u64,
66        data_sender: DataSender<RawEthData>,
67    ) -> Result<JoinHandle<Result<(), Error>>, Error> {
68        let filter = Filter::new()
69            .address(self.bridge_address)
70            .from_block(starting_checkpoint)
71            .to_block(target_checkpoint);
72
73        let eth_ws_url = self.eth_ws_url.clone();
74        let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone();
75
76        let handle = spawn_monitored_task!(async move {
77            let eth_ws_client = Provider::<Ws>::connect(&eth_ws_url).await?;
78
79            let mut cached_blocks: HashMap<u64, Block<H256>> = HashMap::new();
80
81            let mut stream = eth_ws_client.subscribe_logs(&filter).await?;
82            while let Some(log) = stream.next().await {
83                let raw_log = RawEthLog {
84                    block_number: log
85                        .block_number
86                        .ok_or(BridgeError::Provider(
87                            "Provider returns log without block_number".into(),
88                        ))
89                        .unwrap()
90                        .as_u64(),
91                    tx_hash: log
92                        .transaction_hash
93                        .ok_or(BridgeError::Provider(
94                            "Provider returns log without transaction_hash".into(),
95                        ))
96                        .unwrap(),
97                    log,
98                };
99
100                let block_number = raw_log.block_number();
101
102                let block = if let Some(cached_block) = cached_blocks.get(&block_number) {
103                    cached_block.clone()
104                } else {
105                    let Ok(Ok(Some(block))) = retry_with_max_elapsed_time!(
106                        eth_ws_client.get_block(block_number),
107                        Duration::from_secs(30000)
108                    ) else {
109                        panic!("Unable to get block from provider");
110                    };
111
112                    cached_blocks.insert(block_number, block.clone());
113                    block
114                };
115
116                let Ok(Ok(Some(transaction))) = retry_with_max_elapsed_time!(
117                    eth_ws_client.get_transaction(raw_log.tx_hash),
118                    Duration::from_secs(30000)
119                ) else {
120                    panic!("Unable to get transaction from provider");
121                };
122
123                data_sender
124                    .send((block_number, vec![(raw_log, block, transaction)]))
125                    .await?;
126
127                indexer_metrics
128                    .latest_committed_eth_block
129                    .set(block_number as i64);
130            }
131
132            Ok::<_, Error>(())
133        });
134        Ok(handle)
135    }
136}
137
138pub struct EthSyncDatasource {
139    bridge_address: EthAddress,
140    eth_http_url: String,
141    indexer_metrics: BridgeIndexerMetrics,
142    bridge_metrics: Arc<BridgeMetrics>,
143}
144
145impl EthSyncDatasource {
146    pub fn new(
147        eth_iota_bridge_contract_address: String,
148        eth_http_url: String,
149        indexer_metrics: BridgeIndexerMetrics,
150        bridge_metrics: Arc<BridgeMetrics>,
151    ) -> Result<Self, anyhow::Error> {
152        let bridge_address = EthAddress::from_str(&eth_iota_bridge_contract_address)?;
153        Ok(Self {
154            bridge_address,
155            eth_http_url,
156            indexer_metrics,
157            bridge_metrics,
158        })
159    }
160}
161#[async_trait]
162impl Datasource<RawEthData> for EthSyncDatasource {
163    async fn start_data_retrieval(
164        &self,
165        starting_checkpoint: u64,
166        target_checkpoint: u64,
167        data_sender: DataSender<RawEthData>,
168    ) -> Result<JoinHandle<Result<(), Error>>, Error> {
169        let client: Arc<EthClient<MeteredEthHttpProvider>> = Arc::new(
170            EthClient::<MeteredEthHttpProvider>::new(
171                &self.eth_http_url,
172                HashSet::from_iter(vec![self.bridge_address]),
173                self.bridge_metrics.clone(),
174            )
175            .await?,
176        );
177
178        let provider = Arc::new(
179            Provider::<Http>::try_from(&self.eth_http_url)?
180                .interval(std::time::Duration::from_millis(2000)),
181        );
182
183        let bridge_address = self.bridge_address;
184        let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone();
185        let client = Arc::clone(&client);
186        let provider = Arc::clone(&provider);
187
188        let handle = spawn_monitored_task!(async move {
189            let mut cached_blocks: HashMap<u64, Block<H256>> = HashMap::new();
190
191            let Ok(Ok(logs)) = retry_with_max_elapsed_time!(
192                client.get_raw_events_in_range(
193                    bridge_address,
194                    starting_checkpoint,
195                    target_checkpoint
196                ),
197                Duration::from_secs(30000)
198            ) else {
199                panic!("Unable to get logs from provider");
200            };
201
202            let mut data = Vec::new();
203            let mut first_block = 0;
204
205            for log in logs {
206                let block = if let Some(cached_block) = cached_blocks.get(&log.block_number) {
207                    cached_block.clone()
208                } else {
209                    let Ok(Ok(Some(block))) = retry_with_max_elapsed_time!(
210                        provider.get_block(log.block_number),
211                        Duration::from_secs(30000)
212                    ) else {
213                        panic!("Unable to get block from provider");
214                    };
215
216                    cached_blocks.insert(log.block_number, block.clone());
217                    block
218                };
219
220                if first_block == 0 {
221                    first_block = log.block_number;
222                }
223
224                let Ok(Ok(Some(transaction))) = retry_with_max_elapsed_time!(
225                    provider.get_transaction(log.tx_hash),
226                    Duration::from_secs(30000)
227                ) else {
228                    panic!("Unable to get transaction from provider");
229                };
230
231                data.push((log, block, transaction));
232            }
233
234            data_sender.send((target_checkpoint, data)).await?;
235
236            indexer_metrics
237                .last_synced_eth_block
238                .set(first_block as i64);
239
240            Ok::<_, Error>(())
241        });
242
243        Ok(handle)
244    }
245}
246
247#[derive(Clone)]
248pub struct EthDataMapper {
249    pub metrics: BridgeIndexerMetrics,
250}
251
252impl<E: EthEvent> DataMapper<(E, Block<H256>, Transaction), ProcessedTxnData> for EthDataMapper {
253    fn map(
254        &self,
255        (log, block, transaction): (E, Block<H256>, Transaction),
256    ) -> Result<Vec<ProcessedTxnData>, Error> {
257        let eth_bridge_event = EthBridgeEvent::try_from_log(log.log());
258        if eth_bridge_event.is_none() {
259            return Ok(vec![]);
260        }
261        self.metrics.total_eth_bridge_transactions.inc();
262        let bridge_event = eth_bridge_event.unwrap();
263        let timestamp_ms = block.timestamp.as_u64() * 1000;
264        let gas = transaction.gas;
265
266        let transfer = match bridge_event {
267            EthBridgeEvent::EthIotaBridgeEvents(bridge_event) => match bridge_event {
268                EthIotaBridgeEvents::TokensDepositedFilter(bridge_event) => {
269                    info!("Observed Eth Deposit at block: {}", log.block_number());
270                    self.metrics.total_eth_token_deposited.inc();
271                    ProcessedTxnData::TokenTransfer(TokenTransfer {
272                        chain_id: bridge_event.source_chain_id,
273                        nonce: bridge_event.nonce,
274                        block_height: log.block_number(),
275                        timestamp_ms,
276                        txn_hash: transaction.hash.as_bytes().to_vec(),
277                        txn_sender: bridge_event.sender_address.as_bytes().to_vec(),
278                        status: TokenTransferStatus::Deposited,
279                        gas_usage: gas.as_u64() as i64,
280                        data_source: BridgeDataSource::Eth,
281                        data: Some(TokenTransferData {
282                            sender_address: bridge_event.sender_address.as_bytes().to_vec(),
283                            destination_chain: bridge_event.destination_chain_id,
284                            recipient_address: bridge_event.recipient_address.to_vec(),
285                            token_id: bridge_event.token_id,
286                            amount: bridge_event.iota_adjusted_amount,
287                        }),
288                    })
289                }
290                EthIotaBridgeEvents::TokensClaimedFilter(bridge_event) => {
291                    info!("Observed Eth Claim at block: {}", log.block_number());
292                    self.metrics.total_eth_token_transfer_claimed.inc();
293                    ProcessedTxnData::TokenTransfer(TokenTransfer {
294                        chain_id: bridge_event.source_chain_id,
295                        nonce: bridge_event.nonce,
296                        block_height: log.block_number(),
297                        timestamp_ms,
298                        txn_hash: transaction.hash.as_bytes().to_vec(),
299                        txn_sender: bridge_event.sender_address.to_vec(),
300                        status: TokenTransferStatus::Claimed,
301                        gas_usage: gas.as_u64() as i64,
302                        data_source: BridgeDataSource::Eth,
303                        data: None,
304                    })
305                }
306                EthIotaBridgeEvents::PausedFilter(_)
307                | EthIotaBridgeEvents::UnpausedFilter(_)
308                | EthIotaBridgeEvents::UpgradedFilter(_)
309                | EthIotaBridgeEvents::InitializedFilter(_) => {
310                    // TODO: handle these events
311                    self.metrics.total_eth_bridge_txn_other.inc();
312                    return Ok(vec![]);
313                }
314            },
315            EthBridgeEvent::EthBridgeCommitteeEvents(_)
316            | EthBridgeEvent::EthBridgeLimiterEvents(_)
317            | EthBridgeEvent::EthBridgeConfigEvents(_)
318            | EthBridgeEvent::EthCommitteeUpgradeableContractEvents(_) => {
319                // TODO: handle these events
320                self.metrics.total_eth_bridge_txn_other.inc();
321                return Ok(vec![]);
322            }
323        };
324        Ok(vec![transfer])
325    }
326}