iota_indexer/models/
transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use diesel::{
8    backend::Backend,
9    deserialize::{FromSql, Result as DeserializeResult},
10    expression::AsExpression,
11    prelude::*,
12    serialize::{Output, Result as SerializeResult, ToSql},
13    sql_types::BigInt,
14};
15use iota_json_rpc_types::{
16    BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
17    IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
18    ObjectChange,
19};
20use iota_package_resolver::{PackageStore, Resolver};
21use iota_types::{
22    digests::TransactionDigest,
23    effects::{TransactionEffects, TransactionEvents},
24    event::Event,
25    transaction::SenderSignedData,
26};
27use move_core_types::{
28    annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
29    language_storage::TypeTag,
30};
31#[cfg(feature = "shared_test_runtime")]
32use serde::Deserialize;
33
34use crate::{
35    errors::IndexerError,
36    schema::{optimistic_transactions, transactions, tx_global_order},
37    types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
38};
39
40#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
41#[diesel(table_name = tx_global_order)]
42pub struct TxGlobalOrder {
43    /// Sequence number of transaction according to checkpoint ordering.
44    /// Set after transaction is checkpoint-indexed.
45    pub chk_tx_sequence_number: Option<i64>,
46    /// Number that represents the global ordering between optimistic and
47    /// checkpointed transactions.
48    ///
49    /// Optimistic transactions will share the same number as checkpointed
50    /// transactions. In this case, ties are resolved by the
51    /// `(global_sequence_number, optimistic_sequence_number)` pair that
52    /// guarantees deterministic ordering.
53    pub global_sequence_number: i64,
54    pub tx_digest: Vec<u8>,
55    /// Monotonically increasing number that represents the order
56    /// of execution of optimistic transactions.
57    ///
58    /// To maintain these semantics the value should be non-positive for
59    /// checkpointed transactions. More specifically we allow values
60    /// in the set `[0, -1]` to represent the index status of checkpointed
61    /// transactions.
62    ///
63    /// Optimistic transactions should set this value to `None`,
64    /// so that it is auto-generated on the database.
65    #[diesel(deserialize_as = i64)]
66    pub optimistic_sequence_number: Option<i64>,
67}
68
69impl From<&IndexedTransaction> for CheckpointTxGlobalOrder {
70    fn from(tx: &IndexedTransaction) -> Self {
71        Self {
72            chk_tx_sequence_number: Some(tx.tx_sequence_number as i64),
73            global_sequence_number: tx.tx_sequence_number as i64,
74            tx_digest: tx.tx_digest.into_inner().to_vec(),
75            index_status: Some(IndexStatus::Started),
76        }
77    }
78}
79
80/// Stored value for checkpointed transactions.
81///
82/// Differs from [`TxGlobalOrder`] in that it allows values
83/// for `optimistic_sequence_number` in the set `[0, -1]`
84/// that represent the index status.
85#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
86#[diesel(table_name = tx_global_order)]
87pub struct CheckpointTxGlobalOrder {
88    pub(crate) chk_tx_sequence_number: Option<i64>,
89    pub(crate) global_sequence_number: i64,
90    pub(crate) tx_digest: Vec<u8>,
91    /// The index status of checkpointed transactions.
92    ///
93    /// This essentially reserves the values `[0, -1]` of the
94    /// `optimistic_sequence_number` stored in the table for checkpointed
95    /// transactions, to distinguish from optimistic transactions, and
96    /// assign semantics related to the index status.
97    #[diesel(deserialize_as = IndexStatus, column_name = "optimistic_sequence_number")]
98    pub(crate) index_status: Option<IndexStatus>,
99}
100
101/// Index status.
102#[derive(Clone, Debug, Copy, AsExpression, PartialEq, Eq)]
103#[diesel(sql_type = BigInt)]
104pub enum IndexStatus {
105    Started,
106    Completed,
107}
108
109impl<DB> ToSql<BigInt, DB> for IndexStatus
110where
111    DB: Backend,
112    i64: ToSql<BigInt, DB>,
113{
114    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, DB>) -> SerializeResult {
115        match *self {
116            IndexStatus::Started => 0.to_sql(out),
117            IndexStatus::Completed => (-1).to_sql(out),
118        }
119    }
120}
121
122impl<DB> FromSql<BigInt, DB> for IndexStatus
123where
124    DB: Backend,
125    i64: FromSql<BigInt, DB>,
126{
127    fn from_sql(bytes: DB::RawValue<'_>) -> DeserializeResult<Self> {
128        match i64::from_sql(bytes)? {
129            0 => Ok(IndexStatus::Started),
130            -1 => Ok(IndexStatus::Completed),
131            other => Err(format!("invalid index status {other}").into()),
132        }
133    }
134}
135
136#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
137#[diesel(table_name = transactions)]
138#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
139pub struct StoredTransaction {
140    /// The index of the transaction in the global ordering that starts
141    /// from genesis.
142    pub tx_sequence_number: i64,
143    pub transaction_digest: Vec<u8>,
144    pub raw_transaction: Vec<u8>,
145    pub raw_effects: Vec<u8>,
146    pub checkpoint_sequence_number: i64,
147    pub timestamp_ms: i64,
148    pub object_changes: Vec<Option<Vec<u8>>>,
149    pub balance_changes: Vec<Option<Vec<u8>>>,
150    pub events: Vec<Option<Vec<u8>>>,
151    pub transaction_kind: i16,
152    pub success_command_count: i16,
153}
154
155#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
156#[diesel(table_name = optimistic_transactions)]
157pub struct OptimisticTransaction {
158    pub global_sequence_number: i64,
159    pub optimistic_sequence_number: i64,
160    pub transaction_digest: Vec<u8>,
161    pub raw_transaction: Vec<u8>,
162    pub raw_effects: Vec<u8>,
163    pub object_changes: Vec<Option<Vec<u8>>>,
164    pub balance_changes: Vec<Option<Vec<u8>>>,
165    pub events: Vec<Option<Vec<u8>>>,
166    pub transaction_kind: i16,
167    pub success_command_count: i16,
168}
169
170impl From<OptimisticTransaction> for StoredTransaction {
171    fn from(tx: OptimisticTransaction) -> Self {
172        StoredTransaction {
173            tx_sequence_number: tx.optimistic_sequence_number,
174            transaction_digest: tx.transaction_digest,
175            raw_transaction: tx.raw_transaction,
176            raw_effects: tx.raw_effects,
177            checkpoint_sequence_number: -1,
178            timestamp_ms: -1,
179            object_changes: tx.object_changes,
180            balance_changes: tx.balance_changes,
181            events: tx.events,
182            transaction_kind: tx.transaction_kind,
183            success_command_count: tx.success_command_count,
184        }
185    }
186}
187
188impl OptimisticTransaction {
189    pub fn from_stored(global_sequence_number: i64, stored: StoredTransaction) -> Self {
190        OptimisticTransaction {
191            global_sequence_number,
192            optimistic_sequence_number: stored.tx_sequence_number,
193            transaction_digest: stored.transaction_digest,
194            raw_transaction: stored.raw_transaction,
195            raw_effects: stored.raw_effects,
196            object_changes: stored.object_changes,
197            balance_changes: stored.balance_changes,
198            events: stored.events,
199            transaction_kind: stored.transaction_kind,
200            success_command_count: stored.success_command_count,
201        }
202    }
203
204    pub fn get_balance_len(&self) -> usize {
205        self.balance_changes.len()
206    }
207
208    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
209        self.balance_changes.get(idx).cloned().flatten()
210    }
211
212    pub fn get_object_len(&self) -> usize {
213        self.object_changes.len()
214    }
215
216    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
217        self.object_changes.get(idx).cloned().flatten()
218    }
219
220    pub fn get_event_len(&self) -> usize {
221        self.events.len()
222    }
223
224    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
225        self.events.get(idx).cloned().flatten()
226    }
227}
228
229pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
230
231#[derive(Debug, Queryable)]
232pub struct TxSeq {
233    pub seq: i64,
234}
235
236impl Default for TxSeq {
237    fn default() -> Self {
238        Self { seq: -1 }
239    }
240}
241
242#[derive(Clone, Debug, Queryable)]
243pub struct StoredTransactionTimestamp {
244    pub tx_sequence_number: i64,
245    pub timestamp_ms: i64,
246}
247
248#[derive(Clone, Debug, Queryable)]
249pub struct StoredTransactionCheckpoint {
250    pub tx_sequence_number: i64,
251    pub checkpoint_sequence_number: i64,
252}
253
254#[derive(Clone, Debug, Queryable)]
255pub struct StoredTransactionSuccessCommandCount {
256    pub tx_sequence_number: i64,
257    pub checkpoint_sequence_number: i64,
258    pub success_command_count: i16,
259    pub timestamp_ms: i64,
260}
261
262impl From<&IndexedTransaction> for StoredTransaction {
263    fn from(tx: &IndexedTransaction) -> Self {
264        StoredTransaction {
265            tx_sequence_number: tx.tx_sequence_number as i64,
266            transaction_digest: tx.tx_digest.into_inner().to_vec(),
267            raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
268            raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
269            checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
270            object_changes: tx
271                .object_changes
272                .iter()
273                .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
274                .collect(),
275            balance_changes: tx
276                .balance_change
277                .iter()
278                .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
279                .collect(),
280            events: tx
281                .events
282                .iter()
283                .map(|e| Some(bcs::to_bytes(&e).unwrap()))
284                .collect(),
285            timestamp_ms: tx.timestamp_ms as i64,
286            transaction_kind: tx.transaction_kind as i16,
287            success_command_count: tx.successful_tx_num as i16,
288        }
289    }
290}
291
292impl StoredTransaction {
293    pub fn get_balance_len(&self) -> usize {
294        self.balance_changes.len()
295    }
296
297    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
298        self.balance_changes.get(idx).cloned().flatten()
299    }
300
301    pub fn get_object_len(&self) -> usize {
302        self.object_changes.len()
303    }
304
305    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
306        self.object_changes.get(idx).cloned().flatten()
307    }
308
309    pub fn get_event_len(&self) -> usize {
310        self.events.len()
311    }
312
313    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
314        self.events.get(idx).cloned().flatten()
315    }
316
317    /// True for checkpointed transactions, False for optimistically indexed
318    /// transactions
319    pub fn is_checkpointed_transaction(&self) -> bool {
320        self.checkpoint_sequence_number >= 0
321    }
322
323    pub async fn try_into_iota_transaction_block_response(
324        self,
325        options: IotaTransactionBlockResponseOptions,
326        package_resolver: &Arc<Resolver<impl PackageStore>>,
327    ) -> IndexerResult<IotaTransactionBlockResponse> {
328        let options = options.clone();
329        let tx_digest =
330            TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
331                IndexerError::PersistentStorageDataCorruption(format!(
332                    "Can't convert {:?} as tx_digest. Error: {e}",
333                    self.transaction_digest
334                ))
335            })?;
336
337        let timestamp_ms = self
338            .is_checkpointed_transaction()
339            .then_some(self.timestamp_ms as u64);
340        let checkpoint = self
341            .is_checkpointed_transaction()
342            .then_some(self.checkpoint_sequence_number as u64);
343
344        let transaction = if options.show_input {
345            let sender_signed_data = self.try_into_sender_signed_data()?;
346            let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
347                sender_signed_data,
348                package_resolver,
349                tx_digest,
350            )
351            .await?;
352            Some(tx_block)
353        } else {
354            None
355        };
356
357        let effects = if options.show_effects {
358            Some(
359                self.try_into_iota_transaction_effects(package_resolver)
360                    .await?,
361            )
362        } else {
363            None
364        };
365
366        let raw_transaction = if options.show_raw_input {
367            self.raw_transaction
368        } else {
369            Default::default()
370        };
371
372        let events = if options.show_events {
373            let events = {
374                self
375                        .events
376                        .into_iter()
377                        .map(|event| match event {
378                            Some(event) => {
379                                let event: Event = bcs::from_bytes(&event).map_err(|e| {
380                                    IndexerError::PersistentStorageDataCorruption(format!(
381                                        "Can't convert event bytes into Event. tx_digest={tx_digest:?} Error: {e}"
382                                    ))
383                                })?;
384                                Ok(event)
385                            }
386                            None => Err(IndexerError::PersistentStorageDataCorruption(format!(
387                                "Event should not be null, tx_digest={tx_digest:?}"
388                            ))),
389                        })
390                        .collect::<Result<Vec<Event>, IndexerError>>()?
391            };
392            let tx_events = TransactionEvents { data: events };
393
394            Some(
395                tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp_ms)
396                    .await?,
397            )
398        } else {
399            None
400        };
401
402        let object_changes = if options.show_object_changes {
403            let object_changes = {
404                self.object_changes.into_iter().map(|object_change| {
405                        match object_change {
406                            Some(object_change) => {
407                                let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
408                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
409                                        format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={tx_digest:?} Error: {e}")
410                                    ))?;
411                                Ok(ObjectChange::from(object_change))
412                            }
413                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
414                        }
415                    }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
416            };
417            Some(object_changes)
418        } else {
419            None
420        };
421
422        let balance_changes = if options.show_balance_changes {
423            let balance_changes = {
424                self.balance_changes.into_iter().map(|balance_change| {
425                        match balance_change {
426                            Some(balance_change) => {
427                                let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
428                                    .map_err(|e| IndexerError::PersistentStorageDataCorruption(
429                                        format!("Can't convert balance_change bytes into BalanceChange. tx_digest={tx_digest:?} Error: {e}")
430                                    ))?;
431                                Ok(balance_change)
432                            }
433                            None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
434                        }
435                    }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
436            };
437            Some(balance_changes)
438        } else {
439            None
440        };
441
442        let raw_effects = if options.show_raw_effects {
443            self.raw_effects
444        } else {
445            Default::default()
446        };
447
448        Ok(IotaTransactionBlockResponse {
449            digest: tx_digest,
450            transaction,
451            raw_transaction,
452            effects,
453            events,
454            object_changes,
455            balance_changes,
456            timestamp_ms,
457            checkpoint,
458            confirmed_local_execution: None,
459            errors: vec![],
460            raw_effects,
461        })
462    }
463
464    pub fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
465        let sender_signed_data: SenderSignedData =
466            bcs::from_bytes(&self.raw_transaction).map_err(|e| {
467                IndexerError::PersistentStorageDataCorruption(format!(
468                    "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
469                    self.tx_sequence_number
470                ))
471            })?;
472        Ok(sender_signed_data)
473    }
474
475    pub async fn try_into_iota_transaction_effects(
476        &self,
477        package_resolver: &Arc<Resolver<impl PackageStore>>,
478    ) -> IndexerResult<IotaTransactionBlockEffects> {
479        let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
480            IndexerError::PersistentStorageDataCorruption(format!(
481                "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
482                self.tx_sequence_number
483            ))
484        })?;
485        let effects =
486            IotaTransactionBlockEffects::from_native_with_clever_error(effects, package_resolver)
487                .await;
488        Ok(effects)
489    }
490
491    /// Check if this is the genesis transaction relying on the global ordering.
492    pub fn is_genesis(&self) -> bool {
493        self.tx_sequence_number == 0
494    }
495}
496
497pub fn stored_events_to_events(
498    stored_events: StoredTransactionEvents,
499) -> Result<Vec<Event>, IndexerError> {
500    stored_events
501        .into_iter()
502        .map(|event| match event {
503            Some(event) => {
504                let event: Event = bcs::from_bytes(&event).map_err(|e| {
505                    IndexerError::PersistentStorageDataCorruption(format!(
506                        "Can't convert event bytes into Event. Error: {e}",
507                    ))
508                })?;
509                Ok(event)
510            }
511            None => Err(IndexerError::PersistentStorageDataCorruption(
512                "Event should not be null".to_string(),
513            )),
514        })
515        .collect::<Result<Vec<Event>, IndexerError>>()
516}
517
518pub async fn tx_events_to_iota_tx_events(
519    tx_events: TransactionEvents,
520    package_resolver: &Arc<Resolver<impl PackageStore>>,
521    tx_digest: TransactionDigest,
522    timestamp: Option<u64>,
523) -> Result<IotaTransactionBlockEvents, IndexerError> {
524    let mut iota_event_futures = vec![];
525    let tx_events_data_len = tx_events.data.len();
526    for tx_event in tx_events.data.clone() {
527        let package_resolver_clone = package_resolver.clone();
528        iota_event_futures.push(tokio::task::spawn(async move {
529            let resolver = package_resolver_clone;
530            resolver
531                .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
532                .await
533        }));
534    }
535    let event_move_type_layouts = futures::future::join_all(iota_event_futures)
536        .await
537        .into_iter()
538        .collect::<Result<Vec<_>, _>>()?
539        .into_iter()
540        .collect::<Result<Vec<_>, _>>()
541        .map_err(|e| {
542            IndexerError::ResolveMoveStruct(format!(
543                "Failed to convert to iota event with Error: {e}",
544            ))
545        })?;
546    let event_move_datatype_layouts = event_move_type_layouts
547        .into_iter()
548        .filter_map(|move_type_layout| match move_type_layout {
549            MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
550            MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
551            _ => None,
552        })
553        .collect::<Vec<_>>();
554    assert!(tx_events_data_len == event_move_datatype_layouts.len());
555    let iota_events = tx_events
556        .data
557        .into_iter()
558        .enumerate()
559        .zip(event_move_datatype_layouts)
560        .map(|((seq, tx_event), move_datatype_layout)| {
561            IotaEvent::try_from(
562                tx_event,
563                tx_digest,
564                seq as u64,
565                timestamp,
566                move_datatype_layout,
567            )
568        })
569        .collect::<Result<Vec<_>, _>>()?;
570    let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
571    Ok(iota_tx_events)
572}