iota_bridge/
eth_syncer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
//! The EthSyncer module is responsible for synchronizing events emitted on the
//! Ethereum blockchain by the contracts of interest. Each contract is
//! associated with a start block number, and the syncer only queries from
//! that block number onwards. The syncer also keeps track of the last
//! finalized block on Ethereum and only queries for events up to that block
//! number.
10
11use std::{collections::HashMap, sync::Arc};
12
13use ethers::types::Address as EthAddress;
14use iota_metrics::spawn_logged_monitored_task;
15use tokio::{
16    sync::watch,
17    task::JoinHandle,
18    time::{self, Duration, Instant},
19};
20use tracing::error;
21
22use crate::{
23    error::BridgeResult, eth_client::EthClient, metrics::BridgeMetrics,
24    retry_with_max_elapsed_time, types::EthLog,
25};
26
/// Maximum number of blocks covered by a single event-log query.
const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
/// Capacity of the channel that carries fetched Ethereum events.
const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
/// Period at which the refresh task polls for the last finalized block.
const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5);
30
31pub struct EthSyncer<P> {
32    eth_client: Arc<EthClient<P>>,
33    contract_addresses: EthTargetAddresses,
34}
35
36/// Map from contract address to their start block.
37pub type EthTargetAddresses = HashMap<EthAddress, u64>;
38
39impl<P> EthSyncer<P>
40where
41    P: ethers::providers::JsonRpcClient + 'static,
42{
43    pub fn new(eth_client: Arc<EthClient<P>>, contract_addresses: EthTargetAddresses) -> Self {
44        Self {
45            eth_client,
46            contract_addresses,
47        }
48    }
49
50    pub async fn run(
51        self,
52        metrics: Arc<BridgeMetrics>,
53    ) -> BridgeResult<(
54        Vec<JoinHandle<()>>,
55        iota_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
56        watch::Receiver<u64>,
57    )> {
58        let (eth_evnets_tx, eth_events_rx) = iota_metrics::metered_channel::channel(
59            ETH_EVENTS_CHANNEL_SIZE,
60            &iota_metrics::get_metrics()
61                .unwrap()
62                .channel_inflight
63                .with_label_values(&["eth_events_queue"]),
64        );
65        let last_finalized_block = self.eth_client.get_last_finalized_block_id().await?;
66        let (last_finalized_block_tx, last_finalized_block_rx) =
67            watch::channel(last_finalized_block);
68        let mut task_handles = vec![];
69        let eth_client_clone = self.eth_client.clone();
70        let metrics_clone = metrics.clone();
71        task_handles.push(spawn_logged_monitored_task!(
72            Self::run_finalized_block_refresh_task(
73                last_finalized_block_tx,
74                eth_client_clone,
75                metrics_clone
76            )
77        ));
78        for (contract_address, start_block) in self.contract_addresses {
79            let eth_evnets_tx_clone = eth_evnets_tx.clone();
80            let last_finalized_block_rx_clone = last_finalized_block_rx.clone();
81            let eth_client_clone = self.eth_client.clone();
82            let metrics_clone = metrics.clone();
83            task_handles.push(spawn_logged_monitored_task!(
84                Self::run_event_listening_task(
85                    contract_address,
86                    start_block,
87                    last_finalized_block_rx_clone,
88                    eth_evnets_tx_clone,
89                    eth_client_clone,
90                    metrics_clone,
91                )
92            ));
93        }
94        Ok((task_handles, eth_events_rx, last_finalized_block_rx))
95    }
96
97    async fn run_finalized_block_refresh_task(
98        last_finalized_block_sender: watch::Sender<u64>,
99        eth_client: Arc<EthClient<P>>,
100        metrics: Arc<BridgeMetrics>,
101    ) {
102        tracing::info!("Starting finalized block refresh task.");
103        let mut last_block_number = 0;
104        let mut interval = time::interval(FINALIZED_BLOCK_QUERY_INTERVAL);
105        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
106        loop {
107            interval.tick().await;
108            // TODO: allow to pass custom initial interval
109            let Ok(Ok(new_value)) = retry_with_max_elapsed_time!(
110                eth_client.get_last_finalized_block_id(),
111                time::Duration::from_secs(600)
112            ) else {
113                error!("Failed to get last finalized block from eth client after retry");
114                continue;
115            };
116            tracing::debug!("Last finalized block: {}", new_value);
117            metrics.last_finalized_eth_block.set(new_value as i64);
118
119            // TODO add a metrics for the last finalized block
120
121            if new_value > last_block_number {
122                last_finalized_block_sender
123                    .send(new_value)
124                    .expect("last_finalized_block channel receiver is closed");
125                tracing::info!("Observed new finalized eth block: {}", new_value);
126                last_block_number = new_value;
127            }
128        }
129    }
130
131    // TODO: define a type for block number for readability
132    // TODO: add a metrics for current start block
133    async fn run_event_listening_task(
134        contract_address: EthAddress,
135        mut start_block: u64,
136        mut last_finalized_block_receiver: watch::Receiver<u64>,
137        events_sender: iota_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
138        eth_client: Arc<EthClient<P>>,
139        metrics: Arc<BridgeMetrics>,
140    ) {
141        tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
142        let mut more_blocks = false;
143        loop {
144            // If no more known blocks, wait for the next finalized block.
145            if !more_blocks {
146                last_finalized_block_receiver
147                    .changed()
148                    .await
149                    .expect("last_finalized_block channel sender is closed");
150            }
151            let new_finalized_block = *last_finalized_block_receiver.borrow();
152            if new_finalized_block < start_block {
153                tracing::info!(
154                    contract_address=?contract_address,
155                    "New finalized block {} is smaller than start block {}, ignore",
156                    new_finalized_block,
157                    start_block,
158                );
159                continue;
160            }
161            // Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
162            let end_block = std::cmp::min(
163                start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
164                new_finalized_block,
165            );
166            more_blocks = end_block < new_finalized_block;
167            let timer = Instant::now();
168            let Ok(Ok(events)) = retry_with_max_elapsed_time!(
169                eth_client.get_events_in_range(contract_address, start_block, end_block),
170                Duration::from_secs(600)
171            ) else {
172                error!("Failed to get events from eth client after retry");
173                continue;
174            };
175            tracing::debug!(
176                ?contract_address,
177                start_block,
178                end_block,
179                "Querying eth events took {:?}",
180                timer.elapsed()
181            );
182            let len = events.len();
183            let last_block = events.last().map(|e| e.block_number);
184
185            // Note 1: we always events to the channel even when it is empty. This is
186            // because of how `eth_getLogs` api is designed - we want cursor to
187            // move forward continuously.
188
189            // Note 2: it's extremely critical to make sure the Logs we send via this
190            // channel are complete per block height. Namely, we should never
191            // send a partial list of events for a block. Otherwise, we may end
192            // up missing events.
193            events_sender
194                .send((contract_address, end_block, events))
195                .await
196                .expect("All Eth event channel receivers are closed");
197            if len != 0 {
198                tracing::info!(
199                    ?contract_address,
200                    start_block,
201                    end_block,
202                    "Observed {len} new Eth events",
203                );
204            }
205            if let Some(last_block) = last_block {
206                metrics.last_synced_eth_block.set(last_block as i64);
207            }
208            start_block = end_block + 1;
209        }
210    }
211}
212
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, str::FromStr};

    use ethers::types::{Log, TxHash, U64, U256};
    use prometheus::Registry;
    use tokio::sync::mpsc::error::TryRecvError;

    use super::*;
    use crate::{
        eth_mock_provider::EthMockProvider,
        test_utils::{mock_get_logs, mock_last_finalized_block},
    };

    /// Builds the `EthLog` that the tests expect to receive for the mocked
    /// `log`.
    ///
    /// Panics if `log` has no block number or no transaction hash.
    fn expected_eth_log(log: &Log) -> EthLog {
        EthLog {
            block_number: log.block_number.unwrap().as_u64(),
            tx_hash: log.transaction_hash.unwrap(),
            log_index_in_tx: 0,
            log: log.clone(),
        }
    }

    /// Checks that events are fetched up to the finalized block and that a new
    /// query only happens once the finalized block advances.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_last_finalized_block() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);
        let mock_provider = EthMockProvider::new();
        mock_last_finalized_block(&mock_provider, 777);
        let client = EthClient::new_mocked(
            mock_provider.clone(),
            HashSet::from_iter(vec![EthAddress::zero()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, 777);

        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100)]);
        let log = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(777)),
            log_index: Some(U256::from(3)),
            ..Default::default()
        };
        let eth_log = expected_eth_log(&log);
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            100,
            777,
            vec![log.clone()],
        );
        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        // The latest finalized block stays at 777, event listener should not query
        // again.
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 777);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, 777);
        assert_eq!(received_logs, vec![eth_log.clone()]);
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            778,
            888,
            vec![log.clone()],
        );
        // The latest finalized block is updated to 888, event listener should query
        // again.
        mock_last_finalized_block(&mock_provider, 888);
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 888);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, 888);
        assert_eq!(received_logs, vec![eth_log]);
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        Ok(())
    }

    /// Checks that each watched contract is only queried once the finalized
    /// block has reached that contract's start block.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_multiple_addresses() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);

        let mock_provider = EthMockProvider::new();
        mock_last_finalized_block(&mock_provider, 198);

        let another_address =
            EthAddress::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();
        let client = EthClient::new_mocked(
            mock_provider.clone(),
            HashSet::from_iter(vec![another_address]),
        );

        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), 100), (another_address, 200)]);

        let log1 = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(101)),
            log_index: Some(U256::from(5)),
            ..Default::default()
        };
        let eth_log1 = expected_eth_log(&log1);
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            100,
            198,
            vec![log1.clone()],
        );
        let log2 = Log {
            address: another_address,
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(201)),
            log_index: Some(U256::from(6)),
            ..Default::default()
        };
        // Mock logs for another_address although it shouldn't be queried. We don't
        // expect to see log2 in the logs channel later on.
        mock_get_logs(
            &mock_provider,
            another_address,
            200,
            198,
            vec![log2.clone()],
        );

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        // The latest finalized block stays at 198.
        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 198);
        let (_contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(end_block, 198);
        assert_eq!(received_logs, vec![eth_log1.clone()]);
        // log2 should not be received as another_address's start block is 200.
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);

        let log1 = Log {
            address: EthAddress::zero(),
            block_number: Some(U64::from(200)),
            transaction_hash: Some(TxHash::random()),
            log_index: Some(U256::from(7)),
            ..Default::default()
        };
        let eth_log1 = expected_eth_log(&log1);
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            199,
            400,
            vec![log1.clone()],
        );
        let log2 = Log {
            address: another_address,
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(201)),
            log_index: Some(U256::from(9)),
            ..Default::default()
        };
        let eth_log2 = expected_eth_log(&log2);
        mock_get_logs(
            &mock_provider,
            another_address,
            200,
            400,
            vec![log2.clone()],
        );
        mock_last_finalized_block(&mock_provider, 400);

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), 400);
        // The two contracts are served by independent tasks, so the order of the
        // two messages is not fixed; compare them as a set of debug strings.
        let mut logs_set = HashSet::new();
        for _ in 0..2 {
            let (_, _, received_logs) = logs_rx.recv().await.unwrap();
            for log in received_logs {
                logs_set.insert(format!("{:?}", log));
            }
        }
        assert_eq!(
            logs_set,
            HashSet::from_iter(vec![format!("{:?}", eth_log1), format!("{:?}", eth_log2)])
        );
        // No more finalized block change, no more logs.
        assert_eq!(logs_rx.try_recv().unwrap_err(), TryRecvError::Empty);
        Ok(())
    }

    /// Test that the syncer will query for logs in multiple queries if the
    /// range is too big.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_paginated_eth_log_query() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        iota_metrics::init_metrics(&registry);
        let mock_provider = EthMockProvider::new();
        let start_block = 100;
        // range too big, we need two queries
        let last_finalized_block = start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE + 1;
        mock_last_finalized_block(&mock_provider, last_finalized_block);
        let client = EthClient::new_mocked(
            mock_provider.clone(),
            HashSet::from_iter(vec![EthAddress::zero()]),
        );
        let result = client.get_last_finalized_block_id().await.unwrap();
        assert_eq!(result, last_finalized_block);

        let addresses = HashMap::from_iter(vec![(EthAddress::zero(), start_block)]);
        let log = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(start_block)),
            log_index: Some(U256::from(3)),
            ..Default::default()
        };
        let log2 = Log {
            address: EthAddress::zero(),
            transaction_hash: Some(TxHash::random()),
            block_number: Some(U64::from(last_finalized_block)),
            log_index: Some(U256::from(3)),
            ..Default::default()
        };
        let eth_log = expected_eth_log(&log);
        let eth_log2 = expected_eth_log(&log2);
        // First query handles [start, start + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1]
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            start_block,
            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1,
            vec![log.clone()],
        );
        // Second query handles [start + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
        // last_finalized_block]
        mock_get_logs(
            &mock_provider,
            EthAddress::zero(),
            start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE,
            last_finalized_block,
            vec![log2.clone()],
        );

        let (_handles, mut logs_rx, mut finalized_block_rx) =
            EthSyncer::new(Arc::new(client), addresses)
                .run(Arc::new(BridgeMetrics::new_for_testing()))
                .await
                .unwrap();

        finalized_block_rx.changed().await.unwrap();
        assert_eq!(*finalized_block_rx.borrow(), last_finalized_block);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1);
        assert_eq!(received_logs, vec![eth_log.clone()]);
        let (contract_address, end_block, received_logs) = logs_rx.recv().await.unwrap();
        assert_eq!(contract_address, EthAddress::zero());
        assert_eq!(end_block, last_finalized_block);
        assert_eq!(received_logs, vec![eth_log2.clone()]);
        Ok(())
    }
}