1use std::{collections::HashSet, sync::Arc};
6
7use ethers::{
8 providers::{JsonRpcClient, Middleware, Provider},
9 types::{Address as EthAddress, Block, Filter, TxHash},
10};
11use tap::TapFallible;
12
13#[cfg(test)]
14use crate::eth_mock_provider::EthMockProvider;
15use crate::{
16 abi::EthBridgeEvent,
17 error::{BridgeError, BridgeResult},
18 metered_eth_provider::{MeteredEthHttpProvider, new_metered_eth_provider},
19 metrics::BridgeMetrics,
20 types::{BridgeAction, EthLog, RawEthLog},
21};
22pub struct EthClient<P> {
23 provider: Provider<P>,
24 contract_addresses: HashSet<EthAddress>,
25}
26
27impl EthClient<MeteredEthHttpProvider> {
28 pub async fn new(
29 provider_url: &str,
30 contract_addresses: HashSet<EthAddress>,
31 metrics: Arc<BridgeMetrics>,
32 ) -> anyhow::Result<Self> {
33 let provider = new_metered_eth_provider(provider_url, metrics)?;
34 let self_ = Self {
35 provider,
36 contract_addresses,
37 };
38 self_.describe().await?;
39 Ok(self_)
40 }
41}
42
43#[cfg(test)]
44impl EthClient<EthMockProvider> {
45 pub fn new_mocked(provider: EthMockProvider, contract_addresses: HashSet<EthAddress>) -> Self {
46 let provider = Provider::new(provider);
47 Self {
48 provider,
49 contract_addresses,
50 }
51 }
52}
53
54impl<P> EthClient<P>
55where
56 P: JsonRpcClient,
57{
58 async fn describe(&self) -> anyhow::Result<()> {
60 let chain_id = self.provider.get_chainid().await?;
61 let block_number = self.provider.get_block_number().await?;
62 tracing::info!(
63 "EthClient is connected to chain {chain_id}, current block number: {block_number}"
64 );
65 Ok(())
66 }
67
68 pub async fn get_finalized_bridge_action_maybe(
72 &self,
73 tx_hash: TxHash,
74 event_idx: u16,
75 ) -> BridgeResult<BridgeAction> {
76 let receipt = self
77 .provider
78 .get_transaction_receipt(tx_hash)
79 .await
80 .map_err(BridgeError::from)?
81 .ok_or(BridgeError::TxNotFound)?;
82 let receipt_block_num = receipt.block_number.ok_or(BridgeError::Provider(
83 "Provider returns log without block_number".into(),
84 ))?;
85 let last_finalized_block_id = self.get_last_finalized_block_id().await?;
88 if receipt_block_num.as_u64() > last_finalized_block_id {
89 return Err(BridgeError::TxNotFinalized);
90 }
91 let log = receipt
92 .logs
93 .get(event_idx as usize)
94 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
95
96 if !self.contract_addresses.contains(&log.address) {
98 return Err(BridgeError::BridgeEventInUnrecognizedEthContract);
99 }
100
101 let eth_log = EthLog {
102 block_number: receipt_block_num.as_u64(),
103 tx_hash,
104 log_index_in_tx: event_idx,
105 log: log.clone(),
106 };
107 let bridge_event = EthBridgeEvent::try_from_eth_log(ð_log)
108 .ok_or(BridgeError::NoBridgeEventsInTxPosition)?;
109 bridge_event
110 .try_into_bridge_action(tx_hash, event_idx)?
111 .ok_or(BridgeError::BridgeEventNotActionable)
112 }
113
114 pub async fn get_last_finalized_block_id(&self) -> BridgeResult<u64> {
115 let block: Result<Option<Block<ethers::types::TxHash>>, ethers::prelude::ProviderError> =
116 self.provider
117 .request("eth_getBlockByNumber", ("finalized", false))
118 .await;
119 let block = block?.ok_or(BridgeError::TransientProvider(
120 "Provider fails to return last finalized block".into(),
121 ))?;
122 let number = block.number.ok_or(BridgeError::TransientProvider(
123 "Provider returns block without number".into(),
124 ))?;
125 Ok(number.as_u64())
126 }
127
128 pub async fn get_events_in_range(
131 &self,
132 address: ethers::types::Address,
133 start_block: u64,
134 end_block: u64,
135 ) -> BridgeResult<Vec<EthLog>> {
136 let filter = Filter::new()
137 .from_block(start_block)
138 .to_block(end_block)
139 .address(address);
140 let logs = self
141 .provider
142 .get_logs(&filter)
144 .await
145 .map_err(BridgeError::from)
146 .tap_err(|e| {
147 tracing::error!(
148 "get_events_in_range failed. Filter: {:?}. Error {:?}",
149 filter,
150 e
151 )
152 })?;
153
154 if logs.iter().any(|log| log.address != address) {
156 return Err(BridgeError::Provider(format!(
157 "Provider returns logs from different contract address (expected: {:?}): {:?}",
158 address, logs
159 )));
160 }
161 if logs.is_empty() {
162 return Ok(vec![]);
163 }
164
165 let tasks = logs.into_iter().map(|log| self.get_log_tx_details(log));
166 futures::future::join_all(tasks)
167 .await
168 .into_iter()
169 .collect::<Result<Vec<_>, _>>()
170 .tap_err(|e| {
171 tracing::error!(
172 "get_log_tx_details failed. Filter: {:?}. Error {:?}",
173 filter,
174 e
175 )
176 })
177 }
178
179 pub async fn get_raw_events_in_range(
182 &self,
183 address: ethers::types::Address,
184 start_block: u64,
185 end_block: u64,
186 ) -> BridgeResult<Vec<RawEthLog>> {
187 let filter = Filter::new()
188 .from_block(start_block)
189 .to_block(end_block)
190 .address(address);
191 let logs = self
192 .provider
193 .get_logs(&filter)
194 .await
195 .map_err(BridgeError::from)
196 .tap_err(|e| {
197 tracing::error!(
198 "get_events_in_range failed. Filter: {:?}. Error {:?}",
199 filter,
200 e
201 )
202 })?;
203 logs.into_iter().map(
205 |log| {
206 if log.address != address {
207 return Err(BridgeError::Provider(format!("Provider returns logs from different contract address (expected: {:?}): {:?}", address, log)));
208 }
209 Ok(RawEthLog {
210 block_number: log.block_number.ok_or(BridgeError::Provider("Provider returns log without block_number".into()))?.as_u64(),
211 tx_hash: log.transaction_hash.ok_or(BridgeError::Provider("Provider returns log without transaction_hash".into()))?,
212 log,
213 })}
214 ).collect::<Result<Vec<_>, _>>()
215 }
216
217 async fn get_log_tx_details(&self, log: ethers::types::Log) -> BridgeResult<EthLog> {
222 let block_number = log
223 .block_number
224 .ok_or(BridgeError::Provider(
225 "Provider returns log without block_number".into(),
226 ))?
227 .as_u64();
228 let tx_hash = log.transaction_hash.ok_or(BridgeError::Provider(
229 "Provider returns log without transaction_hash".into(),
230 ))?;
231 let log_index = log.log_index.ok_or(BridgeError::Provider(
233 "Provider returns log without log_index".into(),
234 ))?;
235
236 let receipt = self
240 .provider
241 .get_transaction_receipt(tx_hash)
242 .await
243 .map_err(BridgeError::from)?
244 .ok_or(BridgeError::Provider(format!(
245 "Provide cannot find eth transaction for log: {:?})",
246 log
247 )))?;
248
249 let receipt_block_num = receipt.block_number.ok_or(BridgeError::Provider(
250 "Provider returns log without block_number".into(),
251 ))?;
252 if receipt_block_num.as_u64() != block_number {
253 return Err(BridgeError::Provider(format!(
254 "Provider returns receipt with different block number from log. Receipt: {:?}, Log: {:?}",
255 receipt, log
256 )));
257 }
258
259 let mut log_index_in_tx = None;
261 for (idx, receipt_log) in receipt.logs.iter().enumerate() {
262 if receipt_log.log_index == Some(log_index) {
264 if receipt_log.topics != log.topics || receipt_log.data != log.data {
266 return Err(BridgeError::Provider(format!(
267 "Provider returns receipt with different log from log. Receipt: {:?}, Log: {:?}",
268 receipt, log
269 )));
270 }
271 log_index_in_tx = Some(idx);
272 }
273 }
274 let log_index_in_tx = log_index_in_tx.ok_or(BridgeError::Provider(format!(
275 "Couldn't find matching log: {:?} in transaction {}",
276 log, tx_hash
277 )))?;
278
279 Ok(EthLog {
280 block_number,
281 tx_hash,
282 log_index_in_tx: log_index_in_tx as u16,
283 log,
284 })
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use ethers::types::{Address as EthAddress, Log, TransactionReceipt, U64};
291 use prometheus::Registry;
292
293 use super::*;
294 use crate::test_utils::{get_test_log_and_action, mock_last_finalized_block};
295
296 #[tokio::test]
297 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
298 async fn test_get_finalized_bridge_action_maybe() {
299 telemetry_subscribers::init_for_testing();
300 let registry = Registry::new();
301 iota_metrics::init_metrics(®istry);
302 let mock_provider = EthMockProvider::new();
303 mock_last_finalized_block(&mock_provider, 777);
304
305 let client = EthClient::new_mocked(
306 mock_provider.clone(),
307 HashSet::from_iter(vec![EthAddress::zero()]),
308 );
309 let result = client.get_last_finalized_block_id().await.unwrap();
310 assert_eq!(result, 777);
311
312 let eth_tx_hash = TxHash::random();
313 let log = Log {
314 transaction_hash: Some(eth_tx_hash),
315 block_number: Some(U64::from(778)),
316 ..Default::default()
317 };
318 let (good_log, bridge_action) = get_test_log_and_action(EthAddress::zero(), eth_tx_hash, 1);
319 mock_provider
321 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
322 "eth_getTransactionReceipt",
323 [log.transaction_hash.unwrap()],
324 TransactionReceipt {
325 block_number: log.block_number,
326 logs: vec![log, good_log],
327 ..Default::default()
328 },
329 )
330 .unwrap();
331
332 let error = client
333 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
334 .await
335 .unwrap_err();
336 match error {
337 BridgeError::TxNotFinalized => {}
338 _ => panic!("expected TxNotFinalized"),
339 };
340
341 mock_last_finalized_block(&mock_provider, 778);
343
344 let error = client
345 .get_finalized_bridge_action_maybe(eth_tx_hash, 2)
346 .await
347 .unwrap_err();
348 match error {
350 BridgeError::NoBridgeEventsInTxPosition => {}
351 _ => panic!("expected NoBridgeEventsInTxPosition"),
352 };
353
354 let error = client
355 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
356 .await
357 .unwrap_err();
358 match error {
360 BridgeError::NoBridgeEventsInTxPosition => {}
361 _ => panic!("expected NoBridgeEventsInTxPosition"),
362 };
363
364 let action = client
365 .get_finalized_bridge_action_maybe(eth_tx_hash, 1)
366 .await
367 .unwrap();
368 assert_eq!(action, bridge_action);
369 }
370
371 #[tokio::test]
372 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
373 async fn test_get_finalized_bridge_action_maybe_unrecognized_contract() {
374 telemetry_subscribers::init_for_testing();
375 let registry = Registry::new();
376 iota_metrics::init_metrics(®istry);
377 let mock_provider = EthMockProvider::new();
378 mock_last_finalized_block(&mock_provider, 777);
379
380 let client = EthClient::new_mocked(
381 mock_provider.clone(),
382 HashSet::from_iter(vec![
383 EthAddress::repeat_byte(5),
384 EthAddress::repeat_byte(6),
385 EthAddress::repeat_byte(7),
386 ]),
387 );
388 let result = client.get_last_finalized_block_id().await.unwrap();
389 assert_eq!(result, 777);
390
391 let eth_tx_hash = TxHash::random();
392 let (log, _bridge_action) =
394 get_test_log_and_action(EthAddress::repeat_byte(4), eth_tx_hash, 0);
395 mock_provider
396 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
397 "eth_getTransactionReceipt",
398 [log.transaction_hash.unwrap()],
399 TransactionReceipt {
400 block_number: log.block_number,
401 logs: vec![log],
402 ..Default::default()
403 },
404 )
405 .unwrap();
406
407 let error = client
408 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
409 .await
410 .unwrap_err();
411 match error {
412 BridgeError::BridgeEventInUnrecognizedEthContract => {}
413 _ => panic!("expected TxNotFinalized"),
414 };
415
416 let (log, bridge_action) =
418 get_test_log_and_action(EthAddress::repeat_byte(6), eth_tx_hash, 0);
419 mock_provider
420 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
421 "eth_getTransactionReceipt",
422 [log.transaction_hash.unwrap()],
423 TransactionReceipt {
424 block_number: log.block_number,
425 logs: vec![log],
426 ..Default::default()
427 },
428 )
429 .unwrap();
430 let action = client
431 .get_finalized_bridge_action_maybe(eth_tx_hash, 0)
432 .await
433 .unwrap();
434 assert_eq!(action, bridge_action);
435 }
436}