iota_bridge_indexer/
iota_bridge_indexer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::{Error, anyhow};
6use async_trait::async_trait;
7use diesel::{
8    Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper,
9    TextExpressionMethods, dsl::now,
10};
11use iota_bridge::events::{
12    MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed,
13};
14use iota_indexer_builder::{
15    Task,
16    indexer_builder::{DataMapper, IndexerProgressStore, Persistent},
17    iota_datasource::CheckpointTxnData,
18};
19use iota_types::{
20    BRIDGE_ADDRESS, IOTA_BRIDGE_OBJECT_ID, effects::TransactionEffectsAPI, event::Event,
21    execution_status::ExecutionStatus, full_checkpoint_content::CheckpointTransaction,
22};
23use tracing::info;
24
25use crate::{
26    BridgeDataSource, IotaTxnError, ProcessedTxnData, TokenTransfer, TokenTransferData,
27    TokenTransferStatus,
28    metrics::BridgeIndexerMetrics,
29    models,
30    postgres_manager::PgPool,
31    schema,
32    schema::{
33        iota_error_transactions,
34        progress_store::{columns, dsl},
35        token_transfer, token_transfer_data,
36    },
37};
38
39/// Persistent layer impl
40#[derive(Clone)]
41pub struct PgBridgePersistent {
42    pool: PgPool,
43}
44
45impl PgBridgePersistent {
46    pub fn new(pool: PgPool) -> Self {
47        Self { pool }
48    }
49}
50
51// TODO: this is shared between IOTA and ETH, move to different file.
52#[async_trait]
53impl Persistent<ProcessedTxnData> for PgBridgePersistent {
54    async fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
55        if data.is_empty() {
56            return Ok(());
57        }
58        let connection = &mut self.pool.get()?;
59        connection.transaction(|conn| {
60            for d in data {
61                match d {
62                    ProcessedTxnData::TokenTransfer(t) => {
63                        diesel::insert_into(token_transfer::table)
64                            .values(&t.to_db())
65                            .on_conflict_do_nothing()
66                            .execute(conn)?;
67
68                        if let Some(d) = t.to_data_maybe() {
69                            diesel::insert_into(token_transfer_data::table)
70                                .values(&d)
71                                .on_conflict_do_nothing()
72                                .execute(conn)?;
73                        }
74                    }
75                    ProcessedTxnData::Error(e) => {
76                        diesel::insert_into(iota_error_transactions::table)
77                            .values(&e.to_db())
78                            .on_conflict_do_nothing()
79                            .execute(conn)?;
80                    }
81                }
82            }
83            Ok(())
84        })
85    }
86}
87
88#[async_trait]
89impl IndexerProgressStore for PgBridgePersistent {
90    async fn load_progress(&self, task_name: String) -> anyhow::Result<u64> {
91        let mut conn = self.pool.get()?;
92        let cp: Option<models::ProgressStore> = dsl::progress_store
93            .find(&task_name)
94            .select(models::ProgressStore::as_select())
95            .first(&mut conn)
96            .optional()?;
97        Ok(cp
98            .ok_or(anyhow!("Cannot found progress for task {task_name}"))?
99            .checkpoint as u64)
100    }
101
102    async fn save_progress(
103        &mut self,
104        task_name: String,
105        checkpoint_number: u64,
106    ) -> anyhow::Result<()> {
107        let mut conn = self.pool.get()?;
108        diesel::insert_into(schema::progress_store::table)
109            .values(&models::ProgressStore {
110                task_name,
111                checkpoint: checkpoint_number as i64,
112                // Target checkpoint and timestamp will only be written for new entries
113                target_checkpoint: i64::MAX,
114                // Timestamp is defaulted to current time in DB if None
115                timestamp: None,
116            })
117            .on_conflict(dsl::task_name)
118            .do_update()
119            .set((
120                columns::checkpoint.eq(checkpoint_number as i64),
121                columns::timestamp.eq(now),
122            ))
123            .execute(&mut conn)?;
124        Ok(())
125    }
126
127    async fn tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
128        let mut conn = self.pool.get()?;
129        // get all unfinished tasks
130        let cp: Vec<models::ProgressStore> = dsl::progress_store
131            // TODO: using like could be error prone, change the progress store schema to stare the
132            // task name properly.
133            .filter(columns::task_name.like(format!("{prefix} - %")))
134            .filter(columns::checkpoint.lt(columns::target_checkpoint))
135            .order_by(columns::target_checkpoint.desc())
136            .load(&mut conn)?;
137        Ok(cp.into_iter().map(|d| d.into()).collect())
138    }
139
140    async fn register_task(
141        &mut self,
142        task_name: String,
143        checkpoint: u64,
144        target_checkpoint: u64,
145    ) -> Result<(), anyhow::Error> {
146        let mut conn = self.pool.get()?;
147        diesel::insert_into(schema::progress_store::table)
148            .values(models::ProgressStore {
149                task_name,
150                checkpoint: checkpoint as i64,
151                target_checkpoint: target_checkpoint as i64,
152                // Timestamp is defaulted to current time in DB if None
153                timestamp: None,
154            })
155            .execute(&mut conn)?;
156        Ok(())
157    }
158
159    async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
160        let mut conn = self.pool.get()?;
161        diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
162            .set((
163                columns::checkpoint.eq(task.checkpoint as i64),
164                columns::target_checkpoint.eq(task.target_checkpoint as i64),
165                columns::timestamp.eq(now),
166            ))
167            .execute(&mut conn)?;
168        Ok(())
169    }
170}
171
172/// Data mapper impl
173#[derive(Clone)]
174pub struct IotaBridgeDataMapper {
175    pub metrics: BridgeIndexerMetrics,
176}
177
178impl DataMapper<CheckpointTxnData, ProcessedTxnData> for IotaBridgeDataMapper {
179    fn map(
180        &self,
181        (data, checkpoint_num, timestamp_ms): CheckpointTxnData,
182    ) -> Result<Vec<ProcessedTxnData>, Error> {
183        self.metrics.total_iota_bridge_transactions.inc();
184        if !data
185            .input_objects
186            .iter()
187            .any(|obj| obj.id() == IOTA_BRIDGE_OBJECT_ID)
188        {
189            return Ok(vec![]);
190        }
191
192        match &data.events {
193            Some(events) => {
194                let token_transfers = events.data.iter().try_fold(vec![], |mut result, ev| {
195                    if let Some(data) = process_iota_event(ev, &data, checkpoint_num, timestamp_ms)?
196                    {
197                        result.push(data);
198                    }
199                    Ok::<_, anyhow::Error>(result)
200                })?;
201
202                if !token_transfers.is_empty() {
203                    info!(
204                        "IOTA: Extracted {} bridge token transfer data entries for tx {}.",
205                        token_transfers.len(),
206                        data.transaction.digest()
207                    );
208                }
209                Ok(token_transfers)
210            }
211            None => {
212                if let ExecutionStatus::Failure { error, command } = data.effects.status() {
213                    Ok(vec![ProcessedTxnData::Error(IotaTxnError {
214                        tx_digest: *data.transaction.digest(),
215                        sender: data.transaction.sender_address(),
216                        timestamp_ms,
217                        failure_status: error.to_string(),
218                        cmd_idx: command.map(|idx| idx as u64),
219                    })])
220                } else {
221                    Ok(vec![])
222                }
223            }
224        }
225    }
226}
227
228fn process_iota_event(
229    ev: &Event,
230    tx: &CheckpointTransaction,
231    checkpoint: u64,
232    timestamp_ms: u64,
233) -> Result<Option<ProcessedTxnData>, anyhow::Error> {
234    Ok(if ev.type_.address == BRIDGE_ADDRESS {
235        match ev.type_.name.as_str() {
236            "TokenDepositedEvent" => {
237                info!("Observed IOTA Deposit {:?}", ev);
238                // todo: metrics.total_iota_token_deposited.inc();
239                let move_event: MoveTokenDepositedEvent = bcs::from_bytes(&ev.contents)?;
240                Some(ProcessedTxnData::TokenTransfer(TokenTransfer {
241                    chain_id: move_event.source_chain,
242                    nonce: move_event.seq_num,
243                    block_height: checkpoint,
244                    timestamp_ms,
245                    txn_hash: tx.transaction.digest().inner().to_vec(),
246                    txn_sender: ev.sender.to_vec(),
247                    status: TokenTransferStatus::Deposited,
248                    gas_usage: tx.effects.gas_cost_summary().net_gas_usage(),
249                    data_source: BridgeDataSource::Iota,
250                    data: Some(TokenTransferData {
251                        destination_chain: move_event.target_chain,
252                        sender_address: move_event.sender_address.clone(),
253                        recipient_address: move_event.target_address.clone(),
254                        token_id: move_event.token_type,
255                        amount: move_event.amount_iota_adjusted,
256                    }),
257                }))
258            }
259            "TokenTransferApproved" => {
260                info!("Observed IOTA Approval {:?}", ev);
261                // todo: metrics.total_iota_token_transfer_approved.inc();
262                let event: MoveTokenTransferApproved = bcs::from_bytes(&ev.contents)?;
263                Some(ProcessedTxnData::TokenTransfer(TokenTransfer {
264                    chain_id: event.message_key.source_chain,
265                    nonce: event.message_key.bridge_seq_num,
266                    block_height: checkpoint,
267                    timestamp_ms,
268                    txn_hash: tx.transaction.digest().inner().to_vec(),
269                    txn_sender: ev.sender.to_vec(),
270                    status: TokenTransferStatus::Approved,
271                    gas_usage: tx.effects.gas_cost_summary().net_gas_usage(),
272                    data_source: BridgeDataSource::Iota,
273                    data: None,
274                }))
275            }
276            "TokenTransferClaimed" => {
277                info!("Observed IOTA Claim {:?}", ev);
278                // todo: metrics.total_iota_token_transfer_claimed.inc();
279                let event: MoveTokenTransferClaimed = bcs::from_bytes(&ev.contents)?;
280                Some(ProcessedTxnData::TokenTransfer(TokenTransfer {
281                    chain_id: event.message_key.source_chain,
282                    nonce: event.message_key.bridge_seq_num,
283                    block_height: checkpoint,
284                    timestamp_ms,
285                    txn_hash: tx.transaction.digest().inner().to_vec(),
286                    txn_sender: ev.sender.to_vec(),
287                    status: TokenTransferStatus::Claimed,
288                    gas_usage: tx.effects.gas_cost_summary().net_gas_usage(),
289                    data_source: BridgeDataSource::Iota,
290                    data: None,
291                }))
292            }
293            _ => {
294                // todo: metrics.total_iota_bridge_txn_other.inc();
295                None
296            }
297        }
298    } else {
299        None
300    })
301}