iota_bridge_indexer/
iota_transaction_handler.rs1use std::time::Duration;
6
7use anyhow::Result;
8use futures::StreamExt;
9use iota_bridge::events::{
10 MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed,
11};
12use iota_json_rpc_types::IotaTransactionBlockEffectsAPI;
13use iota_metrics::metered_channel::{Receiver, ReceiverStream};
14use iota_types::{BRIDGE_ADDRESS, digests::TransactionDigest};
15use tracing::{error, info};
16
17use crate::{
18 BridgeDataSource, ProcessedTxnData, TokenTransfer, TokenTransferData, TokenTransferStatus,
19 metrics::BridgeIndexerMetrics,
20 postgres_manager::{PgPool, update_iota_progress_store, write},
21 types::RetrievedTransaction,
22};
23
24pub(crate) const COMMIT_BATCH_SIZE: usize = 10;
25
26pub async fn handle_iota_transactions_loop(
27 pg_pool: PgPool,
28 rx: Receiver<(Vec<RetrievedTransaction>, Option<TransactionDigest>)>,
29 metrics: BridgeIndexerMetrics,
30) {
31 let checkpoint_commit_batch_size = std::env::var("COMMIT_BATCH_SIZE")
32 .unwrap_or(COMMIT_BATCH_SIZE.to_string())
33 .parse::<usize>()
34 .unwrap();
35 let mut stream = ReceiverStream::new(rx).ready_chunks(checkpoint_commit_batch_size);
36 while let Some(batch) = stream.next().await {
37 let (txns, cursor) = batch.last().cloned().unwrap();
39 let data = batch
40 .into_iter()
41 .flat_map(|(chunk, _)| process_transactions(chunk, &metrics).unwrap())
44 .collect::<Vec<_>>();
45
46 if !data.is_empty() {
48 let last_ckp = txns.last().map(|tx| tx.checkpoint).unwrap_or_default();
50 while let Err(err) = write(&pg_pool, data.clone()) {
51 error!("Failed to write iota transactions to DB: {:?}", err);
52 tokio::time::sleep(Duration::from_secs(5)).await;
53 }
54 info!("Wrote {} bridge transaction data to DB", data.len());
55 metrics.last_committed_iota_checkpoint.set(last_ckp as i64);
56 }
57
58 if let Some(cursor) = cursor {
60 while let Err(err) = update_iota_progress_store(&pg_pool, cursor) {
61 error!("Failed to update iota progress tore DB: {:?}", err);
62 tokio::time::sleep(Duration::from_secs(5)).await;
63 }
64 info!("Updated iota transaction cursor to {}", cursor);
65 }
66 }
67 unreachable!("Channel closed unexpectedly");
68}
69
70fn process_transactions(
71 txns: Vec<RetrievedTransaction>,
72 metrics: &BridgeIndexerMetrics,
73) -> Result<Vec<ProcessedTxnData>> {
74 txns.into_iter().try_fold(vec![], |mut result, tx| {
75 result.append(&mut into_token_transfers(tx, metrics)?);
76 Ok(result)
77 })
78}
79
80pub fn into_token_transfers(
81 tx: RetrievedTransaction,
82 metrics: &BridgeIndexerMetrics,
83) -> Result<Vec<ProcessedTxnData>> {
84 let mut transfers = Vec::new();
85 let tx_digest = tx.tx_digest;
86 let timestamp_ms = tx.timestamp_ms;
87 let checkpoint_num = tx.checkpoint;
88 let effects = tx.effects;
89 for ev in tx.events.data {
90 if ev.type_.address != BRIDGE_ADDRESS {
91 continue;
92 }
93 match ev.type_.name.as_str() {
94 "TokenDepositedEvent" => {
95 info!("Observed IOTA Deposit {:?}", ev);
96 metrics.total_iota_token_deposited.inc();
97 let move_event: MoveTokenDepositedEvent = bcs::from_bytes(ev.bcs.bytes())?;
98 transfers.push(ProcessedTxnData::TokenTransfer(TokenTransfer {
99 chain_id: move_event.source_chain,
100 nonce: move_event.seq_num,
101 block_height: checkpoint_num,
102 timestamp_ms,
103 txn_hash: tx_digest.inner().to_vec(),
104 txn_sender: ev.sender.to_vec(),
105 status: TokenTransferStatus::Deposited,
106 gas_usage: effects.gas_cost_summary().net_gas_usage(),
107 data_source: BridgeDataSource::Iota,
108 data: Some(TokenTransferData {
109 destination_chain: move_event.target_chain,
110 sender_address: move_event.sender_address.clone(),
111 recipient_address: move_event.target_address.clone(),
112 token_id: move_event.token_type,
113 amount: move_event.amount_iota_adjusted,
114 }),
115 }));
116 }
117 "TokenTransferApproved" => {
118 info!("Observed IOTA Approval {:?}", ev);
119 metrics.total_iota_token_transfer_approved.inc();
120 let event: MoveTokenTransferApproved = bcs::from_bytes(ev.bcs.bytes())?;
121 transfers.push(ProcessedTxnData::TokenTransfer(TokenTransfer {
122 chain_id: event.message_key.source_chain,
123 nonce: event.message_key.bridge_seq_num,
124 block_height: checkpoint_num,
125 timestamp_ms,
126 txn_hash: tx_digest.inner().to_vec(),
127 txn_sender: ev.sender.to_vec(),
128 status: TokenTransferStatus::Approved,
129 gas_usage: effects.gas_cost_summary().net_gas_usage(),
130 data_source: BridgeDataSource::Iota,
131 data: None,
132 }));
133 }
134 "TokenTransferClaimed" => {
135 info!("Observed IOTA Claim {:?}", ev);
136 metrics.total_iota_token_transfer_claimed.inc();
137 let event: MoveTokenTransferClaimed = bcs::from_bytes(ev.bcs.bytes())?;
138 transfers.push(ProcessedTxnData::TokenTransfer(TokenTransfer {
139 chain_id: event.message_key.source_chain,
140 nonce: event.message_key.bridge_seq_num,
141 block_height: checkpoint_num,
142 timestamp_ms,
143 txn_hash: tx_digest.inner().to_vec(),
144 txn_sender: ev.sender.to_vec(),
145 status: TokenTransferStatus::Claimed,
146 gas_usage: effects.gas_cost_summary().net_gas_usage(),
147 data_source: BridgeDataSource::Iota,
148 data: None,
149 }));
150 }
151 _ => {
152 metrics.total_iota_bridge_txn_other.inc();
153 }
154 }
155 }
156 if !transfers.is_empty() {
157 info!(
158 ?tx_digest,
159 "IOTA: Extracted {} bridge token transfer data entries",
160 transfers.len(),
161 );
162 }
163 Ok(transfers)
164}