1use anyhow::{Error, anyhow};
6use async_trait::async_trait;
7use diesel::{
8 Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper,
9 TextExpressionMethods, dsl::now,
10};
11use iota_bridge::events::{
12 MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed,
13};
14use iota_indexer_builder::{
15 Task,
16 indexer_builder::{DataMapper, IndexerProgressStore, Persistent},
17 iota_datasource::CheckpointTxnData,
18};
19use iota_types::{
20 BRIDGE_ADDRESS, IOTA_BRIDGE_OBJECT_ID, effects::TransactionEffectsAPI, event::Event,
21 execution_status::ExecutionStatus, full_checkpoint_content::CheckpointTransaction,
22};
23use tracing::info;
24
25use crate::{
26 BridgeDataSource, IotaTxnError, ProcessedTxnData, TokenTransfer, TokenTransferData,
27 TokenTransferStatus,
28 metrics::BridgeIndexerMetrics,
29 models,
30 postgres_manager::PgPool,
31 schema,
32 schema::{
33 iota_error_transactions,
34 progress_store::{columns, dsl},
35 token_transfer, token_transfer_data,
36 },
37};
38
39#[derive(Clone)]
41pub struct PgBridgePersistent {
42 pool: PgPool,
43}
44
45impl PgBridgePersistent {
46 pub fn new(pool: PgPool) -> Self {
47 Self { pool }
48 }
49}
50
51#[async_trait]
53impl Persistent<ProcessedTxnData> for PgBridgePersistent {
54 async fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
55 if data.is_empty() {
56 return Ok(());
57 }
58 let connection = &mut self.pool.get()?;
59 connection.transaction(|conn| {
60 for d in data {
61 match d {
62 ProcessedTxnData::TokenTransfer(t) => {
63 diesel::insert_into(token_transfer::table)
64 .values(&t.to_db())
65 .on_conflict_do_nothing()
66 .execute(conn)?;
67
68 if let Some(d) = t.to_data_maybe() {
69 diesel::insert_into(token_transfer_data::table)
70 .values(&d)
71 .on_conflict_do_nothing()
72 .execute(conn)?;
73 }
74 }
75 ProcessedTxnData::Error(e) => {
76 diesel::insert_into(iota_error_transactions::table)
77 .values(&e.to_db())
78 .on_conflict_do_nothing()
79 .execute(conn)?;
80 }
81 }
82 }
83 Ok(())
84 })
85 }
86}
87
88#[async_trait]
89impl IndexerProgressStore for PgBridgePersistent {
90 async fn load_progress(&self, task_name: String) -> anyhow::Result<u64> {
91 let mut conn = self.pool.get()?;
92 let cp: Option<models::ProgressStore> = dsl::progress_store
93 .find(&task_name)
94 .select(models::ProgressStore::as_select())
95 .first(&mut conn)
96 .optional()?;
97 Ok(cp
98 .ok_or(anyhow!("Cannot found progress for task {task_name}"))?
99 .checkpoint as u64)
100 }
101
102 async fn save_progress(
103 &mut self,
104 task_name: String,
105 checkpoint_number: u64,
106 ) -> anyhow::Result<()> {
107 let mut conn = self.pool.get()?;
108 diesel::insert_into(schema::progress_store::table)
109 .values(&models::ProgressStore {
110 task_name,
111 checkpoint: checkpoint_number as i64,
112 target_checkpoint: i64::MAX,
114 timestamp: None,
116 })
117 .on_conflict(dsl::task_name)
118 .do_update()
119 .set((
120 columns::checkpoint.eq(checkpoint_number as i64),
121 columns::timestamp.eq(now),
122 ))
123 .execute(&mut conn)?;
124 Ok(())
125 }
126
127 async fn tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
128 let mut conn = self.pool.get()?;
129 let cp: Vec<models::ProgressStore> = dsl::progress_store
131 .filter(columns::task_name.like(format!("{prefix} - %")))
134 .filter(columns::checkpoint.lt(columns::target_checkpoint))
135 .order_by(columns::target_checkpoint.desc())
136 .load(&mut conn)?;
137 Ok(cp.into_iter().map(|d| d.into()).collect())
138 }
139
140 async fn register_task(
141 &mut self,
142 task_name: String,
143 checkpoint: u64,
144 target_checkpoint: u64,
145 ) -> Result<(), anyhow::Error> {
146 let mut conn = self.pool.get()?;
147 diesel::insert_into(schema::progress_store::table)
148 .values(models::ProgressStore {
149 task_name,
150 checkpoint: checkpoint as i64,
151 target_checkpoint: target_checkpoint as i64,
152 timestamp: None,
154 })
155 .execute(&mut conn)?;
156 Ok(())
157 }
158
159 async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
160 let mut conn = self.pool.get()?;
161 diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
162 .set((
163 columns::checkpoint.eq(task.checkpoint as i64),
164 columns::target_checkpoint.eq(task.target_checkpoint as i64),
165 columns::timestamp.eq(now),
166 ))
167 .execute(&mut conn)?;
168 Ok(())
169 }
170}
171
172#[derive(Clone)]
174pub struct IotaBridgeDataMapper {
175 pub metrics: BridgeIndexerMetrics,
176}
177
178impl DataMapper<CheckpointTxnData, ProcessedTxnData> for IotaBridgeDataMapper {
179 fn map(
180 &self,
181 (data, checkpoint_num, timestamp_ms): CheckpointTxnData,
182 ) -> Result<Vec<ProcessedTxnData>, Error> {
183 self.metrics.total_iota_bridge_transactions.inc();
184 if !data
185 .input_objects
186 .iter()
187 .any(|obj| obj.id() == IOTA_BRIDGE_OBJECT_ID)
188 {
189 return Ok(vec![]);
190 }
191
192 match &data.events {
193 Some(events) => {
194 let token_transfers = events.data.iter().try_fold(vec![], |mut result, ev| {
195 if let Some(data) = process_iota_event(ev, &data, checkpoint_num, timestamp_ms)?
196 {
197 result.push(data);
198 }
199 Ok::<_, anyhow::Error>(result)
200 })?;
201
202 if !token_transfers.is_empty() {
203 info!(
204 "IOTA: Extracted {} bridge token transfer data entries for tx {}.",
205 token_transfers.len(),
206 data.transaction.digest()
207 );
208 }
209 Ok(token_transfers)
210 }
211 None => {
212 if let ExecutionStatus::Failure { error, command } = data.effects.status() {
213 Ok(vec![ProcessedTxnData::Error(IotaTxnError {
214 tx_digest: *data.transaction.digest(),
215 sender: data.transaction.sender_address(),
216 timestamp_ms,
217 failure_status: error.to_string(),
218 cmd_idx: command.map(|idx| idx as u64),
219 })])
220 } else {
221 Ok(vec![])
222 }
223 }
224 }
225 }
226}
227
228fn process_iota_event(
229 ev: &Event,
230 tx: &CheckpointTransaction,
231 checkpoint: u64,
232 timestamp_ms: u64,
233) -> Result<Option<ProcessedTxnData>, anyhow::Error> {
234 Ok(if ev.type_.address == BRIDGE_ADDRESS {
235 match ev.type_.name.as_str() {
236 "TokenDepositedEvent" => {
237 info!("Observed IOTA Deposit {:?}", ev);
238 let move_event: MoveTokenDepositedEvent = bcs::from_bytes(&ev.contents)?;
240 Some(ProcessedTxnData::TokenTransfer(TokenTransfer {
241 chain_id: move_event.source_chain,
242 nonce: move_event.seq_num,
243 block_height: checkpoint,
244 timestamp_ms,
245 txn_hash: tx.transaction.digest().inner().to_vec(),
246 txn_sender: ev.sender.to_vec(),
247 status: TokenTransferStatus::Deposited,
248 gas_usage: tx.effects.gas_cost_summary().net_gas_usage(),
249 data_source: BridgeDataSource::Iota,
250 data: Some(TokenTransferData {
251 destination_chain: move_event.target_chain,
252 sender_address: move_event.sender_address.clone(),
253 recipient_address: move_event.target_address.clone(),
254 token_id: move_event.token_type,
255 amount: move_event.amount_iota_adjusted,
256 }),
257 }))
258 }
259 "TokenTransferApproved" => {
260 info!("Observed IOTA Approval {:?}", ev);
261 let event: MoveTokenTransferApproved = bcs::from_bytes(&ev.contents)?;
263 Some(ProcessedTxnData::TokenTransfer(TokenTransfer {
264 chain_id: event.message_key.source_chain,
265 nonce: event.message_key.bridge_seq_num,
266 block_height: checkpoint,
267 timestamp_ms,
268 txn_hash: tx.transaction.digest().inner().to_vec(),
269 txn_sender: ev.sender.to_vec(),
270 status: TokenTransferStatus::Approved,
271 gas_usage: tx.effects.gas_cost_summary().net_gas_usage(),
272 data_source: BridgeDataSource::Iota,
273 data: None,
274 }))
275 }
276 "TokenTransferClaimed" => {
277 info!("Observed IOTA Claim {:?}", ev);
278 let event: MoveTokenTransferClaimed = bcs::from_bytes(&ev.contents)?;
280 Some(ProcessedTxnData::TokenTransfer(TokenTransfer {
281 chain_id: event.message_key.source_chain,
282 nonce: event.message_key.bridge_seq_num,
283 block_height: checkpoint,
284 timestamp_ms,
285 txn_hash: tx.transaction.digest().inner().to_vec(),
286 txn_sender: ev.sender.to_vec(),
287 status: TokenTransferStatus::Claimed,
288 gas_usage: tx.effects.gas_cost_summary().net_gas_usage(),
289 data_source: BridgeDataSource::Iota,
290 data: None,
291 }))
292 }
293 _ => {
294 None
296 }
297 }
298 } else {
299 None
300 })
301}