iota_bridge_indexer/
iota_transaction_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use anyhow::Result;
8use futures::StreamExt;
9use iota_bridge::events::{
10    MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed,
11};
12use iota_json_rpc_types::IotaTransactionBlockEffectsAPI;
13use iota_metrics::metered_channel::{Receiver, ReceiverStream};
14use iota_types::{BRIDGE_ADDRESS, digests::TransactionDigest};
15use tracing::{error, info};
16
17use crate::{
18    BridgeDataSource, ProcessedTxnData, TokenTransfer, TokenTransferData, TokenTransferStatus,
19    metrics::BridgeIndexerMetrics,
20    postgres_manager::{PgPool, update_iota_progress_store, write},
21    types::RetrievedTransaction,
22};
23
24pub(crate) const COMMIT_BATCH_SIZE: usize = 10;
25
26pub async fn handle_iota_transactions_loop(
27    pg_pool: PgPool,
28    rx: Receiver<(Vec<RetrievedTransaction>, Option<TransactionDigest>)>,
29    metrics: BridgeIndexerMetrics,
30) {
31    let checkpoint_commit_batch_size = std::env::var("COMMIT_BATCH_SIZE")
32        .unwrap_or(COMMIT_BATCH_SIZE.to_string())
33        .parse::<usize>()
34        .unwrap();
35    let mut stream = ReceiverStream::new(rx).ready_chunks(checkpoint_commit_batch_size);
36    while let Some(batch) = stream.next().await {
37        // unwrap: batch must not be empty
38        let (txns, cursor) = batch.last().cloned().unwrap();
39        let data = batch
40            .into_iter()
41            // TODO: letting it panic so we can capture errors, but we should handle this more
42            // gracefully
43            .flat_map(|(chunk, _)| process_transactions(chunk, &metrics).unwrap())
44            .collect::<Vec<_>>();
45
46        // write batched transaction data to DB
47        if !data.is_empty() {
48            // unwrap: token_transfers is not empty
49            let last_ckp = txns.last().map(|tx| tx.checkpoint).unwrap_or_default();
50            while let Err(err) = write(&pg_pool, data.clone()) {
51                error!("Failed to write iota transactions to DB: {:?}", err);
52                tokio::time::sleep(Duration::from_secs(5)).await;
53            }
54            info!("Wrote {} bridge transaction data to DB", data.len());
55            metrics.last_committed_iota_checkpoint.set(last_ckp as i64);
56        }
57
58        // update iota progress store using the latest cursor
59        if let Some(cursor) = cursor {
60            while let Err(err) = update_iota_progress_store(&pg_pool, cursor) {
61                error!("Failed to update iota progress tore DB: {:?}", err);
62                tokio::time::sleep(Duration::from_secs(5)).await;
63            }
64            info!("Updated iota transaction cursor to {}", cursor);
65        }
66    }
67    unreachable!("Channel closed unexpectedly");
68}
69
70fn process_transactions(
71    txns: Vec<RetrievedTransaction>,
72    metrics: &BridgeIndexerMetrics,
73) -> Result<Vec<ProcessedTxnData>> {
74    txns.into_iter().try_fold(vec![], |mut result, tx| {
75        result.append(&mut into_token_transfers(tx, metrics)?);
76        Ok(result)
77    })
78}
79
80pub fn into_token_transfers(
81    tx: RetrievedTransaction,
82    metrics: &BridgeIndexerMetrics,
83) -> Result<Vec<ProcessedTxnData>> {
84    let mut transfers = Vec::new();
85    let tx_digest = tx.tx_digest;
86    let timestamp_ms = tx.timestamp_ms;
87    let checkpoint_num = tx.checkpoint;
88    let effects = tx.effects;
89    for ev in tx.events.data {
90        if ev.type_.address != BRIDGE_ADDRESS {
91            continue;
92        }
93        match ev.type_.name.as_str() {
94            "TokenDepositedEvent" => {
95                info!("Observed IOTA Deposit {:?}", ev);
96                metrics.total_iota_token_deposited.inc();
97                let move_event: MoveTokenDepositedEvent = bcs::from_bytes(ev.bcs.bytes())?;
98                transfers.push(ProcessedTxnData::TokenTransfer(TokenTransfer {
99                    chain_id: move_event.source_chain,
100                    nonce: move_event.seq_num,
101                    block_height: checkpoint_num,
102                    timestamp_ms,
103                    txn_hash: tx_digest.inner().to_vec(),
104                    txn_sender: ev.sender.to_vec(),
105                    status: TokenTransferStatus::Deposited,
106                    gas_usage: effects.gas_cost_summary().net_gas_usage(),
107                    data_source: BridgeDataSource::Iota,
108                    data: Some(TokenTransferData {
109                        destination_chain: move_event.target_chain,
110                        sender_address: move_event.sender_address.clone(),
111                        recipient_address: move_event.target_address.clone(),
112                        token_id: move_event.token_type,
113                        amount: move_event.amount_iota_adjusted,
114                    }),
115                }));
116            }
117            "TokenTransferApproved" => {
118                info!("Observed IOTA Approval {:?}", ev);
119                metrics.total_iota_token_transfer_approved.inc();
120                let event: MoveTokenTransferApproved = bcs::from_bytes(ev.bcs.bytes())?;
121                transfers.push(ProcessedTxnData::TokenTransfer(TokenTransfer {
122                    chain_id: event.message_key.source_chain,
123                    nonce: event.message_key.bridge_seq_num,
124                    block_height: checkpoint_num,
125                    timestamp_ms,
126                    txn_hash: tx_digest.inner().to_vec(),
127                    txn_sender: ev.sender.to_vec(),
128                    status: TokenTransferStatus::Approved,
129                    gas_usage: effects.gas_cost_summary().net_gas_usage(),
130                    data_source: BridgeDataSource::Iota,
131                    data: None,
132                }));
133            }
134            "TokenTransferClaimed" => {
135                info!("Observed IOTA Claim {:?}", ev);
136                metrics.total_iota_token_transfer_claimed.inc();
137                let event: MoveTokenTransferClaimed = bcs::from_bytes(ev.bcs.bytes())?;
138                transfers.push(ProcessedTxnData::TokenTransfer(TokenTransfer {
139                    chain_id: event.message_key.source_chain,
140                    nonce: event.message_key.bridge_seq_num,
141                    block_height: checkpoint_num,
142                    timestamp_ms,
143                    txn_hash: tx_digest.inner().to_vec(),
144                    txn_sender: ev.sender.to_vec(),
145                    status: TokenTransferStatus::Claimed,
146                    gas_usage: effects.gas_cost_summary().net_gas_usage(),
147                    data_source: BridgeDataSource::Iota,
148                    data: None,
149                }));
150            }
151            _ => {
152                metrics.total_iota_bridge_txn_other.inc();
153            }
154        }
155    }
156    if !transfers.is_empty() {
157        info!(
158            ?tx_digest,
159            "IOTA: Extracted {} bridge token transfer data entries",
160            transfers.len(),
161        );
162    }
163    Ok(transfers)
164}