1use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9 BalanceChange, IotaEvent, IotaExecutionStatus, IotaTransactionBlock,
10 IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents,
11 IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions, ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15 base_types::TypeTag,
16 digests::TransactionDigest,
17 effects::{TransactionEffects, TransactionEvents},
18 event::Event,
19 transaction::SenderSignedData,
20};
21use move_core_types::annotated_value::{MoveDatatypeLayout, MoveTypeLayout};
22#[cfg(feature = "shared_test_runtime")]
23use serde::Deserialize;
24
25use crate::{
26 errors::IndexerError,
27 schema::{optimistic_transactions, transactions, tx_global_order},
28 types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
29};
30
/// Row model for the `tx_global_order` table.
///
/// Field order matters for the positional `Queryable` derive, so keep it in
/// sync with the table definition in `crate::schema`.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = tx_global_order)]
pub struct TxGlobalOrder {
    /// Set to `Some(tx_sequence_number)` when the row is built from an
    /// `IndexedTransaction`.
    // NOTE(review): presumably `None` for transactions that are not yet
    // checkpointed — confirm against the writers of this table.
    pub chk_tx_sequence_number: Option<i64>,
    /// Equal to the transaction sequence number when the row is built from an
    /// `IndexedTransaction`.
    pub global_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub tx_digest: Vec<u8>,
    /// Set to `Some(CHECKPOINT_TX_OPTIMISTIC_SEQ)` when the row is built from
    /// an `IndexedTransaction`.
    ///
    /// `deserialize_as = i64` makes diesel load the column as a plain `i64`
    /// and convert it into this field, so a loaded row should always carry
    /// `Some(_)` here.
    // NOTE(review): the `Option` presumably exists so inserts can omit the
    // column — confirm against the schema/migrations.
    #[diesel(deserialize_as = i64)]
    pub optimistic_sequence_number: Option<i64>,
}
55
/// Value written to `TxGlobalOrder::optimistic_sequence_number` for rows that
/// are built from an `IndexedTransaction`.
// NOTE(review): presumably a sentinel meaning "not an optimistic
// transaction" — confirm with the consumers of `tx_global_order`.
pub const CHECKPOINT_TX_OPTIMISTIC_SEQ: i64 = -1;
60
61impl From<&IndexedTransaction> for TxGlobalOrder {
62 fn from(tx: &IndexedTransaction) -> Self {
63 Self {
64 chk_tx_sequence_number: Some(tx.tx_sequence_number as i64),
65 global_sequence_number: tx.tx_sequence_number as i64,
66 tx_digest: tx.tx_digest.into_inner().to_vec(),
67 optimistic_sequence_number: Some(CHECKPOINT_TX_OPTIMISTIC_SEQ),
68 }
69 }
70}
71
/// Row model for the `transactions` table.
///
/// Field order matters for the positional `Queryable` derive, so keep it in
/// sync with the table definition in `crate::schema`.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = transactions)]
#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
pub struct StoredTransaction {
    /// Sequence number of the transaction; `0` is treated as genesis by
    /// `is_genesis`.
    pub tx_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// BCS-encoded `SenderSignedData`.
    pub raw_transaction: Vec<u8>,
    /// BCS-encoded `TransactionEffects`.
    pub raw_effects: Vec<u8>,
    /// Checkpoint containing the transaction; a negative value means the
    /// transaction is not checkpointed (see `is_checkpointed_transaction`).
    pub checkpoint_sequence_number: i64,
    /// Timestamp in milliseconds; `-1` when the row was converted from an
    /// `OptimisticTransaction`.
    pub timestamp_ms: i64,
    /// One BCS-encoded `IndexedObjectChange` per entry; a null entry is
    /// reported as data corruption when the row is converted to a response.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// One BCS-encoded `BalanceChange` per entry; a null entry is reported as
    /// data corruption when the row is converted to a response.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// One BCS-encoded `Event` per entry; a null entry is reported as data
    /// corruption when the row is converted to a response.
    pub events: Vec<Option<Vec<u8>>>,
    /// Transaction kind, stored as a small integer.
    pub transaction_kind: i16,
    /// Filled from `IndexedTransaction::successful_tx_num`.
    pub success_command_count: i16,
}
90
/// Row model for the `optimistic_transactions` table.
///
/// Carries the same payload columns as `StoredTransaction`, without the
/// checkpoint and timestamp columns and with an extra
/// `global_sequence_number`.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = optimistic_transactions)]
pub struct OptimisticTransaction {
    /// Supplied by the caller of `OptimisticTransaction::from_stored`.
    // NOTE(review): presumably refers to `tx_global_order.global_sequence_number`
    // — confirm against the schema.
    pub global_sequence_number: i64,
    /// Maps to `StoredTransaction::tx_sequence_number` in both conversion
    /// directions.
    pub optimistic_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// Raw transaction bytes, copied verbatim to/from `StoredTransaction`.
    pub raw_transaction: Vec<u8>,
    /// Raw effects bytes, copied verbatim to/from `StoredTransaction`.
    pub raw_effects: Vec<u8>,
    /// Encoded object changes, one optional entry per change.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// Encoded balance changes, one optional entry per change.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// Encoded events, one optional entry per event.
    pub events: Vec<Option<Vec<u8>>>,
    /// Transaction kind, stored as a small integer.
    pub transaction_kind: i16,
    /// Count copied verbatim to/from `StoredTransaction::success_command_count`.
    pub success_command_count: i16,
}
105
106impl From<OptimisticTransaction> for StoredTransaction {
107 fn from(tx: OptimisticTransaction) -> Self {
108 StoredTransaction {
109 tx_sequence_number: tx.optimistic_sequence_number,
110 transaction_digest: tx.transaction_digest,
111 raw_transaction: tx.raw_transaction,
112 raw_effects: tx.raw_effects,
113 checkpoint_sequence_number: -1,
114 timestamp_ms: -1,
115 object_changes: tx.object_changes,
116 balance_changes: tx.balance_changes,
117 events: tx.events,
118 transaction_kind: tx.transaction_kind,
119 success_command_count: tx.success_command_count,
120 }
121 }
122}
123
124impl OptimisticTransaction {
125 pub fn from_stored(global_sequence_number: i64, stored: StoredTransaction) -> Self {
126 OptimisticTransaction {
127 global_sequence_number,
128 optimistic_sequence_number: stored.tx_sequence_number,
129 transaction_digest: stored.transaction_digest,
130 raw_transaction: stored.raw_transaction,
131 raw_effects: stored.raw_effects,
132 object_changes: stored.object_changes,
133 balance_changes: stored.balance_changes,
134 events: stored.events,
135 transaction_kind: stored.transaction_kind,
136 success_command_count: stored.success_command_count,
137 }
138 }
139
140 pub fn get_balance_len(&self) -> usize {
141 self.balance_changes.len()
142 }
143
144 pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
145 self.balance_changes.get(idx).cloned().flatten()
146 }
147
148 pub fn get_object_len(&self) -> usize {
149 self.object_changes.len()
150 }
151
152 pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
153 self.object_changes.get(idx).cloned().flatten()
154 }
155
156 pub fn get_event_len(&self) -> usize {
157 self.events.len()
158 }
159
160 pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
161 self.events.get(idx).cloned().flatten()
162 }
163}
164
/// Stored form of a transaction's events: one optional byte blob per event.
///
/// `stored_events_to_events` decodes each blob as a BCS-encoded `Event` and
/// treats a null entry as data corruption.
pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
166
/// Single-column query result holding one sequence number.
// NOTE(review): the column this is loaded from is not visible in this file —
// confirm at the query sites.
#[derive(Debug, Queryable)]
pub struct TxSeq {
    /// The loaded sequence number; the `Default` impl uses `-1`.
    pub seq: i64,
}
171
172impl Default for TxSeq {
173 fn default() -> Self {
174 Self { seq: -1 }
175 }
176}
177
/// Two-column query result pairing a transaction sequence number with its
/// timestamp.
// NOTE(review): presumably a projection of the `transactions` table —
// confirm at the query sites.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionTimestamp {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Timestamp in milliseconds.
    pub timestamp_ms: i64,
}
183
/// Two-column query result pairing a transaction sequence number with its
/// checkpoint sequence number.
// NOTE(review): presumably a projection of the `transactions` table —
// confirm at the query sites.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionCheckpoint {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint.
    pub checkpoint_sequence_number: i64,
}
189
/// Four-column query result carrying a transaction's sequence number,
/// checkpoint, success command count and timestamp.
///
/// `Queryable` maps columns by position, so the selecting query must return
/// them in exactly this field order.
// NOTE(review): presumably a projection of the `transactions` table —
// confirm at the query sites.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionSuccessCommandCount {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint.
    pub checkpoint_sequence_number: i64,
    /// Success command count of the transaction.
    pub success_command_count: i16,
    /// Timestamp in milliseconds.
    pub timestamp_ms: i64,
}
197
198impl From<&IndexedTransaction> for StoredTransaction {
199 fn from(tx: &IndexedTransaction) -> Self {
200 StoredTransaction {
201 tx_sequence_number: tx.tx_sequence_number as i64,
202 transaction_digest: tx.tx_digest.into_inner().to_vec(),
203 raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
204 raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
205 checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
206 object_changes: tx
207 .object_changes
208 .iter()
209 .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
210 .collect(),
211 balance_changes: tx
212 .balance_change
213 .iter()
214 .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
215 .collect(),
216 events: tx
217 .events
218 .iter()
219 .map(|e| Some(bcs::to_bytes(&e).unwrap()))
220 .collect(),
221 timestamp_ms: tx.timestamp_ms as i64,
222 transaction_kind: tx.transaction_kind as i16,
223 success_command_count: tx.successful_tx_num as i16,
224 }
225 }
226}
227
228impl StoredTransaction {
229 pub fn get_balance_len(&self) -> usize {
230 self.balance_changes.len()
231 }
232
233 pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
234 self.balance_changes.get(idx).cloned().flatten()
235 }
236
237 pub fn get_object_len(&self) -> usize {
238 self.object_changes.len()
239 }
240
241 pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
242 self.object_changes.get(idx).cloned().flatten()
243 }
244
245 pub fn get_event_len(&self) -> usize {
246 self.events.len()
247 }
248
249 pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
250 self.events.get(idx).cloned().flatten()
251 }
252
253 pub fn is_checkpointed_transaction(&self) -> bool {
256 self.checkpoint_sequence_number >= 0
257 }
258
259 pub async fn try_into_iota_transaction_block_response(
260 self,
261 options: IotaTransactionBlockResponseOptions,
262 package_resolver: &Arc<Resolver<impl PackageStore>>,
263 ) -> IndexerResult<IotaTransactionBlockResponse> {
264 let options = options.clone();
265 let tx_digest =
266 TransactionDigest::from_bytes(self.transaction_digest.as_slice()).map_err(|e| {
267 IndexerError::PersistentStorageDataCorruption(format!(
268 "Can't convert {:?} as tx_digest. Error: {e}",
269 self.transaction_digest
270 ))
271 })?;
272
273 let timestamp_ms = self
274 .is_checkpointed_transaction()
275 .then_some(self.timestamp_ms as u64);
276 let checkpoint = self
277 .is_checkpointed_transaction()
278 .then_some(self.checkpoint_sequence_number as u64);
279
280 let transaction = if options.show_input {
281 let sender_signed_data = self.try_into_sender_signed_data()?;
282 let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
283 sender_signed_data,
284 package_resolver,
285 tx_digest,
286 )
287 .await?;
288 Some(tx_block)
289 } else {
290 None
291 };
292
293 let effects = if options.show_effects {
294 Some(
295 self.try_into_iota_transaction_effects(package_resolver)
296 .await?,
297 )
298 } else {
299 None
300 };
301
302 let raw_transaction = if options.show_raw_input {
303 self.raw_transaction
304 } else {
305 Default::default()
306 };
307
308 let events = if options.show_events {
309 let events = {
310 self
311 .events
312 .into_iter()
313 .map(|event| match event {
314 Some(event) => {
315 let event: Event = bcs::from_bytes(&event).map_err(|e| {
316 IndexerError::PersistentStorageDataCorruption(format!(
317 "Can't convert event bytes into Event. tx_digest={tx_digest} Error: {e}"
318 ))
319 })?;
320 Ok(event)
321 }
322 None => Err(IndexerError::PersistentStorageDataCorruption(format!(
323 "Event should not be null, tx_digest={tx_digest}"
324 ))),
325 })
326 .collect::<Result<Vec<Event>, IndexerError>>()?
327 };
328 let tx_events = TransactionEvents { data: events };
329
330 Some(
331 tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp_ms)
332 .await?,
333 )
334 } else {
335 None
336 };
337
338 let object_changes = if options.show_object_changes {
339 let object_changes = {
340 self.object_changes.into_iter().map(|object_change| {
341 match object_change {
342 Some(object_change) => {
343 let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
344 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
345 format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={tx_digest} Error: {e}")
346 ))?;
347 Ok(ObjectChange::from(object_change))
348 }
349 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest}"))),
350 }
351 }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
352 };
353 Some(object_changes)
354 } else {
355 None
356 };
357
358 let balance_changes = if options.show_balance_changes {
359 let balance_changes = {
360 self.balance_changes.into_iter().map(|balance_change| {
361 match balance_change {
362 Some(balance_change) => {
363 let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
364 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
365 format!("Can't convert balance_change bytes into BalanceChange. tx_digest={tx_digest} Error: {e}")
366 ))?;
367 Ok(balance_change)
368 }
369 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest}"))),
370 }
371 }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
372 };
373 Some(balance_changes)
374 } else {
375 None
376 };
377
378 let raw_effects = if options.show_raw_effects {
379 self.raw_effects
380 } else {
381 Default::default()
382 };
383
384 let errors = match effects.as_ref().map(|e| e.status()) {
385 Some(IotaExecutionStatus::Failure { error }) => vec![error.clone()],
386 _ => vec![],
387 };
388
389 Ok(IotaTransactionBlockResponse {
390 digest: tx_digest,
391 transaction,
392 raw_transaction,
393 effects,
394 events,
395 object_changes,
396 balance_changes,
397 timestamp_ms,
398 checkpoint,
399 confirmed_local_execution: None,
400 errors,
401 raw_effects,
402 })
403 }
404
405 pub fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
406 let sender_signed_data: SenderSignedData =
407 bcs::from_bytes(&self.raw_transaction).map_err(|e| {
408 IndexerError::PersistentStorageDataCorruption(format!(
409 "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
410 self.tx_sequence_number
411 ))
412 })?;
413 Ok(sender_signed_data)
414 }
415
416 pub async fn try_into_iota_transaction_effects(
417 &self,
418 package_resolver: &Arc<Resolver<impl PackageStore>>,
419 ) -> IndexerResult<IotaTransactionBlockEffects> {
420 let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
421 IndexerError::PersistentStorageDataCorruption(format!(
422 "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
423 self.tx_sequence_number
424 ))
425 })?;
426 let effects =
427 IotaTransactionBlockEffects::from_native_with_clever_error(effects, package_resolver)
428 .await;
429 Ok(effects)
430 }
431
432 pub fn is_genesis(&self) -> bool {
434 self.tx_sequence_number == 0
435 }
436}
437
438pub fn stored_events_to_events(
439 stored_events: StoredTransactionEvents,
440) -> Result<Vec<Event>, IndexerError> {
441 stored_events
442 .into_iter()
443 .map(|event| match event {
444 Some(event) => {
445 let event: Event = bcs::from_bytes(&event).map_err(|e| {
446 IndexerError::PersistentStorageDataCorruption(format!(
447 "Can't convert event bytes into Event. Error: {e}",
448 ))
449 })?;
450 Ok(event)
451 }
452 None => Err(IndexerError::PersistentStorageDataCorruption(
453 "Event should not be null".to_string(),
454 )),
455 })
456 .collect::<Result<Vec<Event>, IndexerError>>()
457}
458
459pub async fn tx_events_to_iota_tx_events(
460 tx_events: TransactionEvents,
461 package_resolver: &Arc<Resolver<impl PackageStore>>,
462 tx_digest: TransactionDigest,
463 timestamp: Option<u64>,
464) -> Result<IotaTransactionBlockEvents, IndexerError> {
465 let mut iota_event_futures = vec![];
466 let tx_events_data_len = tx_events.data.len();
467 for tx_event in tx_events.data.clone() {
468 let package_resolver_clone = package_resolver.clone();
469 iota_event_futures.push(tokio::task::spawn(async move {
470 let resolver = package_resolver_clone;
471 resolver
472 .type_layout(TypeTag::Struct(Box::new(tx_event.type_)))
473 .await
474 }));
475 }
476 let event_move_type_layouts = futures::future::join_all(iota_event_futures)
477 .await
478 .into_iter()
479 .collect::<Result<Vec<_>, _>>()?
480 .into_iter()
481 .collect::<Result<Vec<_>, _>>()
482 .map_err(|e| {
483 IndexerError::ResolveMoveStruct(format!(
484 "Failed to convert to iota event with Error: {e}",
485 ))
486 })?;
487 let event_move_datatype_layouts = event_move_type_layouts
488 .into_iter()
489 .filter_map(|move_type_layout| match move_type_layout {
490 MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
491 MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
492 _ => None,
493 })
494 .collect::<Vec<_>>();
495 assert!(tx_events_data_len == event_move_datatype_layouts.len());
496 let iota_events = tx_events
497 .data
498 .into_iter()
499 .enumerate()
500 .zip(event_move_datatype_layouts)
501 .map(|((seq, tx_event), move_datatype_layout)| {
502 IotaEvent::try_from(
503 tx_event,
504 tx_digest,
505 seq as u64,
506 timestamp,
507 move_datatype_layout,
508 )
509 })
510 .collect::<Result<Vec<_>, _>>()?;
511 let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
512 Ok(iota_tx_events)
513}