1use std::sync::Arc;
6
7use diesel::{
8 backend::Backend,
9 deserialize::{FromSql, Result as DeserializeResult},
10 expression::AsExpression,
11 prelude::*,
12 serialize::{Output, Result as SerializeResult, ToSql},
13 sql_types::BigInt,
14};
15use iota_json_rpc_types::{
16 BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
17 IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
18 ObjectChange,
19};
20use iota_package_resolver::{PackageStore, Resolver};
21use iota_types::{
22 digests::TransactionDigest,
23 effects::{TransactionEffects, TransactionEvents},
24 event::Event,
25 transaction::SenderSignedData,
26};
27use move_core_types::{
28 annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
29 language_storage::TypeTag,
30};
31#[cfg(feature = "shared_test_runtime")]
32use serde::Deserialize;
33
34use crate::{
35 errors::IndexerError,
36 schema::{optimistic_transactions, transactions, tx_global_order},
37 types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
38};
39
/// Row of the `tx_global_order` table, usable for both queries and inserts.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = tx_global_order)]
pub struct TxGlobalOrder {
    /// Checkpoint-derived sequence number of the transaction (nullable).
    // NOTE(review): presumably `None` until the transaction has been seen in a
    // checkpoint — confirm against the schema and the writers.
    pub chk_tx_sequence_number: Option<i64>,
    /// Position of the transaction in the global order.
    pub global_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub tx_digest: Vec<u8>,
    /// Optimistic sequence number of the transaction.
    ///
    /// When loading, Diesel deserializes the column as a plain `i64` and
    /// converts it into `Some(value)` (see `deserialize_as`).
    // NOTE(review): presumably `None` on insert lets the database fill in the
    // column default — confirm against the migration.
    #[diesel(deserialize_as = i64)]
    pub optimistic_sequence_number: Option<i64>,
}
68
69impl From<&IndexedTransaction> for CheckpointTxGlobalOrder {
70 fn from(tx: &IndexedTransaction) -> Self {
71 Self {
72 chk_tx_sequence_number: Some(tx.tx_sequence_number as i64),
73 global_sequence_number: tx.tx_sequence_number as i64,
74 tx_digest: tx.tx_digest.into_inner().to_vec(),
75 index_status: Some(IndexStatus::Started),
76 }
77 }
78}
79
/// View of a `tx_global_order` row in which the
/// `optimistic_sequence_number` column is interpreted as an [`IndexStatus`]
/// instead of a plain number.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = tx_global_order)]
pub struct CheckpointTxGlobalOrder {
    /// Checkpoint-derived sequence number of the transaction (nullable).
    pub(crate) chk_tx_sequence_number: Option<i64>,
    /// Position of the transaction in the global order.
    pub(crate) global_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub(crate) tx_digest: Vec<u8>,
    /// Indexing status, stored in the `optimistic_sequence_number` column.
    ///
    /// When loading, the column is deserialized as an `IndexStatus` and then
    /// wrapped in `Some`.
    #[diesel(deserialize_as = IndexStatus, column_name = "optimistic_sequence_number")]
    pub(crate) index_status: Option<IndexStatus>,
}
100
/// Indexing status persisted as a `BigInt` value.
///
/// The `ToSql`/`FromSql` implementations in this file encode `Started` as
/// `0` and `Completed` as `-1`; any other stored value is rejected on load.
#[derive(Clone, Debug, Copy, AsExpression, PartialEq, Eq)]
#[diesel(sql_type = BigInt)]
pub enum IndexStatus {
    /// Stored as `0`.
    Started,
    /// Stored as `-1`.
    Completed,
}
108
109impl<DB> ToSql<BigInt, DB> for IndexStatus
110where
111 DB: Backend,
112 i64: ToSql<BigInt, DB>,
113{
114 fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, DB>) -> SerializeResult {
115 match *self {
116 IndexStatus::Started => 0.to_sql(out),
117 IndexStatus::Completed => (-1).to_sql(out),
118 }
119 }
120}
121
122impl<DB> FromSql<BigInt, DB> for IndexStatus
123where
124 DB: Backend,
125 i64: FromSql<BigInt, DB>,
126{
127 fn from_sql(bytes: DB::RawValue<'_>) -> DeserializeResult<Self> {
128 match i64::from_sql(bytes)? {
129 0 => Ok(IndexStatus::Started),
130 -1 => Ok(IndexStatus::Completed),
131 other => Err(format!("invalid index status {other}").into()),
132 }
133 }
134}
135
/// Row of the `transactions` table.
///
/// Also derives `Deserialize` when the `shared_test_runtime` feature is
/// enabled.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = transactions)]
#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
pub struct StoredTransaction {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// BCS-encoded `SenderSignedData`.
    pub raw_transaction: Vec<u8>,
    /// BCS-encoded `TransactionEffects`.
    pub raw_effects: Vec<u8>,
    /// Sequence number of the containing checkpoint; a negative value marks a
    /// transaction that is not checkpointed (see
    /// `is_checkpointed_transaction`).
    pub checkpoint_sequence_number: i64,
    /// Timestamp in milliseconds; only exposed in responses for checkpointed
    /// transactions.
    pub timestamp_ms: i64,
    /// BCS-encoded object changes, one entry per change. Null entries are
    /// treated as data corruption when building a response.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// BCS-encoded balance changes, one entry per change. Null entries are
    /// treated as data corruption when building a response.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// BCS-encoded events, one entry per event. Null entries are treated as
    /// data corruption when building a response.
    pub events: Vec<Option<Vec<u8>>>,
    /// Numeric transaction kind.
    pub transaction_kind: i16,
    /// Count of successful commands, as recorded by the indexer.
    pub success_command_count: i16,
}
154
/// Row of the `optimistic_transactions` table.
///
/// Carries the same payload columns as `StoredTransaction`, but has no
/// checkpoint sequence number or timestamp; it is keyed by global and
/// optimistic sequence numbers instead.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = optimistic_transactions)]
pub struct OptimisticTransaction {
    /// Position of the transaction in the global order.
    pub global_sequence_number: i64,
    /// Optimistic sequence number; becomes `tx_sequence_number` when the row
    /// is converted into a `StoredTransaction`.
    pub optimistic_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// Serialized transaction data, copied verbatim to/from `StoredTransaction`.
    pub raw_transaction: Vec<u8>,
    /// Serialized transaction effects, copied verbatim to/from `StoredTransaction`.
    pub raw_effects: Vec<u8>,
    /// Serialized object changes, one (nullable) entry per change.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// Serialized balance changes, one (nullable) entry per change.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// Serialized events, one (nullable) entry per event.
    pub events: Vec<Option<Vec<u8>>>,
    /// Numeric transaction kind.
    pub transaction_kind: i16,
    /// Count of successful commands, as recorded by the indexer.
    pub success_command_count: i16,
}
169
170impl From<OptimisticTransaction> for StoredTransaction {
171 fn from(tx: OptimisticTransaction) -> Self {
172 StoredTransaction {
173 tx_sequence_number: tx.optimistic_sequence_number,
174 transaction_digest: tx.transaction_digest,
175 raw_transaction: tx.raw_transaction,
176 raw_effects: tx.raw_effects,
177 checkpoint_sequence_number: -1,
178 timestamp_ms: -1,
179 object_changes: tx.object_changes,
180 balance_changes: tx.balance_changes,
181 events: tx.events,
182 transaction_kind: tx.transaction_kind,
183 success_command_count: tx.success_command_count,
184 }
185 }
186}
187
188impl OptimisticTransaction {
189 pub fn from_stored(global_sequence_number: i64, stored: StoredTransaction) -> Self {
190 OptimisticTransaction {
191 global_sequence_number,
192 optimistic_sequence_number: stored.tx_sequence_number,
193 transaction_digest: stored.transaction_digest,
194 raw_transaction: stored.raw_transaction,
195 raw_effects: stored.raw_effects,
196 object_changes: stored.object_changes,
197 balance_changes: stored.balance_changes,
198 events: stored.events,
199 transaction_kind: stored.transaction_kind,
200 success_command_count: stored.success_command_count,
201 }
202 }
203
204 pub fn get_balance_len(&self) -> usize {
205 self.balance_changes.len()
206 }
207
208 pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
209 self.balance_changes.get(idx).cloned().flatten()
210 }
211
212 pub fn get_object_len(&self) -> usize {
213 self.object_changes.len()
214 }
215
216 pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
217 self.object_changes.get(idx).cloned().flatten()
218 }
219
220 pub fn get_event_len(&self) -> usize {
221 self.events.len()
222 }
223
224 pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
225 self.events.get(idx).cloned().flatten()
226 }
227}
228
/// Events of one transaction in their stored form: one entry per event, each
/// holding the BCS bytes of that event. `stored_events_to_events` treats a
/// `None` entry as data corruption.
pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
230
/// Single-column query result holding a transaction sequence number.
#[derive(Debug, Queryable)]
pub struct TxSeq {
    /// The sequence number; the `Default` implementation uses `-1`.
    pub seq: i64,
}
235
236impl Default for TxSeq {
237 fn default() -> Self {
238 Self { seq: -1 }
239 }
240}
241
/// Query projection pairing a transaction sequence number with its
/// timestamp.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionTimestamp {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Timestamp in milliseconds.
    pub timestamp_ms: i64,
}
247
/// Query projection pairing a transaction sequence number with the sequence
/// number of its checkpoint.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionCheckpoint {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint.
    pub checkpoint_sequence_number: i64,
}
253
/// Query projection carrying a transaction's successful-command count along
/// with its sequence number, checkpoint and timestamp.
///
/// `Queryable` maps columns by position, so the field order must match the
/// order of the selected columns.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionSuccessCommandCount {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint.
    pub checkpoint_sequence_number: i64,
    /// Count of successful commands.
    pub success_command_count: i16,
    /// Timestamp in milliseconds.
    pub timestamp_ms: i64,
}
261
262impl From<&IndexedTransaction> for StoredTransaction {
263 fn from(tx: &IndexedTransaction) -> Self {
264 StoredTransaction {
265 tx_sequence_number: tx.tx_sequence_number as i64,
266 transaction_digest: tx.tx_digest.into_inner().to_vec(),
267 raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
268 raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
269 checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
270 object_changes: tx
271 .object_changes
272 .iter()
273 .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
274 .collect(),
275 balance_changes: tx
276 .balance_change
277 .iter()
278 .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
279 .collect(),
280 events: tx
281 .events
282 .iter()
283 .map(|e| Some(bcs::to_bytes(&e).unwrap()))
284 .collect(),
285 timestamp_ms: tx.timestamp_ms as i64,
286 transaction_kind: tx.transaction_kind as i16,
287 success_command_count: tx.successful_tx_num as i16,
288 }
289 }
290}
291
292impl StoredTransaction {
293 pub fn get_balance_len(&self) -> usize {
294 self.balance_changes.len()
295 }
296
297 pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
298 self.balance_changes.get(idx).cloned().flatten()
299 }
300
301 pub fn get_object_len(&self) -> usize {
302 self.object_changes.len()
303 }
304
305 pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
306 self.object_changes.get(idx).cloned().flatten()
307 }
308
309 pub fn get_event_len(&self) -> usize {
310 self.events.len()
311 }
312
313 pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
314 self.events.get(idx).cloned().flatten()
315 }
316
317 pub fn is_checkpointed_transaction(&self) -> bool {
320 self.checkpoint_sequence_number >= 0
321 }
322
323 pub async fn try_into_iota_transaction_block_response(
324 self,
325 options: IotaTransactionBlockResponseOptions,
326 package_resolver: &Arc<Resolver<impl PackageStore>>,
327 ) -> IndexerResult<IotaTransactionBlockResponse> {
328 let options = options.clone();
329 let tx_digest =
330 TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
331 IndexerError::PersistentStorageDataCorruption(format!(
332 "Can't convert {:?} as tx_digest. Error: {e}",
333 self.transaction_digest
334 ))
335 })?;
336
337 let timestamp_ms = self
338 .is_checkpointed_transaction()
339 .then_some(self.timestamp_ms as u64);
340 let checkpoint = self
341 .is_checkpointed_transaction()
342 .then_some(self.checkpoint_sequence_number as u64);
343
344 let transaction = if options.show_input {
345 let sender_signed_data = self.try_into_sender_signed_data()?;
346 let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
347 sender_signed_data,
348 package_resolver,
349 tx_digest,
350 )
351 .await?;
352 Some(tx_block)
353 } else {
354 None
355 };
356
357 let effects = if options.show_effects {
358 Some(
359 self.try_into_iota_transaction_effects(package_resolver)
360 .await?,
361 )
362 } else {
363 None
364 };
365
366 let raw_transaction = if options.show_raw_input {
367 self.raw_transaction
368 } else {
369 Default::default()
370 };
371
372 let events = if options.show_events {
373 let events = {
374 self
375 .events
376 .into_iter()
377 .map(|event| match event {
378 Some(event) => {
379 let event: Event = bcs::from_bytes(&event).map_err(|e| {
380 IndexerError::PersistentStorageDataCorruption(format!(
381 "Can't convert event bytes into Event. tx_digest={tx_digest:?} Error: {e}"
382 ))
383 })?;
384 Ok(event)
385 }
386 None => Err(IndexerError::PersistentStorageDataCorruption(format!(
387 "Event should not be null, tx_digest={tx_digest:?}"
388 ))),
389 })
390 .collect::<Result<Vec<Event>, IndexerError>>()?
391 };
392 let tx_events = TransactionEvents { data: events };
393
394 Some(
395 tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp_ms)
396 .await?,
397 )
398 } else {
399 None
400 };
401
402 let object_changes = if options.show_object_changes {
403 let object_changes = {
404 self.object_changes.into_iter().map(|object_change| {
405 match object_change {
406 Some(object_change) => {
407 let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
408 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
409 format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={tx_digest:?} Error: {e}")
410 ))?;
411 Ok(ObjectChange::from(object_change))
412 }
413 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
414 }
415 }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
416 };
417 Some(object_changes)
418 } else {
419 None
420 };
421
422 let balance_changes = if options.show_balance_changes {
423 let balance_changes = {
424 self.balance_changes.into_iter().map(|balance_change| {
425 match balance_change {
426 Some(balance_change) => {
427 let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
428 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
429 format!("Can't convert balance_change bytes into BalanceChange. tx_digest={tx_digest:?} Error: {e}")
430 ))?;
431 Ok(balance_change)
432 }
433 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
434 }
435 }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
436 };
437 Some(balance_changes)
438 } else {
439 None
440 };
441
442 let raw_effects = if options.show_raw_effects {
443 self.raw_effects
444 } else {
445 Default::default()
446 };
447
448 Ok(IotaTransactionBlockResponse {
449 digest: tx_digest,
450 transaction,
451 raw_transaction,
452 effects,
453 events,
454 object_changes,
455 balance_changes,
456 timestamp_ms,
457 checkpoint,
458 confirmed_local_execution: None,
459 errors: vec![],
460 raw_effects,
461 })
462 }
463
464 pub fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
465 let sender_signed_data: SenderSignedData =
466 bcs::from_bytes(&self.raw_transaction).map_err(|e| {
467 IndexerError::PersistentStorageDataCorruption(format!(
468 "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
469 self.tx_sequence_number
470 ))
471 })?;
472 Ok(sender_signed_data)
473 }
474
475 pub async fn try_into_iota_transaction_effects(
476 &self,
477 package_resolver: &Arc<Resolver<impl PackageStore>>,
478 ) -> IndexerResult<IotaTransactionBlockEffects> {
479 let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
480 IndexerError::PersistentStorageDataCorruption(format!(
481 "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
482 self.tx_sequence_number
483 ))
484 })?;
485 let effects =
486 IotaTransactionBlockEffects::from_native_with_clever_error(effects, package_resolver)
487 .await;
488 Ok(effects)
489 }
490
491 pub fn is_genesis(&self) -> bool {
493 self.tx_sequence_number == 0
494 }
495}
496
497pub fn stored_events_to_events(
498 stored_events: StoredTransactionEvents,
499) -> Result<Vec<Event>, IndexerError> {
500 stored_events
501 .into_iter()
502 .map(|event| match event {
503 Some(event) => {
504 let event: Event = bcs::from_bytes(&event).map_err(|e| {
505 IndexerError::PersistentStorageDataCorruption(format!(
506 "Can't convert event bytes into Event. Error: {e}",
507 ))
508 })?;
509 Ok(event)
510 }
511 None => Err(IndexerError::PersistentStorageDataCorruption(
512 "Event should not be null".to_string(),
513 )),
514 })
515 .collect::<Result<Vec<Event>, IndexerError>>()
516}
517
518pub async fn tx_events_to_iota_tx_events(
519 tx_events: TransactionEvents,
520 package_resolver: &Arc<Resolver<impl PackageStore>>,
521 tx_digest: TransactionDigest,
522 timestamp: Option<u64>,
523) -> Result<IotaTransactionBlockEvents, IndexerError> {
524 let mut iota_event_futures = vec![];
525 let tx_events_data_len = tx_events.data.len();
526 for tx_event in tx_events.data.clone() {
527 let package_resolver_clone = package_resolver.clone();
528 iota_event_futures.push(tokio::task::spawn(async move {
529 let resolver = package_resolver_clone;
530 resolver
531 .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
532 .await
533 }));
534 }
535 let event_move_type_layouts = futures::future::join_all(iota_event_futures)
536 .await
537 .into_iter()
538 .collect::<Result<Vec<_>, _>>()?
539 .into_iter()
540 .collect::<Result<Vec<_>, _>>()
541 .map_err(|e| {
542 IndexerError::ResolveMoveStruct(format!(
543 "Failed to convert to iota event with Error: {e}",
544 ))
545 })?;
546 let event_move_datatype_layouts = event_move_type_layouts
547 .into_iter()
548 .filter_map(|move_type_layout| match move_type_layout {
549 MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
550 MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
551 _ => None,
552 })
553 .collect::<Vec<_>>();
554 assert!(tx_events_data_len == event_move_datatype_layouts.len());
555 let iota_events = tx_events
556 .data
557 .into_iter()
558 .enumerate()
559 .zip(event_move_datatype_layouts)
560 .map(|((seq, tx_event), move_datatype_layout)| {
561 IotaEvent::try_from(
562 tx_event,
563 tx_digest,
564 seq as u64,
565 timestamp,
566 move_datatype_layout,
567 )
568 })
569 .collect::<Result<Vec<_>, _>>()?;
570 let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
571 Ok(iota_tx_events)
572}