1use std::{
6 collections::{HashMap, HashSet},
7 str::FromStr,
8 sync::Arc,
9 time::Duration,
10};
11
12use anyhow::Error;
13use async_trait::async_trait;
14use ethers::{
15 prelude::Transaction,
16 providers::{Http, Middleware, Provider, StreamExt, Ws},
17 types::{Address as EthAddress, Block, Filter, H256},
18};
19use iota_bridge::{
20 abi::{EthBridgeEvent, EthIotaBridgeEvents},
21 error::BridgeError,
22 eth_client::EthClient,
23 metered_eth_provider::MeteredEthHttpProvider,
24 metrics::BridgeMetrics,
25 retry_with_max_elapsed_time,
26 types::{EthEvent, RawEthLog},
27};
28use iota_indexer_builder::indexer_builder::{DataMapper, DataSender, Datasource};
29use iota_metrics::spawn_monitored_task;
30use tokio::task::JoinHandle;
31use tracing::info;
32
33use crate::{
34 BridgeDataSource, ProcessedTxnData, TokenTransfer, TokenTransferData, TokenTransferStatus,
35 metrics::BridgeIndexerMetrics,
36};
37
38type RawEthData = (RawEthLog, Block<H256>, Transaction);
39
40pub struct EthSubscriptionDatasource {
41 bridge_address: EthAddress,
42 eth_ws_url: String,
43 indexer_metrics: BridgeIndexerMetrics,
44}
45
46impl EthSubscriptionDatasource {
47 pub fn new(
48 eth_iota_bridge_contract_address: String,
49 eth_ws_url: String,
50 indexer_metrics: BridgeIndexerMetrics,
51 ) -> Result<Self, anyhow::Error> {
52 let bridge_address = EthAddress::from_str(ð_iota_bridge_contract_address)?;
53 Ok(Self {
54 bridge_address,
55 eth_ws_url,
56 indexer_metrics,
57 })
58 }
59}
60#[async_trait]
61impl Datasource<RawEthData> for EthSubscriptionDatasource {
62 async fn start_data_retrieval(
63 &self,
64 starting_checkpoint: u64,
65 target_checkpoint: u64,
66 data_sender: DataSender<RawEthData>,
67 ) -> Result<JoinHandle<Result<(), Error>>, Error> {
68 let filter = Filter::new()
69 .address(self.bridge_address)
70 .from_block(starting_checkpoint)
71 .to_block(target_checkpoint);
72
73 let eth_ws_url = self.eth_ws_url.clone();
74 let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone();
75
76 let handle = spawn_monitored_task!(async move {
77 let eth_ws_client = Provider::<Ws>::connect(ð_ws_url).await?;
78
79 let mut cached_blocks: HashMap<u64, Block<H256>> = HashMap::new();
80
81 let mut stream = eth_ws_client.subscribe_logs(&filter).await?;
82 while let Some(log) = stream.next().await {
83 let raw_log = RawEthLog {
84 block_number: log
85 .block_number
86 .ok_or(BridgeError::Provider(
87 "Provider returns log without block_number".into(),
88 ))
89 .unwrap()
90 .as_u64(),
91 tx_hash: log
92 .transaction_hash
93 .ok_or(BridgeError::Provider(
94 "Provider returns log without transaction_hash".into(),
95 ))
96 .unwrap(),
97 log,
98 };
99
100 let block_number = raw_log.block_number();
101
102 let block = if let Some(cached_block) = cached_blocks.get(&block_number) {
103 cached_block.clone()
104 } else {
105 let Ok(Ok(Some(block))) = retry_with_max_elapsed_time!(
106 eth_ws_client.get_block(block_number),
107 Duration::from_secs(30000)
108 ) else {
109 panic!("Unable to get block from provider");
110 };
111
112 cached_blocks.insert(block_number, block.clone());
113 block
114 };
115
116 let Ok(Ok(Some(transaction))) = retry_with_max_elapsed_time!(
117 eth_ws_client.get_transaction(raw_log.tx_hash),
118 Duration::from_secs(30000)
119 ) else {
120 panic!("Unable to get transaction from provider");
121 };
122
123 data_sender
124 .send((block_number, vec![(raw_log, block, transaction)]))
125 .await?;
126
127 indexer_metrics
128 .latest_committed_eth_block
129 .set(block_number as i64);
130 }
131
132 Ok::<_, Error>(())
133 });
134 Ok(handle)
135 }
136}
137
138pub struct EthSyncDatasource {
139 bridge_address: EthAddress,
140 eth_http_url: String,
141 indexer_metrics: BridgeIndexerMetrics,
142 bridge_metrics: Arc<BridgeMetrics>,
143}
144
145impl EthSyncDatasource {
146 pub fn new(
147 eth_iota_bridge_contract_address: String,
148 eth_http_url: String,
149 indexer_metrics: BridgeIndexerMetrics,
150 bridge_metrics: Arc<BridgeMetrics>,
151 ) -> Result<Self, anyhow::Error> {
152 let bridge_address = EthAddress::from_str(ð_iota_bridge_contract_address)?;
153 Ok(Self {
154 bridge_address,
155 eth_http_url,
156 indexer_metrics,
157 bridge_metrics,
158 })
159 }
160}
161#[async_trait]
162impl Datasource<RawEthData> for EthSyncDatasource {
163 async fn start_data_retrieval(
164 &self,
165 starting_checkpoint: u64,
166 target_checkpoint: u64,
167 data_sender: DataSender<RawEthData>,
168 ) -> Result<JoinHandle<Result<(), Error>>, Error> {
169 let client: Arc<EthClient<MeteredEthHttpProvider>> = Arc::new(
170 EthClient::<MeteredEthHttpProvider>::new(
171 &self.eth_http_url,
172 HashSet::from_iter(vec![self.bridge_address]),
173 self.bridge_metrics.clone(),
174 )
175 .await?,
176 );
177
178 let provider = Arc::new(
179 Provider::<Http>::try_from(&self.eth_http_url)?
180 .interval(std::time::Duration::from_millis(2000)),
181 );
182
183 let bridge_address = self.bridge_address;
184 let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone();
185 let client = Arc::clone(&client);
186 let provider = Arc::clone(&provider);
187
188 let handle = spawn_monitored_task!(async move {
189 let mut cached_blocks: HashMap<u64, Block<H256>> = HashMap::new();
190
191 let Ok(Ok(logs)) = retry_with_max_elapsed_time!(
192 client.get_raw_events_in_range(
193 bridge_address,
194 starting_checkpoint,
195 target_checkpoint
196 ),
197 Duration::from_secs(30000)
198 ) else {
199 panic!("Unable to get logs from provider");
200 };
201
202 let mut data = Vec::new();
203 let mut first_block = 0;
204
205 for log in logs {
206 let block = if let Some(cached_block) = cached_blocks.get(&log.block_number) {
207 cached_block.clone()
208 } else {
209 let Ok(Ok(Some(block))) = retry_with_max_elapsed_time!(
210 provider.get_block(log.block_number),
211 Duration::from_secs(30000)
212 ) else {
213 panic!("Unable to get block from provider");
214 };
215
216 cached_blocks.insert(log.block_number, block.clone());
217 block
218 };
219
220 if first_block == 0 {
221 first_block = log.block_number;
222 }
223
224 let Ok(Ok(Some(transaction))) = retry_with_max_elapsed_time!(
225 provider.get_transaction(log.tx_hash),
226 Duration::from_secs(30000)
227 ) else {
228 panic!("Unable to get transaction from provider");
229 };
230
231 data.push((log, block, transaction));
232 }
233
234 data_sender.send((target_checkpoint, data)).await?;
235
236 indexer_metrics
237 .last_synced_eth_block
238 .set(first_block as i64);
239
240 Ok::<_, Error>(())
241 });
242
243 Ok(handle)
244 }
245}
246
247#[derive(Clone)]
248pub struct EthDataMapper {
249 pub metrics: BridgeIndexerMetrics,
250}
251
252impl<E: EthEvent> DataMapper<(E, Block<H256>, Transaction), ProcessedTxnData> for EthDataMapper {
253 fn map(
254 &self,
255 (log, block, transaction): (E, Block<H256>, Transaction),
256 ) -> Result<Vec<ProcessedTxnData>, Error> {
257 let eth_bridge_event = EthBridgeEvent::try_from_log(log.log());
258 if eth_bridge_event.is_none() {
259 return Ok(vec![]);
260 }
261 self.metrics.total_eth_bridge_transactions.inc();
262 let bridge_event = eth_bridge_event.unwrap();
263 let timestamp_ms = block.timestamp.as_u64() * 1000;
264 let gas = transaction.gas;
265
266 let transfer = match bridge_event {
267 EthBridgeEvent::EthIotaBridgeEvents(bridge_event) => match bridge_event {
268 EthIotaBridgeEvents::TokensDepositedFilter(bridge_event) => {
269 info!("Observed Eth Deposit at block: {}", log.block_number());
270 self.metrics.total_eth_token_deposited.inc();
271 ProcessedTxnData::TokenTransfer(TokenTransfer {
272 chain_id: bridge_event.source_chain_id,
273 nonce: bridge_event.nonce,
274 block_height: log.block_number(),
275 timestamp_ms,
276 txn_hash: transaction.hash.as_bytes().to_vec(),
277 txn_sender: bridge_event.sender_address.as_bytes().to_vec(),
278 status: TokenTransferStatus::Deposited,
279 gas_usage: gas.as_u64() as i64,
280 data_source: BridgeDataSource::Eth,
281 data: Some(TokenTransferData {
282 sender_address: bridge_event.sender_address.as_bytes().to_vec(),
283 destination_chain: bridge_event.destination_chain_id,
284 recipient_address: bridge_event.recipient_address.to_vec(),
285 token_id: bridge_event.token_id,
286 amount: bridge_event.iota_adjusted_amount,
287 }),
288 })
289 }
290 EthIotaBridgeEvents::TokensClaimedFilter(bridge_event) => {
291 info!("Observed Eth Claim at block: {}", log.block_number());
292 self.metrics.total_eth_token_transfer_claimed.inc();
293 ProcessedTxnData::TokenTransfer(TokenTransfer {
294 chain_id: bridge_event.source_chain_id,
295 nonce: bridge_event.nonce,
296 block_height: log.block_number(),
297 timestamp_ms,
298 txn_hash: transaction.hash.as_bytes().to_vec(),
299 txn_sender: bridge_event.sender_address.to_vec(),
300 status: TokenTransferStatus::Claimed,
301 gas_usage: gas.as_u64() as i64,
302 data_source: BridgeDataSource::Eth,
303 data: None,
304 })
305 }
306 EthIotaBridgeEvents::PausedFilter(_)
307 | EthIotaBridgeEvents::UnpausedFilter(_)
308 | EthIotaBridgeEvents::UpgradedFilter(_)
309 | EthIotaBridgeEvents::InitializedFilter(_) => {
310 self.metrics.total_eth_bridge_txn_other.inc();
312 return Ok(vec![]);
313 }
314 },
315 EthBridgeEvent::EthBridgeCommitteeEvents(_)
316 | EthBridgeEvent::EthBridgeLimiterEvents(_)
317 | EthBridgeEvent::EthBridgeConfigEvents(_)
318 | EthBridgeEvent::EthCommitteeUpgradeableContractEvents(_) => {
319 self.metrics.total_eth_bridge_txn_other.inc();
321 return Ok(vec![]);
322 }
323 };
324 Ok(vec![transfer])
325 }
326}