iota_indexer/models/
transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9    BalanceChange, IotaEvent, IotaExecutionStatus, IotaTransactionBlock,
10    IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents,
11    IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions, ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15    base_types::TypeTag,
16    digests::TransactionDigest,
17    effects::{TransactionEffects, TransactionEvents},
18    event::Event,
19    transaction::SenderSignedData,
20};
21use move_core_types::annotated_value::{MoveDatatypeLayout, MoveTypeLayout};
22#[cfg(feature = "shared_test_runtime")]
23use serde::Deserialize;
24
25use crate::{
26    errors::IndexerError,
27    schema::{optimistic_transactions, transactions, tx_global_order},
28    types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
29};
30
/// Row of the `tx_global_order` table, which places optimistic and
/// checkpointed transactions into a single global ordering.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = tx_global_order)]
pub struct TxGlobalOrder {
    /// Sequence number of transaction according to checkpoint ordering.
    /// Set after transaction is checkpoint-indexed.
    pub chk_tx_sequence_number: Option<i64>,
    /// Number that represents the global ordering between optimistic and
    /// checkpointed transactions.
    ///
    /// Optimistic transactions will share the same number as checkpointed
    /// transactions. In this case, ties are resolved by the
    /// `(global_sequence_number, optimistic_sequence_number)` pair that
    /// guarantees deterministic ordering.
    pub global_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub tx_digest: Vec<u8>,
    /// Monotonically increasing number that represents the order
    /// of execution of optimistic transactions.
    ///
    /// Checkpointed transactions use [`CHECKPOINT_TX_OPTIMISTIC_SEQ`] (-1).
    /// Optimistic transactions should set this value to `None`,
    /// so that it is auto-generated on the database.
    // NOTE(review): `deserialize_as = i64` presumably means rows read back
    // from the database always carry a concrete value here — confirm against
    // the column definition.
    #[diesel(deserialize_as = i64)]
    pub optimistic_sequence_number: Option<i64>,
}
55
/// Sentinel stored in `optimistic_sequence_number` for checkpointed
/// transactions, to distinguish them from optimistic transactions
/// which use positive auto-generated values.
pub const CHECKPOINT_TX_OPTIMISTIC_SEQ: i64 = -1;
60
61impl From<&IndexedTransaction> for TxGlobalOrder {
62    fn from(tx: &IndexedTransaction) -> Self {
63        Self {
64            chk_tx_sequence_number: Some(tx.tx_sequence_number as i64),
65            global_sequence_number: tx.tx_sequence_number as i64,
66            tx_digest: tx.tx_digest.into_inner().to_vec(),
67            optimistic_sequence_number: Some(CHECKPOINT_TX_OPTIMISTIC_SEQ),
68        }
69    }
70}
71
/// Row of the `transactions` table.
///
/// Values converted from an [`OptimisticTransaction`] reuse this shape with
/// `checkpoint_sequence_number` and `timestamp_ms` set to `-1`.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = transactions)]
#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
pub struct StoredTransaction {
    /// The index of the transaction in the global ordering that starts
    /// from genesis.
    pub tx_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// BCS-serialized `SenderSignedData` of the transaction.
    pub raw_transaction: Vec<u8>,
    /// BCS-serialized `TransactionEffects` of the transaction.
    pub raw_effects: Vec<u8>,
    /// Sequence number of the containing checkpoint; a negative value marks
    /// an optimistically indexed transaction (see
    /// `is_checkpointed_transaction`).
    pub checkpoint_sequence_number: i64,
    /// Timestamp in milliseconds; only reported in responses for
    /// checkpointed transactions.
    pub timestamp_ms: i64,
    /// BCS-serialized `IndexedObjectChange` entries. A `None` entry is
    /// treated as data corruption when building a response.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// BCS-serialized `BalanceChange` entries. A `None` entry is treated as
    /// data corruption when building a response.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// BCS-serialized `Event` entries. A `None` entry is treated as data
    /// corruption when building a response.
    pub events: Vec<Option<Vec<u8>>>,
    /// Transaction kind of the indexed transaction, cast to `i16`.
    pub transaction_kind: i16,
    /// Value of the indexed transaction's `successful_tx_num`, cast to `i16`.
    pub success_command_count: i16,
}
90
/// Row of the `optimistic_transactions` table.
///
/// Carries the same payload columns as [`StoredTransaction`] but has no
/// checkpoint sequence number or timestamp.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = optimistic_transactions)]
pub struct OptimisticTransaction {
    /// Position of the transaction in the global ordering shared with
    /// checkpointed transactions.
    pub global_sequence_number: i64,
    /// Order of execution among optimistic transactions. It is mapped
    /// to/from `StoredTransaction::tx_sequence_number` by the conversions in
    /// this file.
    pub optimistic_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// Serialized transaction data, copied verbatim from/to
    /// `StoredTransaction::raw_transaction`.
    pub raw_transaction: Vec<u8>,
    /// Serialized transaction effects, copied verbatim from/to
    /// `StoredTransaction::raw_effects`.
    pub raw_effects: Vec<u8>,
    /// Serialized object changes, one optional entry per change.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// Serialized balance changes, one optional entry per change.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// Serialized events, one optional entry per event.
    pub events: Vec<Option<Vec<u8>>>,
    /// Transaction kind, stored as `i16`.
    pub transaction_kind: i16,
    /// Success command count, stored as `i16`.
    pub success_command_count: i16,
}
105
106impl From<OptimisticTransaction> for StoredTransaction {
107    fn from(tx: OptimisticTransaction) -> Self {
108        StoredTransaction {
109            tx_sequence_number: tx.optimistic_sequence_number,
110            transaction_digest: tx.transaction_digest,
111            raw_transaction: tx.raw_transaction,
112            raw_effects: tx.raw_effects,
113            checkpoint_sequence_number: -1,
114            timestamp_ms: -1,
115            object_changes: tx.object_changes,
116            balance_changes: tx.balance_changes,
117            events: tx.events,
118            transaction_kind: tx.transaction_kind,
119            success_command_count: tx.success_command_count,
120        }
121    }
122}
123
124impl OptimisticTransaction {
125    pub fn from_stored(global_sequence_number: i64, stored: StoredTransaction) -> Self {
126        OptimisticTransaction {
127            global_sequence_number,
128            optimistic_sequence_number: stored.tx_sequence_number,
129            transaction_digest: stored.transaction_digest,
130            raw_transaction: stored.raw_transaction,
131            raw_effects: stored.raw_effects,
132            object_changes: stored.object_changes,
133            balance_changes: stored.balance_changes,
134            events: stored.events,
135            transaction_kind: stored.transaction_kind,
136            success_command_count: stored.success_command_count,
137        }
138    }
139
140    pub fn get_balance_len(&self) -> usize {
141        self.balance_changes.len()
142    }
143
144    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
145        self.balance_changes.get(idx).cloned().flatten()
146    }
147
148    pub fn get_object_len(&self) -> usize {
149        self.object_changes.len()
150    }
151
152    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
153        self.object_changes.get(idx).cloned().flatten()
154    }
155
156    pub fn get_event_len(&self) -> usize {
157        self.events.len()
158    }
159
160    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
161        self.events.get(idx).cloned().flatten()
162    }
163}
164
/// Stored form of a transaction's events: one optional BCS-encoded `Event`
/// per entry (see `stored_events_to_events`).
pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
166
/// Single-column query result holding a sequence number.
///
/// Its `Default` value is `-1`.
#[derive(Debug, Queryable)]
pub struct TxSeq {
    /// The queried sequence number.
    // NOTE(review): presumably a transaction sequence number, judging by the
    // type name — confirm against the queries that load it.
    pub seq: i64,
}
171
172impl Default for TxSeq {
173    fn default() -> Self {
174        Self { seq: -1 }
175    }
176}
177
/// Query result pairing a transaction sequence number with its timestamp.
// NOTE(review): presumably a partial selection of the `transactions` table —
// confirm against the queries that load it.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionTimestamp {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Timestamp of the transaction in milliseconds.
    pub timestamp_ms: i64,
}
183
/// Query result pairing a transaction sequence number with the sequence
/// number of its checkpoint.
// NOTE(review): presumably a partial selection of the `transactions` table —
// confirm against the queries that load it.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionCheckpoint {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint containing the transaction.
    pub checkpoint_sequence_number: i64,
}
189
/// Query result carrying a transaction's success command count together with
/// its sequence number, checkpoint and timestamp.
// NOTE(review): presumably a partial selection of the `transactions` table —
// confirm against the queries that load it. Field order matters for
// `Queryable`, which maps columns positionally.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionSuccessCommandCount {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint containing the transaction.
    pub checkpoint_sequence_number: i64,
    /// Success command count of the transaction.
    pub success_command_count: i16,
    /// Timestamp of the transaction in milliseconds.
    pub timestamp_ms: i64,
}
197
198impl From<&IndexedTransaction> for StoredTransaction {
199    fn from(tx: &IndexedTransaction) -> Self {
200        StoredTransaction {
201            tx_sequence_number: tx.tx_sequence_number as i64,
202            transaction_digest: tx.tx_digest.into_inner().to_vec(),
203            raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
204            raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
205            checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
206            object_changes: tx
207                .object_changes
208                .iter()
209                .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
210                .collect(),
211            balance_changes: tx
212                .balance_change
213                .iter()
214                .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
215                .collect(),
216            events: tx
217                .events
218                .iter()
219                .map(|e| Some(bcs::to_bytes(&e).unwrap()))
220                .collect(),
221            timestamp_ms: tx.timestamp_ms as i64,
222            transaction_kind: tx.transaction_kind as i16,
223            success_command_count: tx.successful_tx_num as i16,
224        }
225    }
226}
227
228impl StoredTransaction {
229    pub fn get_balance_len(&self) -> usize {
230        self.balance_changes.len()
231    }
232
233    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
234        self.balance_changes.get(idx).cloned().flatten()
235    }
236
237    pub fn get_object_len(&self) -> usize {
238        self.object_changes.len()
239    }
240
241    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
242        self.object_changes.get(idx).cloned().flatten()
243    }
244
245    pub fn get_event_len(&self) -> usize {
246        self.events.len()
247    }
248
249    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
250        self.events.get(idx).cloned().flatten()
251    }
252
253    /// True for checkpointed transactions, False for optimistically indexed
254    /// transactions
255    pub fn is_checkpointed_transaction(&self) -> bool {
256        self.checkpoint_sequence_number >= 0
257    }
258
259    pub async fn try_into_iota_transaction_block_response(
260        self,
261        options: IotaTransactionBlockResponseOptions,
262        package_resolver: &Arc<Resolver<impl PackageStore>>,
263    ) -> IndexerResult<IotaTransactionBlockResponse> {
264        let options = options.clone();
265        let tx_digest =
266            TransactionDigest::from_bytes(self.transaction_digest.as_slice()).map_err(|e| {
267                IndexerError::PersistentStorageDataCorruption(format!(
268                    "Can't convert {:?} as tx_digest. Error: {e}",
269                    self.transaction_digest
270                ))
271            })?;
272
273        let timestamp_ms = self
274            .is_checkpointed_transaction()
275            .then_some(self.timestamp_ms as u64);
276        let checkpoint = self
277            .is_checkpointed_transaction()
278            .then_some(self.checkpoint_sequence_number as u64);
279
280        let transaction = if options.show_input {
281            let sender_signed_data = self.try_into_sender_signed_data()?;
282            let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
283                sender_signed_data,
284                package_resolver,
285                tx_digest,
286            )
287            .await?;
288            Some(tx_block)
289        } else {
290            None
291        };
292
293        let effects = if options.show_effects {
294            Some(
295                self.try_into_iota_transaction_effects(package_resolver)
296                    .await?,
297            )
298        } else {
299            None
300        };
301
302        let raw_transaction = if options.show_raw_input {
303            self.raw_transaction
304        } else {
305            Default::default()
306        };
307
308        let events = if options.show_events {
309            let events = {
310                self
311                        .events
312                        .into_iter()
313                        .map(|event| match event {
314                            Some(event) => {
315                                let event: Event = bcs::from_bytes(&event).map_err(|e| {
316                                    IndexerError::PersistentStorageDataCorruption(format!(
317                                        "Can't convert event bytes into Event. tx_digest={tx_digest} Error: {e}"
318                                    ))
319                                })?;
320                                Ok(event)
321                            }
322                            None => Err(IndexerError::PersistentStorageDataCorruption(format!(
323                                "Event should not be null, tx_digest={tx_digest}"
324                            ))),
325                        })
326                        .collect::<Result<Vec<Event>, IndexerError>>()?
327            };
328            let tx_events = TransactionEvents { data: events };
329
330            Some(
331                tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp_ms)
332                    .await?,
333            )
334        } else {
335            None
336        };
337
338        let object_changes = if options.show_object_changes {
339            let object_changes = {
340                self.object_changes.into_iter().map(|object_change| {
341                        match object_change {
342                            Some(object_change) => {
343                                let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
344                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
345                                        format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={tx_digest} Error: {e}")
346                                    ))?;
347                                Ok(ObjectChange::from(object_change))
348                            }
349                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest}"))),
350                        }
351                    }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
352            };
353            Some(object_changes)
354        } else {
355            None
356        };
357
358        let balance_changes = if options.show_balance_changes {
359            let balance_changes = {
360                self.balance_changes.into_iter().map(|balance_change| {
361                        match balance_change {
362                            Some(balance_change) => {
363                                let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
364                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
365                                        format!("Can't convert balance_change bytes into BalanceChange. tx_digest={tx_digest} Error: {e}")
366                                    ))?;
367                                Ok(balance_change)
368                            }
369                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest}"))),
370                        }
371                    }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
372            };
373            Some(balance_changes)
374        } else {
375            None
376        };
377
378        let raw_effects = if options.show_raw_effects {
379            self.raw_effects
380        } else {
381            Default::default()
382        };
383
384        let errors = match effects.as_ref().map(|e| e.status()) {
385            Some(IotaExecutionStatus::Failure { error }) => vec![error.clone()],
386            _ => vec![],
387        };
388
389        Ok(IotaTransactionBlockResponse {
390            digest: tx_digest,
391            transaction,
392            raw_transaction,
393            effects,
394            events,
395            object_changes,
396            balance_changes,
397            timestamp_ms,
398            checkpoint,
399            confirmed_local_execution: None,
400            errors,
401            raw_effects,
402        })
403    }
404
405    pub fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
406        let sender_signed_data: SenderSignedData =
407            bcs::from_bytes(&self.raw_transaction).map_err(|e| {
408                IndexerError::PersistentStorageDataCorruption(format!(
409                    "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
410                    self.tx_sequence_number
411                ))
412            })?;
413        Ok(sender_signed_data)
414    }
415
416    pub async fn try_into_iota_transaction_effects(
417        &self,
418        package_resolver: &Arc<Resolver<impl PackageStore>>,
419    ) -> IndexerResult<IotaTransactionBlockEffects> {
420        let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
421            IndexerError::PersistentStorageDataCorruption(format!(
422                "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
423                self.tx_sequence_number
424            ))
425        })?;
426        let effects =
427            IotaTransactionBlockEffects::from_native_with_clever_error(effects, package_resolver)
428                .await;
429        Ok(effects)
430    }
431
432    /// Check if this is the genesis transaction relying on the global ordering.
433    pub fn is_genesis(&self) -> bool {
434        self.tx_sequence_number == 0
435    }
436}
437
438pub fn stored_events_to_events(
439    stored_events: StoredTransactionEvents,
440) -> Result<Vec<Event>, IndexerError> {
441    stored_events
442        .into_iter()
443        .map(|event| match event {
444            Some(event) => {
445                let event: Event = bcs::from_bytes(&event).map_err(|e| {
446                    IndexerError::PersistentStorageDataCorruption(format!(
447                        "Can't convert event bytes into Event. Error: {e}",
448                    ))
449                })?;
450                Ok(event)
451            }
452            None => Err(IndexerError::PersistentStorageDataCorruption(
453                "Event should not be null".to_string(),
454            )),
455        })
456        .collect::<Result<Vec<Event>, IndexerError>>()
457}
458
459pub async fn tx_events_to_iota_tx_events(
460    tx_events: TransactionEvents,
461    package_resolver: &Arc<Resolver<impl PackageStore>>,
462    tx_digest: TransactionDigest,
463    timestamp: Option<u64>,
464) -> Result<IotaTransactionBlockEvents, IndexerError> {
465    let mut iota_event_futures = vec![];
466    let tx_events_data_len = tx_events.data.len();
467    for tx_event in tx_events.data.clone() {
468        let package_resolver_clone = package_resolver.clone();
469        iota_event_futures.push(tokio::task::spawn(async move {
470            let resolver = package_resolver_clone;
471            resolver
472                .type_layout(TypeTag::Struct(Box::new(tx_event.type_)))
473                .await
474        }));
475    }
476    let event_move_type_layouts = futures::future::join_all(iota_event_futures)
477        .await
478        .into_iter()
479        .collect::<Result<Vec<_>, _>>()?
480        .into_iter()
481        .collect::<Result<Vec<_>, _>>()
482        .map_err(|e| {
483            IndexerError::ResolveMoveStruct(format!(
484                "Failed to convert to iota event with Error: {e}",
485            ))
486        })?;
487    let event_move_datatype_layouts = event_move_type_layouts
488        .into_iter()
489        .filter_map(|move_type_layout| match move_type_layout {
490            MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
491            MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
492            _ => None,
493        })
494        .collect::<Vec<_>>();
495    assert!(tx_events_data_len == event_move_datatype_layouts.len());
496    let iota_events = tx_events
497        .data
498        .into_iter()
499        .enumerate()
500        .zip(event_move_datatype_layouts)
501        .map(|((seq, tx_event), move_datatype_layout)| {
502            IotaEvent::try_from(
503                tx_event,
504                tx_digest,
505                seq as u64,
506                timestamp,
507                move_datatype_layout,
508            )
509        })
510        .collect::<Result<Vec<_>, _>>()?;
511    let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
512    Ok(iota_tx_events)
513}