iota_indexer/models/
transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9    BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
10    IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11    ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15    digests::TransactionDigest,
16    effects::{TransactionEffects, TransactionEvents},
17    event::Event,
18    transaction::SenderSignedData,
19};
20use move_core_types::{
21    annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
22    language_storage::TypeTag,
23};
24
25use crate::{
26    errors::IndexerError,
27    schema::{optimistic_transactions, transactions, tx_insertion_order},
28    types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
29};
30
31#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
32#[diesel(table_name = tx_insertion_order)]
33pub struct TxInsertionOrder {
34    /// Insertion order number that each transaction (either optimistic or
35    /// checkpointed) is assigned when being indexed. It provides common
36    /// ordering for optimistic and checkpointed transactions, whereas
37    /// `tx_sequence_number` provides ordering only for checkpointed
38    /// transactions. We skip it on insertion since it's autogenerated by the
39    /// database.
40    #[diesel(skip_insertion)]
41    pub insertion_order: i64,
42    pub tx_digest: Vec<u8>,
43}
44
45#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
46#[diesel(table_name = transactions)]
47pub struct StoredTransaction {
48    /// The index of the transaction in the global ordering that starts
49    /// from genesis.
50    pub tx_sequence_number: i64,
51    pub transaction_digest: Vec<u8>,
52    pub raw_transaction: Vec<u8>,
53    pub raw_effects: Vec<u8>,
54    pub checkpoint_sequence_number: i64,
55    pub timestamp_ms: i64,
56    pub object_changes: Vec<Option<Vec<u8>>>,
57    pub balance_changes: Vec<Option<Vec<u8>>>,
58    pub events: Vec<Option<Vec<u8>>>,
59    pub transaction_kind: i16,
60    pub success_command_count: i16,
61}
62
63#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
64#[diesel(table_name = optimistic_transactions)]
65pub struct OptimisticTransaction {
66    pub insertion_order: i64,
67    pub transaction_digest: Vec<u8>,
68    pub raw_transaction: Vec<u8>,
69    pub raw_effects: Vec<u8>,
70    pub object_changes: Vec<Option<Vec<u8>>>,
71    pub balance_changes: Vec<Option<Vec<u8>>>,
72    pub events: Vec<Option<Vec<u8>>>,
73    pub transaction_kind: i16,
74    pub success_command_count: i16,
75}
76
77impl From<OptimisticTransaction> for StoredTransaction {
78    fn from(tx: OptimisticTransaction) -> Self {
79        StoredTransaction {
80            tx_sequence_number: tx.insertion_order,
81            transaction_digest: tx.transaction_digest,
82            raw_transaction: tx.raw_transaction,
83            raw_effects: tx.raw_effects,
84            checkpoint_sequence_number: -1,
85            timestamp_ms: -1,
86            object_changes: tx.object_changes,
87            balance_changes: tx.balance_changes,
88            events: tx.events,
89            transaction_kind: tx.transaction_kind,
90            success_command_count: tx.success_command_count,
91        }
92    }
93}
94
95impl From<StoredTransaction> for OptimisticTransaction {
96    fn from(tx: StoredTransaction) -> Self {
97        OptimisticTransaction {
98            insertion_order: tx.tx_sequence_number,
99            transaction_digest: tx.transaction_digest,
100            raw_transaction: tx.raw_transaction,
101            raw_effects: tx.raw_effects,
102            object_changes: tx.object_changes,
103            balance_changes: tx.balance_changes,
104            events: tx.events,
105            transaction_kind: tx.transaction_kind,
106            success_command_count: tx.success_command_count,
107        }
108    }
109}
110
111pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
112
113#[derive(Debug, Queryable)]
114pub struct TxSeq {
115    pub seq: i64,
116}
117
118impl Default for TxSeq {
119    fn default() -> Self {
120        Self { seq: -1 }
121    }
122}
123
124#[derive(Clone, Debug, Queryable)]
125pub struct StoredTransactionTimestamp {
126    pub tx_sequence_number: i64,
127    pub timestamp_ms: i64,
128}
129
130#[derive(Clone, Debug, Queryable)]
131pub struct StoredTransactionCheckpoint {
132    pub tx_sequence_number: i64,
133    pub checkpoint_sequence_number: i64,
134}
135
136#[derive(Clone, Debug, Queryable)]
137pub struct StoredTransactionSuccessCommandCount {
138    pub tx_sequence_number: i64,
139    pub checkpoint_sequence_number: i64,
140    pub success_command_count: i16,
141    pub timestamp_ms: i64,
142}
143
144impl From<&IndexedTransaction> for StoredTransaction {
145    fn from(tx: &IndexedTransaction) -> Self {
146        StoredTransaction {
147            tx_sequence_number: tx.tx_sequence_number as i64,
148            transaction_digest: tx.tx_digest.into_inner().to_vec(),
149            raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
150            raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
151            checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
152            object_changes: tx
153                .object_changes
154                .iter()
155                .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
156                .collect(),
157            balance_changes: tx
158                .balance_change
159                .iter()
160                .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
161                .collect(),
162            events: tx
163                .events
164                .iter()
165                .map(|e| Some(bcs::to_bytes(&e).unwrap()))
166                .collect(),
167            timestamp_ms: tx.timestamp_ms as i64,
168            transaction_kind: tx.transaction_kind as i16,
169            success_command_count: tx.successful_tx_num as i16,
170        }
171    }
172}
173
174impl StoredTransaction {
175    pub fn get_balance_len(&self) -> usize {
176        self.balance_changes.len()
177    }
178
179    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
180        self.balance_changes.get(idx).cloned().flatten()
181    }
182
183    pub fn get_object_len(&self) -> usize {
184        self.object_changes.len()
185    }
186
187    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
188        self.object_changes.get(idx).cloned().flatten()
189    }
190
191    pub fn get_event_len(&self) -> usize {
192        self.events.len()
193    }
194
195    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
196        self.events.get(idx).cloned().flatten()
197    }
198
199    /// True for checkpointed transactions, False for optimistically indexed
200    /// transactions
201    pub fn is_checkpointed_transaction(&self) -> bool {
202        self.checkpoint_sequence_number >= 0
203    }
204
205    pub async fn try_into_iota_transaction_block_response(
206        self,
207        options: IotaTransactionBlockResponseOptions,
208        package_resolver: Arc<Resolver<impl PackageStore>>,
209    ) -> IndexerResult<IotaTransactionBlockResponse> {
210        let options = options.clone();
211        let tx_digest =
212            TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
213                IndexerError::PersistentStorageDataCorruption(format!(
214                    "Can't convert {:?} as tx_digest. Error: {e}",
215                    self.transaction_digest
216                ))
217            })?;
218
219        let timestamp_ms = self
220            .is_checkpointed_transaction()
221            .then_some(self.timestamp_ms as u64);
222        let checkpoint = self
223            .is_checkpointed_transaction()
224            .then_some(self.checkpoint_sequence_number as u64);
225
226        let transaction = if options.show_input {
227            let sender_signed_data = self.try_into_sender_signed_data()?;
228            let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
229                sender_signed_data,
230                package_resolver.clone(),
231                tx_digest,
232            )
233            .await?;
234            Some(tx_block)
235        } else {
236            None
237        };
238
239        let effects = options
240            .show_effects
241            .then(|| self.try_into_iota_transaction_effects())
242            .transpose()?;
243
244        let raw_transaction = options
245            .show_raw_input
246            .then_some(self.raw_transaction)
247            .unwrap_or_default();
248
249        let events = if options.show_events {
250            let events = {
251                self
252                        .events
253                        .into_iter()
254                        .map(|event| match event {
255                            Some(event) => {
256                                let event: Event = bcs::from_bytes(&event).map_err(|e| {
257                                    IndexerError::PersistentStorageDataCorruption(format!(
258                                        "Can't convert event bytes into Event. tx_digest={:?} Error: {e}",
259                                        tx_digest
260                                    ))
261                                })?;
262                                Ok(event)
263                            }
264                            None => Err(IndexerError::PersistentStorageDataCorruption(format!(
265                                "Event should not be null, tx_digest={:?}",
266                                tx_digest
267                            ))),
268                        })
269                        .collect::<Result<Vec<Event>, IndexerError>>()?
270            };
271            let timestamp = self.timestamp_ms as u64;
272            let tx_events = TransactionEvents { data: events };
273
274            tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp).await?
275        } else {
276            None
277        };
278
279        let object_changes = if options.show_object_changes {
280            let object_changes = {
281                self.object_changes.into_iter().map(|object_change| {
282                        match object_change {
283                            Some(object_change) => {
284                                let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
285                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
286                                        format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest)
287                                    ))?;
288                                Ok(ObjectChange::from(object_change))
289                            }
290                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
291                        }
292                    }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
293            };
294            Some(object_changes)
295        } else {
296            None
297        };
298
299        let balance_changes = if options.show_balance_changes {
300            let balance_changes = {
301                self.balance_changes.into_iter().map(|balance_change| {
302                        match balance_change {
303                            Some(balance_change) => {
304                                let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
305                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
306                                        format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest)
307                                    ))?;
308                                Ok(balance_change)
309                            }
310                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
311                        }
312                    }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
313            };
314            Some(balance_changes)
315        } else {
316            None
317        };
318
319        let raw_effects = options
320            .show_raw_effects
321            .then_some(self.raw_effects)
322            .unwrap_or_default();
323
324        Ok(IotaTransactionBlockResponse {
325            digest: tx_digest,
326            transaction,
327            raw_transaction,
328            effects,
329            events,
330            object_changes,
331            balance_changes,
332            timestamp_ms,
333            checkpoint,
334            confirmed_local_execution: None,
335            errors: vec![],
336            raw_effects,
337        })
338    }
339
340    fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
341        let sender_signed_data: SenderSignedData =
342            bcs::from_bytes(&self.raw_transaction).map_err(|e| {
343                IndexerError::PersistentStorageDataCorruption(format!(
344                    "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
345                    self.tx_sequence_number
346                ))
347            })?;
348        Ok(sender_signed_data)
349    }
350
351    pub fn try_into_iota_transaction_effects(&self) -> IndexerResult<IotaTransactionBlockEffects> {
352        let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
353            IndexerError::PersistentStorageDataCorruption(format!(
354                "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
355                self.tx_sequence_number
356            ))
357        })?;
358        let effects = IotaTransactionBlockEffects::try_from(effects)?;
359        Ok(effects)
360    }
361
362    /// Check if this is the genesis transaction relying on the global ordering.
363    pub fn is_genesis(&self) -> bool {
364        self.tx_sequence_number == 0
365    }
366}
367
368pub fn stored_events_to_events(
369    stored_events: StoredTransactionEvents,
370) -> Result<Vec<Event>, IndexerError> {
371    stored_events
372        .into_iter()
373        .map(|event| match event {
374            Some(event) => {
375                let event: Event = bcs::from_bytes(&event).map_err(|e| {
376                    IndexerError::PersistentStorageDataCorruption(format!(
377                        "Can't convert event bytes into Event. Error: {e}",
378                    ))
379                })?;
380                Ok(event)
381            }
382            None => Err(IndexerError::PersistentStorageDataCorruption(
383                "Event should not be null".to_string(),
384            )),
385        })
386        .collect::<Result<Vec<Event>, IndexerError>>()
387}
388
389pub async fn tx_events_to_iota_tx_events(
390    tx_events: TransactionEvents,
391    package_resolver: Arc<Resolver<impl PackageStore>>,
392    tx_digest: TransactionDigest,
393    timestamp: u64,
394) -> Result<Option<IotaTransactionBlockEvents>, IndexerError> {
395    let mut iota_event_futures = vec![];
396    let tx_events_data_len = tx_events.data.len();
397    for tx_event in tx_events.data.clone() {
398        let package_resolver_clone = package_resolver.clone();
399        iota_event_futures.push(tokio::task::spawn(async move {
400            let resolver = package_resolver_clone;
401            resolver
402                .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
403                .await
404        }));
405    }
406    let event_move_type_layouts = futures::future::join_all(iota_event_futures)
407        .await
408        .into_iter()
409        .collect::<Result<Vec<_>, _>>()?
410        .into_iter()
411        .collect::<Result<Vec<_>, _>>()
412        .map_err(|e| {
413            IndexerError::ResolveMoveStruct(format!(
414                "Failed to convert to iota event with Error: {e}",
415            ))
416        })?;
417    let event_move_datatype_layouts = event_move_type_layouts
418        .into_iter()
419        .filter_map(|move_type_layout| match move_type_layout {
420            MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
421            MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
422            _ => None,
423        })
424        .collect::<Vec<_>>();
425    assert!(tx_events_data_len == event_move_datatype_layouts.len());
426    let iota_events = tx_events
427        .data
428        .into_iter()
429        .enumerate()
430        .zip(event_move_datatype_layouts)
431        .map(|((seq, tx_event), move_datatype_layout)| {
432            IotaEvent::try_from(
433                tx_event,
434                tx_digest,
435                seq as u64,
436                Some(timestamp),
437                move_datatype_layout,
438            )
439        })
440        .collect::<Result<Vec<_>, _>>()?;
441    let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
442    Ok(Some(iota_tx_events))
443}