iota_indexer/models/
transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9    BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
10    IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11    ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15    digests::TransactionDigest,
16    effects::{TransactionEffects, TransactionEvents},
17    event::Event,
18    transaction::SenderSignedData,
19};
20use move_core_types::{
21    annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
22    language_storage::TypeTag,
23};
24#[cfg(feature = "shared_test_runtime")]
25use serde::Deserialize;
26
27use crate::{
28    errors::IndexerError,
29    schema::{optimistic_transactions, transactions, tx_insertion_order},
30    types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
31};
32
33#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
34#[diesel(table_name = tx_insertion_order)]
35pub struct TxInsertionOrder {
36    /// Insertion order number that each transaction (either optimistic or
37    /// checkpointed) is assigned when being indexed. It provides common
38    /// ordering for optimistic and checkpointed transactions, whereas
39    /// `tx_sequence_number` provides ordering only for checkpointed
40    /// transactions. We skip it on insertion since it's autogenerated by the
41    /// database.
42    #[diesel(skip_insertion)]
43    pub insertion_order: i64,
44    pub tx_digest: Vec<u8>,
45}
46
47#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
48#[diesel(table_name = transactions)]
49#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
50pub struct StoredTransaction {
51    /// The index of the transaction in the global ordering that starts
52    /// from genesis.
53    pub tx_sequence_number: i64,
54    pub transaction_digest: Vec<u8>,
55    pub raw_transaction: Vec<u8>,
56    pub raw_effects: Vec<u8>,
57    pub checkpoint_sequence_number: i64,
58    pub timestamp_ms: i64,
59    pub object_changes: Vec<Option<Vec<u8>>>,
60    pub balance_changes: Vec<Option<Vec<u8>>>,
61    pub events: Vec<Option<Vec<u8>>>,
62    pub transaction_kind: i16,
63    pub success_command_count: i16,
64}
65
66#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
67#[diesel(table_name = optimistic_transactions)]
68pub struct OptimisticTransaction {
69    pub insertion_order: i64,
70    pub transaction_digest: Vec<u8>,
71    pub raw_transaction: Vec<u8>,
72    pub raw_effects: Vec<u8>,
73    pub object_changes: Vec<Option<Vec<u8>>>,
74    pub balance_changes: Vec<Option<Vec<u8>>>,
75    pub events: Vec<Option<Vec<u8>>>,
76    pub transaction_kind: i16,
77    pub success_command_count: i16,
78}
79
80impl From<OptimisticTransaction> for StoredTransaction {
81    fn from(tx: OptimisticTransaction) -> Self {
82        StoredTransaction {
83            tx_sequence_number: tx.insertion_order,
84            transaction_digest: tx.transaction_digest,
85            raw_transaction: tx.raw_transaction,
86            raw_effects: tx.raw_effects,
87            checkpoint_sequence_number: -1,
88            timestamp_ms: -1,
89            object_changes: tx.object_changes,
90            balance_changes: tx.balance_changes,
91            events: tx.events,
92            transaction_kind: tx.transaction_kind,
93            success_command_count: tx.success_command_count,
94        }
95    }
96}
97
98impl From<StoredTransaction> for OptimisticTransaction {
99    fn from(tx: StoredTransaction) -> Self {
100        OptimisticTransaction {
101            insertion_order: tx.tx_sequence_number,
102            transaction_digest: tx.transaction_digest,
103            raw_transaction: tx.raw_transaction,
104            raw_effects: tx.raw_effects,
105            object_changes: tx.object_changes,
106            balance_changes: tx.balance_changes,
107            events: tx.events,
108            transaction_kind: tx.transaction_kind,
109            success_command_count: tx.success_command_count,
110        }
111    }
112}
113
/// Events of a transaction in their stored form: one optional byte vector per
/// event, where each present entry is a BCS-encoded `Event`.
pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
115
116#[derive(Debug, Queryable)]
117pub struct TxSeq {
118    pub seq: i64,
119}
120
121impl Default for TxSeq {
122    fn default() -> Self {
123        Self { seq: -1 }
124    }
125}
126
127#[derive(Clone, Debug, Queryable)]
128pub struct StoredTransactionTimestamp {
129    pub tx_sequence_number: i64,
130    pub timestamp_ms: i64,
131}
132
133#[derive(Clone, Debug, Queryable)]
134pub struct StoredTransactionCheckpoint {
135    pub tx_sequence_number: i64,
136    pub checkpoint_sequence_number: i64,
137}
138
139#[derive(Clone, Debug, Queryable)]
140pub struct StoredTransactionSuccessCommandCount {
141    pub tx_sequence_number: i64,
142    pub checkpoint_sequence_number: i64,
143    pub success_command_count: i16,
144    pub timestamp_ms: i64,
145}
146
147impl From<&IndexedTransaction> for StoredTransaction {
148    fn from(tx: &IndexedTransaction) -> Self {
149        StoredTransaction {
150            tx_sequence_number: tx.tx_sequence_number as i64,
151            transaction_digest: tx.tx_digest.into_inner().to_vec(),
152            raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
153            raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
154            checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
155            object_changes: tx
156                .object_changes
157                .iter()
158                .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
159                .collect(),
160            balance_changes: tx
161                .balance_change
162                .iter()
163                .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
164                .collect(),
165            events: tx
166                .events
167                .iter()
168                .map(|e| Some(bcs::to_bytes(&e).unwrap()))
169                .collect(),
170            timestamp_ms: tx.timestamp_ms as i64,
171            transaction_kind: tx.transaction_kind as i16,
172            success_command_count: tx.successful_tx_num as i16,
173        }
174    }
175}
176
177impl StoredTransaction {
178    pub fn get_balance_len(&self) -> usize {
179        self.balance_changes.len()
180    }
181
182    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
183        self.balance_changes.get(idx).cloned().flatten()
184    }
185
186    pub fn get_object_len(&self) -> usize {
187        self.object_changes.len()
188    }
189
190    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
191        self.object_changes.get(idx).cloned().flatten()
192    }
193
194    pub fn get_event_len(&self) -> usize {
195        self.events.len()
196    }
197
198    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
199        self.events.get(idx).cloned().flatten()
200    }
201
202    /// True for checkpointed transactions, False for optimistically indexed
203    /// transactions
204    pub fn is_checkpointed_transaction(&self) -> bool {
205        self.checkpoint_sequence_number >= 0
206    }
207
208    pub async fn try_into_iota_transaction_block_response(
209        self,
210        options: IotaTransactionBlockResponseOptions,
211        package_resolver: Arc<Resolver<impl PackageStore>>,
212    ) -> IndexerResult<IotaTransactionBlockResponse> {
213        let options = options.clone();
214        let tx_digest =
215            TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
216                IndexerError::PersistentStorageDataCorruption(format!(
217                    "Can't convert {:?} as tx_digest. Error: {e}",
218                    self.transaction_digest
219                ))
220            })?;
221
222        let timestamp_ms = self
223            .is_checkpointed_transaction()
224            .then_some(self.timestamp_ms as u64);
225        let checkpoint = self
226            .is_checkpointed_transaction()
227            .then_some(self.checkpoint_sequence_number as u64);
228
229        let transaction = if options.show_input {
230            let sender_signed_data = self.try_into_sender_signed_data()?;
231            let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
232                sender_signed_data,
233                package_resolver.clone(),
234                tx_digest,
235            )
236            .await?;
237            Some(tx_block)
238        } else {
239            None
240        };
241
242        let effects = options
243            .show_effects
244            .then(|| self.try_into_iota_transaction_effects())
245            .transpose()?;
246
247        let raw_transaction = options
248            .show_raw_input
249            .then_some(self.raw_transaction)
250            .unwrap_or_default();
251
252        let events = if options.show_events {
253            let events = {
254                self
255                        .events
256                        .into_iter()
257                        .map(|event| match event {
258                            Some(event) => {
259                                let event: Event = bcs::from_bytes(&event).map_err(|e| {
260                                    IndexerError::PersistentStorageDataCorruption(format!(
261                                        "Can't convert event bytes into Event. tx_digest={:?} Error: {e}",
262                                        tx_digest
263                                    ))
264                                })?;
265                                Ok(event)
266                            }
267                            None => Err(IndexerError::PersistentStorageDataCorruption(format!(
268                                "Event should not be null, tx_digest={:?}",
269                                tx_digest
270                            ))),
271                        })
272                        .collect::<Result<Vec<Event>, IndexerError>>()?
273            };
274            let timestamp = self.timestamp_ms as u64;
275            let tx_events = TransactionEvents { data: events };
276
277            tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp).await?
278        } else {
279            None
280        };
281
282        let object_changes = if options.show_object_changes {
283            let object_changes = {
284                self.object_changes.into_iter().map(|object_change| {
285                        match object_change {
286                            Some(object_change) => {
287                                let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
288                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
289                                        format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest)
290                                    ))?;
291                                Ok(ObjectChange::from(object_change))
292                            }
293                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
294                        }
295                    }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
296            };
297            Some(object_changes)
298        } else {
299            None
300        };
301
302        let balance_changes = if options.show_balance_changes {
303            let balance_changes = {
304                self.balance_changes.into_iter().map(|balance_change| {
305                        match balance_change {
306                            Some(balance_change) => {
307                                let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
308                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
309                                        format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest)
310                                    ))?;
311                                Ok(balance_change)
312                            }
313                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
314                        }
315                    }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
316            };
317            Some(balance_changes)
318        } else {
319            None
320        };
321
322        let raw_effects = options
323            .show_raw_effects
324            .then_some(self.raw_effects)
325            .unwrap_or_default();
326
327        Ok(IotaTransactionBlockResponse {
328            digest: tx_digest,
329            transaction,
330            raw_transaction,
331            effects,
332            events,
333            object_changes,
334            balance_changes,
335            timestamp_ms,
336            checkpoint,
337            confirmed_local_execution: None,
338            errors: vec![],
339            raw_effects,
340        })
341    }
342
343    fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
344        let sender_signed_data: SenderSignedData =
345            bcs::from_bytes(&self.raw_transaction).map_err(|e| {
346                IndexerError::PersistentStorageDataCorruption(format!(
347                    "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
348                    self.tx_sequence_number
349                ))
350            })?;
351        Ok(sender_signed_data)
352    }
353
354    pub fn try_into_iota_transaction_effects(&self) -> IndexerResult<IotaTransactionBlockEffects> {
355        let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
356            IndexerError::PersistentStorageDataCorruption(format!(
357                "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
358                self.tx_sequence_number
359            ))
360        })?;
361        let effects = IotaTransactionBlockEffects::try_from(effects)?;
362        Ok(effects)
363    }
364
365    /// Check if this is the genesis transaction relying on the global ordering.
366    pub fn is_genesis(&self) -> bool {
367        self.tx_sequence_number == 0
368    }
369}
370
371pub fn stored_events_to_events(
372    stored_events: StoredTransactionEvents,
373) -> Result<Vec<Event>, IndexerError> {
374    stored_events
375        .into_iter()
376        .map(|event| match event {
377            Some(event) => {
378                let event: Event = bcs::from_bytes(&event).map_err(|e| {
379                    IndexerError::PersistentStorageDataCorruption(format!(
380                        "Can't convert event bytes into Event. Error: {e}",
381                    ))
382                })?;
383                Ok(event)
384            }
385            None => Err(IndexerError::PersistentStorageDataCorruption(
386                "Event should not be null".to_string(),
387            )),
388        })
389        .collect::<Result<Vec<Event>, IndexerError>>()
390}
391
392pub async fn tx_events_to_iota_tx_events(
393    tx_events: TransactionEvents,
394    package_resolver: Arc<Resolver<impl PackageStore>>,
395    tx_digest: TransactionDigest,
396    timestamp: u64,
397) -> Result<Option<IotaTransactionBlockEvents>, IndexerError> {
398    let mut iota_event_futures = vec![];
399    let tx_events_data_len = tx_events.data.len();
400    for tx_event in tx_events.data.clone() {
401        let package_resolver_clone = package_resolver.clone();
402        iota_event_futures.push(tokio::task::spawn(async move {
403            let resolver = package_resolver_clone;
404            resolver
405                .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
406                .await
407        }));
408    }
409    let event_move_type_layouts = futures::future::join_all(iota_event_futures)
410        .await
411        .into_iter()
412        .collect::<Result<Vec<_>, _>>()?
413        .into_iter()
414        .collect::<Result<Vec<_>, _>>()
415        .map_err(|e| {
416            IndexerError::ResolveMoveStruct(format!(
417                "Failed to convert to iota event with Error: {e}",
418            ))
419        })?;
420    let event_move_datatype_layouts = event_move_type_layouts
421        .into_iter()
422        .filter_map(|move_type_layout| match move_type_layout {
423            MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
424            MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
425            _ => None,
426        })
427        .collect::<Vec<_>>();
428    assert!(tx_events_data_len == event_move_datatype_layouts.len());
429    let iota_events = tx_events
430        .data
431        .into_iter()
432        .enumerate()
433        .zip(event_move_datatype_layouts)
434        .map(|((seq, tx_event), move_datatype_layout)| {
435            IotaEvent::try_from(
436                tx_event,
437                tx_digest,
438                seq as u64,
439                Some(timestamp),
440                move_datatype_layout,
441            )
442        })
443        .collect::<Result<Vec<_>, _>>()?;
444    let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
445    Ok(Some(iota_tx_events))
446}