1use std::{collections::HashMap, sync::Arc};
12
13use ethers::types::Address as EthAddress;
14use iota_metrics::spawn_logged_monitored_task;
15use tokio::{
16 sync::watch,
17 task::JoinHandle,
18 time::{self, Duration, Instant},
19};
20use tracing::error;
21
22use crate::{
23 error::BridgeResult, eth_client::EthClient, metrics::BridgeMetrics,
24 retry_with_max_elapsed_time, types::EthLog,
25};
26
27const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
28const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
29const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
30
31pub struct EthSyncer<P> {
32 eth_client: Arc<EthClient<P>>,
33 contract_addresses: EthTargetAddresses,
34}
35
36pub type EthTargetAddresses = HashMap<EthAddress, u64>;
38
39impl<P> EthSyncer<P>
40where
41 P: ethers::providers::JsonRpcClient + 'static,
42{
43 pub fn new(eth_client: Arc<EthClient<P>>, contract_addresses: EthTargetAddresses) -> Self {
44 Self {
45 eth_client,
46 contract_addresses,
47 }
48 }
49
50 pub async fn run(
51 self,
52 metrics: Arc<BridgeMetrics>,
53 ) -> BridgeResult<(
54 Vec<JoinHandle<()>>,
55 iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
56 watch::Receiver<u64>,
57 )> {
58 let (eth_evnets_tx, eth_events_rx) = iota_metrics::metered_channel::channel(
59 ETH_EVENTS_CHANNEL_SIZE,
60 &iota_metrics::get_metrics()
61 .unwrap()
62 .channel_inflight
63 .with_label_values(&["eth_events_queue"]),
64 );
65 let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
66 let (last_finalized_block_tx, last_finalized_block_rx) =
67 watch::channel(last_finalized_block);
68 let mut task_handles = vec![];
69 let eth_client_clone = self.eth_client.clone();
70 let metrics_clone = metrics.clone();
71 task_handles.push(spawn_logged_monitored_task!(
72 Self::run_finalized_block_refresh_task(
73 last_finalized_block_tx,
74 eth_client_clone,
75 metrics_clone
76 )
77 ));
78 for (contract_address, start_block) in self.contract_addresses {
79 let eth_evnets_tx_clone = eth_evnets_tx.clone();
80 let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
81 let eth_client_clone = self.eth_client.clone();
82 let metrics_clone = metrics.clone();
83 task_handles.push(spawn_logged_monitored_task!(
84 Self::run_event_listening_task(
85 contract_address,
86 start_block,
87 last_finalized_block_rx_clone,
88 eth_evnets_tx_clone,
89 eth_client_clone,
90 metrics_clone,
91 )
92 ));
93 }
94 Ok((task_handles, eth_events_rx, last_finalized_block_rx))
95 }
96
97 async fn run_finalized_block_refresh_task(
98 last_finalized_block_sender: watch::Sender<u64>,
99 eth_client: Arc<EthClient<P>>,
100 metrics: Arc<BridgeMetrics>,
101 ) {
102 tracing::info!("Starting finalized block refresh task.");
103 let mut last_block_number = 0;
104 let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
105 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
106 loop {
107 interval.tick().await;
108 let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
110 eth_client.get_last_finalized_block_id(),
111 time::Duration::from_secs(600)
112 ) else {
113 error!("Failed to get last finalized block from eth client after retry");
114 continue;
115 };
116 tracing::debug!("Last finalized block: {}", new_value);
117 metrics.last_finalized_eth_block.set(new_value as i64);
118
119 if new_value > last_block_number {
122 last_finalized_block_sender
123 .send(new_value)
124 .expect("last_finalized_block channel receiver is closed");
125 tracing::info!("Observed new finalized eth block: {}", new_value);
126 last_block_number = new_value;
127 }
128 }
129 }
130
131 async fn run_event_listening_task(
134 contract_address: EthAddress,
135 mut start_block: u64,
136 mut last_finalized_block_receiver: watch::Receiver<u64>,
137 events_sender: iota_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
138 eth_client: Arc<EthClient<P>>,
139 metrics: Arc<BridgeMetrics>,
140 ) {
141 tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
142 let mut more_blocks = false;
143 loop {
144 if !more_blocks {
146 last_finalized_block_receiver
147 .changed()
148 .await
149 .expect("last_finalized_block channel sender is closed");
150 }
151 let new_finalized_block = *last_finalized_block_receiver.borrow();
152 if new_finalized_block < start_block {
153 tracing::info!(
154 contract_address=?contract_address,
155 "New finalized block {} is smaller than start block {}, ignore",
156 new_finalized_block,
157 start_block,
158 );
159 continue;
160 }
161 let end_block = std::cmp::min(
163 start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
164 new_finalized_block,
165 );
166 more_blocks = end_block < new_finalized_block;
167 let timer = Instant::now();
168 let Ok(Ok(events)) = retry_with_max_elapsed_time!(
169 eth_client.get_events_in_range(contract_address, start_block, end_block),
170 Duration::from_secs(600)
171 ) else {
172 error!("Failed to get events from eth client after retry");
173 continue;
174 };
175 tracing::debug!(
176 ?contract_address,
177 start_block,
178 end_block,
179 "Querying eth events took {:?}",
180 timer.elapsed()
181 );
182 let len = events.len();
183 let last_block = events.last().map(|e| e.block_number);
184
185 events_sender
194 .send((contract_address, end_block, events))
195 .await
196 .expect("All Eth event channel receivers are closed");
197 if len != 0 {
198 tracing::info!(
199 ?contract_address,
200 start_block,
201 end_block,
202 "Observed {len} new Eth events",
203 );
204 }
205 if let Some(last_block) = last_block {
206 metrics.last_synced_eth_block.set(last_block as i64);
207 }
208 start_block = end_block + 1;
209 }
210 }
211}
212
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, str::FromStr};

    use ethers::types::{Log, TxHash, U64, U256};
    use prometheus::Registry;
    use tokio::sync::mpsc::error::TryRecvError;

    use super::*;
    use crate::{
        eth_mock_provider::EthMockProvider,
        test_utils::{mock_get_logs, mock_last_finalized_block},
    };

    /// A single contract: events are delivered once per newly finalized block
    /// and nothing is queued while the finalized block stays the same.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_last_finalized_block() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);
        let mock_provider = EthMockProvider::new();
        mock_last_finalized_block(&mock_provider, 777);
        let client = EthClient::new_mocked(
            mock_provider.clone(),
            HashSet::from_iter(vec![EthAddress::zero()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, 777);

        // Watch the zero address starting from block 100.
        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100)]);
        let log = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(777)),
            log_index: Some(U256::from(3)),
            ..Default::default()
        };
        let eth_log = EthLog {
            block_number: 777,
            tx_hash: log.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log.clone(),
        };
        // First expected query: blocks 100..=777.
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            100,
            777,
            vec![log.clone()],
        );
        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        // The refresh task publishes its first observation of block 777.
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 777);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, 777);
        assert_eq!(received_logs, vec![eth_log.clone()]);
        // The finalized block has not moved, so no second batch is queued.
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        // Advance the finalized block; the next query covers 778..=888.
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            778,
            888,
            vec![log.clone()],
        );
        mock_last_finalized_block(&mock_provider, 888);
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 888);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, 888);
        assert_eq!(received_logs, vec![eth_log]);
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        Ok(())
    }

    /// Two contracts with different start blocks: a contract whose start
    /// block is still ahead of the finalized block produces no batch until
    /// the finalized block catches up.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_multiple_addresses() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);

        let mock_provider = EthMockProvider::new();
        mock_last_finalized_block(&mock_provider, 198);

        let another_address =
            EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
        let client = EthClient::new_mocked(
            mock_provider.clone(),
            HashSet::from_iter(vec![another_address]),
        );

        // The zero address starts at block 100, `another_address` at block 200.
        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100), (another_address, 200)]);

        let log1 = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(101)),
            log_index: Some(U256::from(5)),
            ..Default::default()
        };
        let eth_log1 = EthLog {
            block_number: log1.block_number.unwrap().as_u64(),
            tx_hash: log1.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log1.clone(),
        };
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            100,
            198,
            vec![log1.clone()],
        );
        let log2 = Log {
            address: another_address,
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(201)),
            log_index: Some(U256::from(6)),
            ..Default::default()
        };
        // Finalized block 198 is below `another_address`'s start block 200, so
        // its listener skips this round; the assertions below confirm that
        // only one batch arrives.
        mock_get_logs(
            &mock_provider,
            another_address,
            200,
            198,
            vec![log2.clone()],
        );

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        // Only the zero-address listener produces a batch at block 198.
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 198);
        let (_contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(end_block, 198);
        assert_eq!(received_logs, vec![eth_log1.clone()]);
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        // Second round: with the finalized block at 400 both listeners query,
        // the zero address from 199 and `another_address` from 200.
        let log1 = Log {
            address: EthAddress::zero(),
            block_number: Some(U64::from(200)),
            transaction_hash: Some(TxHash::random()),
            log_index: Some(U256::from(7)),
            ..Default::default()
        };
        let eth_log1 = EthLog {
            block_number: log1.block_number.unwrap().as_u64(),
            tx_hash: log1.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log1.clone(),
        };
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            199,
            400,
            vec![log1.clone()],
        );
        let log2 = Log {
            address: another_address,
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(201)),
            log_index: Some(U256::from(9)),
            ..Default::default()
        };
        let eth_log2 = EthLog {
            block_number: log2.block_number.unwrap().as_u64(),
            tx_hash: log2.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log2.clone(),
        };
        mock_get_logs(
            &mock_provider,
            another_address,
            200,
            400,
            vec![log2.clone()],
        );
        mock_last_finalized_block(&mock_provider, 400);

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 400);
        // The two listeners run concurrently, so their batches may arrive in
        // either order; compare them as a set of Debug renderings.
        let mut logs_set = HashSet::new();
        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
            logs_set.insert(format!("{:?}", log));
        });
        logs_rx.recv().await.unwrap().2.into_iter().for_each(|log| {
            logs_set.insert(format!("{:?}", log));
        });
        assert_eq!(
            logs_set,
            HashSet::from_iter(vec![format!("{:?}", eth_log1), format!("{:?}", eth_log2)])
        );
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
        Ok(())
    }

    /// A block range wider than `ETH_LOG_QUERY_MAX_BLOCK_RANGE` is split into
    /// consecutive queries, each delivered as its own batch.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);
        let mock_provider = EthMockProvider::new();
        let start_block = 100;
        // Two blocks more than one full window, so a second query is required.
        let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
        mock_last_finalized_block(&mock_provider, last_finalized_block);
        let client = EthClient::new_mocked(
            mock_provider.clone(),
            HashSet::from_iter(vec![EthAddress::zero()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, last_finalized_block);

        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), start_block)]);
        let log = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(start_block)),
            log_index: Some(U256::from(3)),
            ..Default::default()
        };
        let log2 = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(last_finalized_block)),
            log_index: Some(U256::from(3)),
            ..Default::default()
        };
        let eth_log = EthLog {
            block_number: start_block,
            tx_hash: log.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log.clone(),
        };
        let eth_log2 = EthLog {
            block_number: last_finalized_block,
            tx_hash: log2.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log2.clone(),
        };
        // First window: one full range starting at `start_block`.
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            start_block,
            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
            vec![log.clone()],
        );
        // Second window: the remainder up to the finalized block.
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
            last_finalized_block,
            vec![log2.clone()],
        );

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1);
        assert_eq!(received_logs, vec![eth_log.clone()]);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, last_finalized_block);
        assert_eq!(received_logs, vec![eth_log2.clone()]);
        Ok(())
    }
}