iota_bridge/
eth_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc};
6
7use ethers::{
8    providers::{JsonRpcClient, Middleware, Provider},
9    types::{Address as EthAddress, Block, Filter, TxHash},
10};
11use tap::TapFallible;
12
13#[cfg(test)]
14use crate::eth_mock_provider::EthMockProvider;
15use crate::{
16    abi::EthBridgeEvent,
17    error::{BridgeError, BridgeResult},
18    metered_eth_provider::{MeteredEthHttpProvider, new_metered_eth_provider},
19    metrics::BridgeMetrics,
20    types::{BridgeAction, EthLog, RawEthLog},
21};
22pub struct EthClient<P> {
23    provider: Provider<P>,
24    contract_addresses: HashSet<EthAddress>,
25}
26
27impl EthClient<MeteredEthHttpProvider> {
28    pub async fn new(
29        provider_url: &str,
30        contract_addresses: HashSet<EthAddress>,
31        metrics: Arc<BridgeMetrics>,
32    ) -> anyhow::Result<Self> {
33        let provider = new_metered_eth_provider(provider_url, metrics)?;
34        let self_ = Self {
35            provider,
36            contract_addresses,
37        };
38        self_.describe().await?;
39        Ok(self_)
40    }
41}
42
#[cfg(test)]
impl EthClient<EthMockProvider> {
    /// Test-only constructor: wraps `provider` in an `ethers` [`Provider`]
    /// without issuing any request to it.
    pub fn new_mocked(provider: EthMockProvider, contract_addresses: HashSet<EthAddress>) -> Self {
        Self {
            provider: Provider::new(provider),
            contract_addresses,
        }
    }
}
53
54impl<P> EthClient<P>
55where
56    P: JsonRpcClient,
57{
58    // TODO assert chain identifier
59    async fn describe(&self) -> anyhow::Result<()> {
60        let chain_id = self.provider.get_chainid().await?;
61        let block_number = self.provider.get_block_number().await?;
62        tracing::info!(
63            "EthClient is connected to chain {chain_id}, current block number: {block_number}"
64        );
65        Ok(())
66    }
67
68    /// Returns BridgeAction from an Eth Transaction with transaction hash
69    /// and the event index. If event is declared in an unrecognized
70    /// contract, return error.
71    pub async fn get_finalized_bridge_action_maybe(
72        &self,
73        tx_hash: TxHash,
74        event_idx: u16,
75    ) -> BridgeResult<BridgeAction> {
76        let receipt = self
77            .provider
78            .get_transaction_receipt(tx_hash)
79            .await
80            .map_err(BridgeError::from)?
81            .ok_or(BridgeError::TxNotFound)?;
82        let receipt_block_num = receipt.block_number.ok_or(BridgeError::Provider(
83            "Provider returns log without block_number".into(),
84        ))?;
85        // TODO: save the latest finalized block id so we don't have to query it every
86        // time
87        let last_finalized_block_id = self.get_last_finalized_block_id().await?;
88        if receipt_block_num.as_u64() > last_finalized_block_id {
89            return Err(BridgeError::TxNotFinalized);
90        }
91        let log = receipt
92            .logs
93            .get(event_idx as usize)
94            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
95
96        // Ignore events emitted from unrecognized contracts
97        if !self.contract_addresses.contains(&log.address) {
98            return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
99        }
100
101        let eth_log = EthLog {
102            block_number: receipt_block_num.as_u64(),
103            tx_hash,
104            log_index_in_tx: event_idx,
105            log: log.clone(),
106        };
107        let bridge_event = EthBridgeEvent::try_from_eth_log(&eth_log)
108            .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
109        bridge_event
110            .try_into_bridge_action(tx_hash, event_idx)?
111            .ok_or(BridgeError::BridgeEventNotActionable)
112    }
113
114    pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
115        let block: Result<Option<Block<ethers::types::TxHash>>, ethers::prelude::ProviderError> =
116            self.provider
117                .request("eth_getBlockByNumber", ("finalized", false))
118                .await;
119        let block = block?.ok_or(BridgeError::TransientProvider(
120            "Provider fails to return last finalized block".into(),
121        ))?;
122        let number = block.number.ok_or(BridgeError::TransientProvider(
123            "Provider returns block without number".into(),
124        ))?;
125        Ok(number.as_u64())
126    }
127
128    // Note: query may fail if range is too big. Callsite is responsible
129    // for chunking the query.
130    pub async fn get_events_in_range(
131        &self,
132        address: ethers::types::Address,
133        start_block: u64,
134        end_block: u64,
135    ) -> BridgeResult<Vec<EthLog>> {
136        let filter = Filter::new()
137            .from_block(start_block)
138            .to_block(end_block)
139            .address(address);
140        let logs = self
141            .provider
142            // TODO use get_logs_paginated?
143            .get_logs(&filter)
144            .await
145            .map_err(BridgeError::from)
146            .tap_err(|e| {
147                tracing::error!(
148                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
149                    filter,
150                    e
151                )
152            })?;
153
154        // Safeguard check that all events are emitted from requested contract address
155        if logs.iter().any(|log| log.address != address) {
156            return Err(BridgeError::Provider(format!(
157                "Provider returns logs from different contract address (expected: {:?}): {:?}",
158                address, logs
159            )));
160        }
161        if logs.is_empty() {
162            return Ok(vec![]);
163        }
164
165        let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
166        futures::future::join_all(tasks)
167            .await
168            .into_iter()
169            .collect::<Result<Vec<_>, _>>()
170            .tap_err(|e| {
171                tracing::error!(
172                    "get_log_tx_details failed. Filter: {:?}. Error {:?}",
173                    filter,
174                    e
175                )
176            })
177    }
178
179    // Note: query may fail if range is too big. Callsite is responsible
180    // for chunking the query.
181    pub async fn get_raw_events_in_range(
182        &self,
183        address: ethers::types::Address,
184        start_block: u64,
185        end_block: u64,
186    ) -> BridgeResult<Vec<RawEthLog>> {
187        let filter = Filter::new()
188            .from_block(start_block)
189            .to_block(end_block)
190            .address(address);
191        let logs = self
192            .provider
193            .get_logs(&filter)
194            .await
195            .map_err(BridgeError::from)
196            .tap_err(|e| {
197                tracing::error!(
198                    "get_events_in_range failed. Filter: {:?}. Error {:?}",
199                    filter,
200                    e
201                )
202            })?;
203        // Safeguard check that all events are emitted from requested contract address
204        logs.into_iter().map(
205            |log| {
206                if log.address != address {
207                    return Err(BridgeError::Provider(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", address, log)));
208                }
209                Ok(RawEthLog {
210                block_number: log.block_number.ok_or(BridgeError::Provider("Provider returns log without block_number".into()))?.as_u64(),
211                tx_hash: log.transaction_hash.ok_or(BridgeError::Provider("Provider returns log without transaction_hash".into()))?,
212                log,
213            })}
214        ).collect::<Result<Vec<_>, _>>()
215    }
216
217    /// This function converts a `Log` to `EthLog`, to make sure the
218    /// `block_num`, `tx_hash` and `log_index_in_tx` are available for
219    /// downstream.
220    // It's frustratingly ugly because of the nulliability of many fields in `Log`.
221    async fn get_log_tx_details(&self, log: ethers::types::Log) -> BridgeResult<EthLog> {
222        let block_number = log
223            .block_number
224            .ok_or(BridgeError::Provider(
225                "Provider returns log without block_number".into(),
226            ))?
227            .as_u64();
228        let tx_hash = log.transaction_hash.ok_or(BridgeError::Provider(
229            "Provider returns log without transaction_hash".into(),
230        ))?;
231        // This is the log index in the block, rather than transaction.
232        let log_index = log.log_index.ok_or(BridgeError::Provider(
233            "Provider returns log without log_index".into(),
234        ))?;
235
236        // Now get the log's index in the transaction. There is `transaction_log_index`
237        // field in `Log`, but I never saw it populated.
238
239        let receipt = self
240            .provider
241            .get_transaction_receipt(tx_hash)
242            .await
243            .map_err(BridgeError::from)?
244            .ok_or(BridgeError::Provider(format!(
245                "Provide cannot find eth transaction for log: {:?})",
246                log
247            )))?;
248
249        let receipt_block_num = receipt.block_number.ok_or(BridgeError::Provider(
250            "Provider returns log without block_number".into(),
251        ))?;
252        if receipt_block_num.as_u64() != block_number {
253            return Err(BridgeError::Provider(format!(
254                "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
255                receipt, log
256            )));
257        }
258
259        // Find the log index in the transaction
260        let mut log_index_in_tx = None;
261        for (idx, receipt_log) in receipt.logs.iter().enumerate() {
262            // match log index (in the block)
263            if receipt_log.log_index == Some(log_index) {
264                // make sure the topics and data match
265                if receipt_log.topics != log.topics || receipt_log.data != log.data {
266                    return Err(BridgeError::Provider(format!(
267                        "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
268                        receipt, log
269                    )));
270                }
271                log_index_in_tx = Some(idx);
272            }
273        }
274        let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::Provider(format!(
275            "Couldn't find matching log: {:?} in transaction {}",
276            log, tx_hash
277        )))?;
278
279        Ok(EthLog {
280            block_number,
281            tx_hash,
282            log_index_in_tx: log_index_in_tx as u16,
283            log,
284        })
285    }
286}
287
288#[cfg(test)]
289mod tests {
290    use ethers::types::{Address as EthAddress, Log, TransactionReceipt, U64};
291    use prometheus::Registry;
292
293    use super::*;
294    use crate::test_utils::{get_test_log_and_action, mock_last_finalized_block};
295
296    #[tokio::test]
297    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
298    async fn test_get_finalized_bridge_action_maybe() {
299        telemetry_subscribers::init_for_testing();
300        let registry = Registry::new();
301        iota_metrics::init_metrics(&registry);
302        let mock_provider = EthMockProvider::new();
303        mock_last_finalized_block(&mock_provider, 777);
304
305        let client = EthClient::new_mocked(
306            mock_provider.clone(),
307            HashSet::from_iter(vec![EthAddress::zero()]),
308        );
309        let result = client.get_last_finalized_block_id().await.unwrap();
310        assert_eq!(result, 777);
311
312        let eth_tx_hash = TxHash::random();
313        let log = Log {
314            transaction_hash: Some(eth_tx_hash),
315            block_number: Some(U64::from(778)),
316            ..Default::default()
317        };
318        let (good_log, bridge_action) = get_test_log_and_action(EthAddress::zero(), eth_tx_hash, 1);
319        // Mocks `eth_getTransactionReceipt` to return `log` and `good_log` in order
320        mock_provider
321            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
322                "eth_getTransactionReceipt",
323                [log.transaction_hash.unwrap()],
324                TransactionReceipt {
325                    block_number: log.block_number,
326                    logs: vec![log, good_log],
327                    ..Default::default()
328                },
329            )
330            .unwrap();
331
332        let error = client
333            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
334            .await
335            .unwrap_err();
336        match error {
337            BridgeError::TxNotFinalized => {}
338            _ => panic!("expected TxNotFinalized"),
339        };
340
341        // 778 is now finalized
342        mock_last_finalized_block(&mock_provider, 778);
343
344        let error = client
345            .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
346            .await
347            .unwrap_err();
348        // Receipt only has 2 logs
349        match error {
350            BridgeError::NoBridgeEventsInTxPosition => {}
351            _ => panic!("expected NoBridgeEventsInTxPosition"),
352        };
353
354        let error = client
355            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
356            .await
357            .unwrap_err();
358        // Same, `log` is not a BridgeEvent
359        match error {
360            BridgeError::NoBridgeEventsInTxPosition => {}
361            _ => panic!("expected NoBridgeEventsInTxPosition"),
362        };
363
364        let action = client
365            .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
366            .await
367            .unwrap();
368        assert_eq!(action, bridge_action);
369    }
370
371    #[tokio::test]
372    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
373    async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
374        telemetry_subscribers::init_for_testing();
375        let registry = Registry::new();
376        iota_metrics::init_metrics(&registry);
377        let mock_provider = EthMockProvider::new();
378        mock_last_finalized_block(&mock_provider, 777);
379
380        let client = EthClient::new_mocked(
381            mock_provider.clone(),
382            HashSet::from_iter(vec![
383                EthAddress::repeat_byte(5),
384                EthAddress::repeat_byte(6),
385                EthAddress::repeat_byte(7),
386            ]),
387        );
388        let result = client.get_last_finalized_block_id().await.unwrap();
389        assert_eq!(result, 777);
390
391        let eth_tx_hash = TxHash::random();
392        // Event emitted from a different contract address
393        let (log, _bridge_action) =
394            get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
395        mock_provider
396            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
397                "eth_getTransactionReceipt",
398                [log.transaction_hash.unwrap()],
399                TransactionReceipt {
400                    block_number: log.block_number,
401                    logs: vec![log],
402                    ..Default::default()
403                },
404            )
405            .unwrap();
406
407        let error = client
408            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
409            .await
410            .unwrap_err();
411        match error {
412            BridgeError::BridgeEventInUnrecognizedEthContract => {}
413            _ => panic!("expected TxNotFinalized"),
414        };
415
416        // Ok if emitted from the right contract
417        let (log, bridge_action) =
418            get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
419        mock_provider
420            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
421                "eth_getTransactionReceipt",
422                [log.transaction_hash.unwrap()],
423                TransactionReceipt {
424                    block_number: log.block_number,
425                    logs: vec![log],
426                    ..Default::default()
427                },
428            )
429            .unwrap();
430        let action = client
431            .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
432            .await
433            .unwrap();
434        assert_eq!(action, bridge_action);
435    }
436}