iota_indexer/models/
transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9    BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
10    IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11    ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15    digests::TransactionDigest,
16    effects::{TransactionEffects, TransactionEvents},
17    event::Event,
18    transaction::SenderSignedData,
19};
20use move_core_types::{
21    annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
22    language_storage::TypeTag,
23};
24#[cfg(feature = "shared_test_runtime")]
25use serde::Deserialize;
26
27use crate::{
28    errors::IndexerError,
29    schema::{optimistic_transactions, transactions, tx_insertion_order},
30    types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
31};
32
/// Row of the `tx_insertion_order` table, pairing a transaction digest with
/// the insertion order number assigned to it.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = tx_insertion_order)]
pub struct TxInsertionOrder {
    /// Insertion order number that each transaction (either optimistic or
    /// checkpointed) is assigned when being indexed. It provides common
    /// ordering for optimistic and checkpointed transactions, whereas
    /// `tx_sequence_number` provides ordering only for checkpointed
    /// transactions. We skip it on insertion since it's autogenerated by the
    /// database.
    #[diesel(skip_insertion)]
    pub insertion_order: i64,
    /// Raw bytes of the transaction digest (presumably the same encoding as
    /// `StoredTransaction::transaction_digest` -- TODO confirm).
    pub tx_digest: Vec<u8>,
}
46
/// Row of the `transactions` table.
///
/// The byte fields hold BCS-serialized payloads; they are decoded lazily by
/// the methods on this type.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = transactions)]
#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
pub struct StoredTransaction {
    /// The index of the transaction in the global ordering that starts
    /// from genesis.
    pub tx_sequence_number: i64,
    /// Raw bytes of the transaction digest, convertible into a
    /// `TransactionDigest`.
    pub transaction_digest: Vec<u8>,
    /// BCS-encoded `SenderSignedData`.
    pub raw_transaction: Vec<u8>,
    /// BCS-encoded `TransactionEffects`.
    pub raw_effects: Vec<u8>,
    /// Sequence number of the containing checkpoint. A negative value marks
    /// a transaction that is not checkpointed (see
    /// `is_checkpointed_transaction`).
    pub checkpoint_sequence_number: i64,
    /// Timestamp in milliseconds; `-1` when the row was converted from an
    /// `OptimisticTransaction`.
    pub timestamp_ms: i64,
    /// BCS-encoded `IndexedObjectChange` entries. A `None` entry is treated
    /// as data corruption when decoding.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// BCS-encoded `BalanceChange` entries. A `None` entry is treated as
    /// data corruption when decoding.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// BCS-encoded `Event` entries. A `None` entry is treated as data
    /// corruption when decoding.
    pub events: Vec<Option<Vec<u8>>>,
    /// Numeric code of the transaction kind, as produced by the indexer.
    pub transaction_kind: i16,
    /// Count taken from `IndexedTransaction::successful_tx_num` (presumably
    /// the number of successfully executed commands -- TODO confirm).
    pub success_command_count: i16,
}
65
/// Row of the `optimistic_transactions` table.
///
/// Carries the same payload fields as `StoredTransaction` but has no
/// checkpoint sequence number or timestamp, and is keyed by
/// `insertion_order` instead of `tx_sequence_number`.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = optimistic_transactions)]
pub struct OptimisticTransaction {
    /// Insertion order number of the transaction (see `TxInsertionOrder`).
    pub insertion_order: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// Serialized transaction, same encoding as
    /// `StoredTransaction::raw_transaction`.
    pub raw_transaction: Vec<u8>,
    /// Serialized effects, same encoding as `StoredTransaction::raw_effects`.
    pub raw_effects: Vec<u8>,
    /// Serialized object changes, one entry per change.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// Serialized balance changes, one entry per change.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// Serialized events, one entry per event.
    pub events: Vec<Option<Vec<u8>>>,
    /// Numeric code of the transaction kind.
    pub transaction_kind: i16,
    /// Same meaning as `StoredTransaction::success_command_count`.
    pub success_command_count: i16,
}
79
80impl From<OptimisticTransaction> for StoredTransaction {
81    fn from(tx: OptimisticTransaction) -> Self {
82        StoredTransaction {
83            tx_sequence_number: tx.insertion_order,
84            transaction_digest: tx.transaction_digest,
85            raw_transaction: tx.raw_transaction,
86            raw_effects: tx.raw_effects,
87            checkpoint_sequence_number: -1,
88            timestamp_ms: -1,
89            object_changes: tx.object_changes,
90            balance_changes: tx.balance_changes,
91            events: tx.events,
92            transaction_kind: tx.transaction_kind,
93            success_command_count: tx.success_command_count,
94        }
95    }
96}
97
98impl From<StoredTransaction> for OptimisticTransaction {
99    fn from(tx: StoredTransaction) -> Self {
100        OptimisticTransaction {
101            insertion_order: tx.tx_sequence_number,
102            transaction_digest: tx.transaction_digest,
103            raw_transaction: tx.raw_transaction,
104            raw_effects: tx.raw_effects,
105            object_changes: tx.object_changes,
106            balance_changes: tx.balance_changes,
107            events: tx.events,
108            transaction_kind: tx.transaction_kind,
109            success_command_count: tx.success_command_count,
110        }
111    }
112}
113
/// Events column as stored in the database: one optional byte blob per
/// event, each expected to be a BCS-encoded `Event`.
pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
115
/// Single-column query result holding one sequence number.
#[derive(Debug, Queryable)]
pub struct TxSeq {
    /// The selected sequence number (presumably a `tx_sequence_number` --
    /// TODO confirm against the queries that load this type).
    pub seq: i64,
}
120
121impl Default for TxSeq {
122    fn default() -> Self {
123        Self { seq: -1 }
124    }
125}
126
/// Two-column query result pairing a transaction sequence number with its
/// timestamp (presumably a projection of the `transactions` table -- TODO
/// confirm).
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionTimestamp {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Timestamp of the transaction in milliseconds.
    pub timestamp_ms: i64,
}
132
/// Two-column query result pairing a transaction sequence number with its
/// checkpoint sequence number (presumably a projection of the
/// `transactions` table -- TODO confirm).
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionCheckpoint {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint containing the transaction.
    pub checkpoint_sequence_number: i64,
}
138
/// Four-column query result carrying a transaction's success command count
/// together with its ordering and timing columns (presumably a projection
/// of the `transactions` table -- TODO confirm).
///
/// The field order must match the column order of the query that loads it.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionSuccessCommandCount {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint containing the transaction.
    pub checkpoint_sequence_number: i64,
    /// Same meaning as `StoredTransaction::success_command_count`.
    pub success_command_count: i16,
    /// Timestamp of the transaction in milliseconds.
    pub timestamp_ms: i64,
}
146
147impl From<&IndexedTransaction> for StoredTransaction {
148    fn from(tx: &IndexedTransaction) -> Self {
149        StoredTransaction {
150            tx_sequence_number: tx.tx_sequence_number as i64,
151            transaction_digest: tx.tx_digest.into_inner().to_vec(),
152            raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
153            raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
154            checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
155            object_changes: tx
156                .object_changes
157                .iter()
158                .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
159                .collect(),
160            balance_changes: tx
161                .balance_change
162                .iter()
163                .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
164                .collect(),
165            events: tx
166                .events
167                .iter()
168                .map(|e| Some(bcs::to_bytes(&e).unwrap()))
169                .collect(),
170            timestamp_ms: tx.timestamp_ms as i64,
171            transaction_kind: tx.transaction_kind as i16,
172            success_command_count: tx.successful_tx_num as i16,
173        }
174    }
175}
176
177impl StoredTransaction {
178    pub fn get_balance_len(&self) -> usize {
179        self.balance_changes.len()
180    }
181
182    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
183        self.balance_changes.get(idx).cloned().flatten()
184    }
185
186    pub fn get_object_len(&self) -> usize {
187        self.object_changes.len()
188    }
189
190    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
191        self.object_changes.get(idx).cloned().flatten()
192    }
193
194    pub fn get_event_len(&self) -> usize {
195        self.events.len()
196    }
197
198    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
199        self.events.get(idx).cloned().flatten()
200    }
201
202    /// True for checkpointed transactions, False for optimistically indexed
203    /// transactions
204    pub fn is_checkpointed_transaction(&self) -> bool {
205        self.checkpoint_sequence_number >= 0
206    }
207
208    pub async fn try_into_iota_transaction_block_response(
209        self,
210        options: IotaTransactionBlockResponseOptions,
211        package_resolver: Arc<Resolver<impl PackageStore>>,
212    ) -> IndexerResult<IotaTransactionBlockResponse> {
213        let options = options.clone();
214        let tx_digest =
215            TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
216                IndexerError::PersistentStorageDataCorruption(format!(
217                    "Can't convert {:?} as tx_digest. Error: {e}",
218                    self.transaction_digest
219                ))
220            })?;
221
222        let timestamp_ms = self
223            .is_checkpointed_transaction()
224            .then_some(self.timestamp_ms as u64);
225        let checkpoint = self
226            .is_checkpointed_transaction()
227            .then_some(self.checkpoint_sequence_number as u64);
228
229        let transaction = if options.show_input {
230            let sender_signed_data = self.try_into_sender_signed_data()?;
231            let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
232                sender_signed_data,
233                package_resolver.clone(),
234                tx_digest,
235            )
236            .await?;
237            Some(tx_block)
238        } else {
239            None
240        };
241
242        let effects = options
243            .show_effects
244            .then(|| self.try_into_iota_transaction_effects())
245            .transpose()?;
246
247        let raw_transaction = options
248            .show_raw_input
249            .then_some(self.raw_transaction)
250            .unwrap_or_default();
251
252        let events = if options.show_events {
253            let events = {
254                self
255                        .events
256                        .into_iter()
257                        .map(|event| match event {
258                            Some(event) => {
259                                let event: Event = bcs::from_bytes(&event).map_err(|e| {
260                                    IndexerError::PersistentStorageDataCorruption(format!(
261                                        "Can't convert event bytes into Event. tx_digest={tx_digest:?} Error: {e}"
262                                    ))
263                                })?;
264                                Ok(event)
265                            }
266                            None => Err(IndexerError::PersistentStorageDataCorruption(format!(
267                                "Event should not be null, tx_digest={tx_digest:?}"
268                            ))),
269                        })
270                        .collect::<Result<Vec<Event>, IndexerError>>()?
271            };
272            let timestamp = self.timestamp_ms as u64;
273            let tx_events = TransactionEvents { data: events };
274
275            tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp).await?
276        } else {
277            None
278        };
279
280        let object_changes = if options.show_object_changes {
281            let object_changes = {
282                self.object_changes.into_iter().map(|object_change| {
283                        match object_change {
284                            Some(object_change) => {
285                                let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
286                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
287                                        format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={tx_digest:?} Error: {e}")
288                                    ))?;
289                                Ok(ObjectChange::from(object_change))
290                            }
291                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
292                        }
293                    }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
294            };
295            Some(object_changes)
296        } else {
297            None
298        };
299
300        let balance_changes = if options.show_balance_changes {
301            let balance_changes = {
302                self.balance_changes.into_iter().map(|balance_change| {
303                        match balance_change {
304                            Some(balance_change) => {
305                                let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
306                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
307                                        format!("Can't convert balance_change bytes into BalanceChange. tx_digest={tx_digest:?} Error: {e}")
308                                    ))?;
309                                Ok(balance_change)
310                            }
311                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
312                        }
313                    }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
314            };
315            Some(balance_changes)
316        } else {
317            None
318        };
319
320        let raw_effects = options
321            .show_raw_effects
322            .then_some(self.raw_effects)
323            .unwrap_or_default();
324
325        Ok(IotaTransactionBlockResponse {
326            digest: tx_digest,
327            transaction,
328            raw_transaction,
329            effects,
330            events,
331            object_changes,
332            balance_changes,
333            timestamp_ms,
334            checkpoint,
335            confirmed_local_execution: None,
336            errors: vec![],
337            raw_effects,
338        })
339    }
340
341    fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
342        let sender_signed_data: SenderSignedData =
343            bcs::from_bytes(&self.raw_transaction).map_err(|e| {
344                IndexerError::PersistentStorageDataCorruption(format!(
345                    "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
346                    self.tx_sequence_number
347                ))
348            })?;
349        Ok(sender_signed_data)
350    }
351
352    pub fn try_into_iota_transaction_effects(&self) -> IndexerResult<IotaTransactionBlockEffects> {
353        let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
354            IndexerError::PersistentStorageDataCorruption(format!(
355                "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
356                self.tx_sequence_number
357            ))
358        })?;
359        let effects = IotaTransactionBlockEffects::try_from(effects)?;
360        Ok(effects)
361    }
362
363    /// Check if this is the genesis transaction relying on the global ordering.
364    pub fn is_genesis(&self) -> bool {
365        self.tx_sequence_number == 0
366    }
367}
368
369pub fn stored_events_to_events(
370    stored_events: StoredTransactionEvents,
371) -> Result<Vec<Event>, IndexerError> {
372    stored_events
373        .into_iter()
374        .map(|event| match event {
375            Some(event) => {
376                let event: Event = bcs::from_bytes(&event).map_err(|e| {
377                    IndexerError::PersistentStorageDataCorruption(format!(
378                        "Can't convert event bytes into Event. Error: {e}",
379                    ))
380                })?;
381                Ok(event)
382            }
383            None => Err(IndexerError::PersistentStorageDataCorruption(
384                "Event should not be null".to_string(),
385            )),
386        })
387        .collect::<Result<Vec<Event>, IndexerError>>()
388}
389
390pub async fn tx_events_to_iota_tx_events(
391    tx_events: TransactionEvents,
392    package_resolver: Arc<Resolver<impl PackageStore>>,
393    tx_digest: TransactionDigest,
394    timestamp: u64,
395) -> Result<Option<IotaTransactionBlockEvents>, IndexerError> {
396    let mut iota_event_futures = vec![];
397    let tx_events_data_len = tx_events.data.len();
398    for tx_event in tx_events.data.clone() {
399        let package_resolver_clone = package_resolver.clone();
400        iota_event_futures.push(tokio::task::spawn(async move {
401            let resolver = package_resolver_clone;
402            resolver
403                .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
404                .await
405        }));
406    }
407    let event_move_type_layouts = futures::future::join_all(iota_event_futures)
408        .await
409        .into_iter()
410        .collect::<Result<Vec<_>, _>>()?
411        .into_iter()
412        .collect::<Result<Vec<_>, _>>()
413        .map_err(|e| {
414            IndexerError::ResolveMoveStruct(format!(
415                "Failed to convert to iota event with Error: {e}",
416            ))
417        })?;
418    let event_move_datatype_layouts = event_move_type_layouts
419        .into_iter()
420        .filter_map(|move_type_layout| match move_type_layout {
421            MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
422            MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
423            _ => None,
424        })
425        .collect::<Vec<_>>();
426    assert!(tx_events_data_len == event_move_datatype_layouts.len());
427    let iota_events = tx_events
428        .data
429        .into_iter()
430        .enumerate()
431        .zip(event_move_datatype_layouts)
432        .map(|((seq, tx_event), move_datatype_layout)| {
433            IotaEvent::try_from(
434                tx_event,
435                tx_digest,
436                seq as u64,
437                Some(timestamp),
438                move_datatype_layout,
439            )
440        })
441        .collect::<Result<Vec<_>, _>>()?;
442    let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
443    Ok(Some(iota_tx_events))
444}