1use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9 BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
10 IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11 ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15 digests::TransactionDigest,
16 effects::{TransactionEffects, TransactionEvents},
17 event::Event,
18 transaction::SenderSignedData,
19};
20use move_core_types::{
21 annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
22 language_storage::TypeTag,
23};
24#[cfg(feature = "shared_test_runtime")]
25use serde::Deserialize;
26
27use crate::{
28 errors::IndexerError,
29 schema::{optimistic_transactions, transactions, tx_insertion_order},
30 types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
31};
32
33#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
34#[diesel(table_name = tx_insertion_order)]
35pub struct TxInsertionOrder {
36 #[diesel(skip_insertion)]
43 pub insertion_order: i64,
44 pub tx_digest: Vec<u8>,
45}
46
47#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
48#[diesel(table_name = transactions)]
49#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
50pub struct StoredTransaction {
51 pub tx_sequence_number: i64,
54 pub transaction_digest: Vec<u8>,
55 pub raw_transaction: Vec<u8>,
56 pub raw_effects: Vec<u8>,
57 pub checkpoint_sequence_number: i64,
58 pub timestamp_ms: i64,
59 pub object_changes: Vec<Option<Vec<u8>>>,
60 pub balance_changes: Vec<Option<Vec<u8>>>,
61 pub events: Vec<Option<Vec<u8>>>,
62 pub transaction_kind: i16,
63 pub success_command_count: i16,
64}
65
66#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
67#[diesel(table_name = optimistic_transactions)]
68pub struct OptimisticTransaction {
69 pub insertion_order: i64,
70 pub transaction_digest: Vec<u8>,
71 pub raw_transaction: Vec<u8>,
72 pub raw_effects: Vec<u8>,
73 pub object_changes: Vec<Option<Vec<u8>>>,
74 pub balance_changes: Vec<Option<Vec<u8>>>,
75 pub events: Vec<Option<Vec<u8>>>,
76 pub transaction_kind: i16,
77 pub success_command_count: i16,
78}
79
80impl From<OptimisticTransaction> for StoredTransaction {
81 fn from(tx: OptimisticTransaction) -> Self {
82 StoredTransaction {
83 tx_sequence_number: tx.insertion_order,
84 transaction_digest: tx.transaction_digest,
85 raw_transaction: tx.raw_transaction,
86 raw_effects: tx.raw_effects,
87 checkpoint_sequence_number: -1,
88 timestamp_ms: -1,
89 object_changes: tx.object_changes,
90 balance_changes: tx.balance_changes,
91 events: tx.events,
92 transaction_kind: tx.transaction_kind,
93 success_command_count: tx.success_command_count,
94 }
95 }
96}
97
98impl From<StoredTransaction> for OptimisticTransaction {
99 fn from(tx: StoredTransaction) -> Self {
100 OptimisticTransaction {
101 insertion_order: tx.tx_sequence_number,
102 transaction_digest: tx.transaction_digest,
103 raw_transaction: tx.raw_transaction,
104 raw_effects: tx.raw_effects,
105 object_changes: tx.object_changes,
106 balance_changes: tx.balance_changes,
107 events: tx.events,
108 transaction_kind: tx.transaction_kind,
109 success_command_count: tx.success_command_count,
110 }
111 }
112}
113
114pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
115
116#[derive(Debug, Queryable)]
117pub struct TxSeq {
118 pub seq: i64,
119}
120
121impl Default for TxSeq {
122 fn default() -> Self {
123 Self { seq: -1 }
124 }
125}
126
127#[derive(Clone, Debug, Queryable)]
128pub struct StoredTransactionTimestamp {
129 pub tx_sequence_number: i64,
130 pub timestamp_ms: i64,
131}
132
133#[derive(Clone, Debug, Queryable)]
134pub struct StoredTransactionCheckpoint {
135 pub tx_sequence_number: i64,
136 pub checkpoint_sequence_number: i64,
137}
138
139#[derive(Clone, Debug, Queryable)]
140pub struct StoredTransactionSuccessCommandCount {
141 pub tx_sequence_number: i64,
142 pub checkpoint_sequence_number: i64,
143 pub success_command_count: i16,
144 pub timestamp_ms: i64,
145}
146
147impl From<&IndexedTransaction> for StoredTransaction {
148 fn from(tx: &IndexedTransaction) -> Self {
149 StoredTransaction {
150 tx_sequence_number: tx.tx_sequence_number as i64,
151 transaction_digest: tx.tx_digest.into_inner().to_vec(),
152 raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
153 raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
154 checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
155 object_changes: tx
156 .object_changes
157 .iter()
158 .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
159 .collect(),
160 balance_changes: tx
161 .balance_change
162 .iter()
163 .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
164 .collect(),
165 events: tx
166 .events
167 .iter()
168 .map(|e| Some(bcs::to_bytes(&e).unwrap()))
169 .collect(),
170 timestamp_ms: tx.timestamp_ms as i64,
171 transaction_kind: tx.transaction_kind as i16,
172 success_command_count: tx.successful_tx_num as i16,
173 }
174 }
175}
176
177impl StoredTransaction {
178 pub fn get_balance_len(&self) -> usize {
179 self.balance_changes.len()
180 }
181
182 pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
183 self.balance_changes.get(idx).cloned().flatten()
184 }
185
186 pub fn get_object_len(&self) -> usize {
187 self.object_changes.len()
188 }
189
190 pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
191 self.object_changes.get(idx).cloned().flatten()
192 }
193
194 pub fn get_event_len(&self) -> usize {
195 self.events.len()
196 }
197
198 pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
199 self.events.get(idx).cloned().flatten()
200 }
201
202 pub fn is_checkpointed_transaction(&self) -> bool {
205 self.checkpoint_sequence_number >= 0
206 }
207
208 pub async fn try_into_iota_transaction_block_response(
209 self,
210 options: IotaTransactionBlockResponseOptions,
211 package_resolver: Arc<Resolver<impl PackageStore>>,
212 ) -> IndexerResult<IotaTransactionBlockResponse> {
213 let options = options.clone();
214 let tx_digest =
215 TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
216 IndexerError::PersistentStorageDataCorruption(format!(
217 "Can't convert {:?} as tx_digest. Error: {e}",
218 self.transaction_digest
219 ))
220 })?;
221
222 let timestamp_ms = self
223 .is_checkpointed_transaction()
224 .then_some(self.timestamp_ms as u64);
225 let checkpoint = self
226 .is_checkpointed_transaction()
227 .then_some(self.checkpoint_sequence_number as u64);
228
229 let transaction = if options.show_input {
230 let sender_signed_data = self.try_into_sender_signed_data()?;
231 let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
232 sender_signed_data,
233 package_resolver.clone(),
234 tx_digest,
235 )
236 .await?;
237 Some(tx_block)
238 } else {
239 None
240 };
241
242 let effects = options
243 .show_effects
244 .then(|| self.try_into_iota_transaction_effects())
245 .transpose()?;
246
247 let raw_transaction = options
248 .show_raw_input
249 .then_some(self.raw_transaction)
250 .unwrap_or_default();
251
252 let events = if options.show_events {
253 let events = {
254 self
255 .events
256 .into_iter()
257 .map(|event| match event {
258 Some(event) => {
259 let event: Event = bcs::from_bytes(&event).map_err(|e| {
260 IndexerError::PersistentStorageDataCorruption(format!(
261 "Can't convert event bytes into Event. tx_digest={:?} Error: {e}",
262 tx_digest
263 ))
264 })?;
265 Ok(event)
266 }
267 None => Err(IndexerError::PersistentStorageDataCorruption(format!(
268 "Event should not be null, tx_digest={:?}",
269 tx_digest
270 ))),
271 })
272 .collect::<Result<Vec<Event>, IndexerError>>()?
273 };
274 let timestamp = self.timestamp_ms as u64;
275 let tx_events = TransactionEvents { data: events };
276
277 tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp).await?
278 } else {
279 None
280 };
281
282 let object_changes = if options.show_object_changes {
283 let object_changes = {
284 self.object_changes.into_iter().map(|object_change| {
285 match object_change {
286 Some(object_change) => {
287 let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
288 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
289 format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest)
290 ))?;
291 Ok(ObjectChange::from(object_change))
292 }
293 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
294 }
295 }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
296 };
297 Some(object_changes)
298 } else {
299 None
300 };
301
302 let balance_changes = if options.show_balance_changes {
303 let balance_changes = {
304 self.balance_changes.into_iter().map(|balance_change| {
305 match balance_change {
306 Some(balance_change) => {
307 let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
308 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
309 format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest)
310 ))?;
311 Ok(balance_change)
312 }
313 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
314 }
315 }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
316 };
317 Some(balance_changes)
318 } else {
319 None
320 };
321
322 let raw_effects = options
323 .show_raw_effects
324 .then_some(self.raw_effects)
325 .unwrap_or_default();
326
327 Ok(IotaTransactionBlockResponse {
328 digest: tx_digest,
329 transaction,
330 raw_transaction,
331 effects,
332 events,
333 object_changes,
334 balance_changes,
335 timestamp_ms,
336 checkpoint,
337 confirmed_local_execution: None,
338 errors: vec![],
339 raw_effects,
340 })
341 }
342
343 fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
344 let sender_signed_data: SenderSignedData =
345 bcs::from_bytes(&self.raw_transaction).map_err(|e| {
346 IndexerError::PersistentStorageDataCorruption(format!(
347 "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
348 self.tx_sequence_number
349 ))
350 })?;
351 Ok(sender_signed_data)
352 }
353
354 pub fn try_into_iota_transaction_effects(&self) -> IndexerResult<IotaTransactionBlockEffects> {
355 let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
356 IndexerError::PersistentStorageDataCorruption(format!(
357 "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
358 self.tx_sequence_number
359 ))
360 })?;
361 let effects = IotaTransactionBlockEffects::try_from(effects)?;
362 Ok(effects)
363 }
364
365 pub fn is_genesis(&self) -> bool {
367 self.tx_sequence_number == 0
368 }
369}
370
371pub fn stored_events_to_events(
372 stored_events: StoredTransactionEvents,
373) -> Result<Vec<Event>, IndexerError> {
374 stored_events
375 .into_iter()
376 .map(|event| match event {
377 Some(event) => {
378 let event: Event = bcs::from_bytes(&event).map_err(|e| {
379 IndexerError::PersistentStorageDataCorruption(format!(
380 "Can't convert event bytes into Event. Error: {e}",
381 ))
382 })?;
383 Ok(event)
384 }
385 None => Err(IndexerError::PersistentStorageDataCorruption(
386 "Event should not be null".to_string(),
387 )),
388 })
389 .collect::<Result<Vec<Event>, IndexerError>>()
390}
391
392pub async fn tx_events_to_iota_tx_events(
393 tx_events: TransactionEvents,
394 package_resolver: Arc<Resolver<impl PackageStore>>,
395 tx_digest: TransactionDigest,
396 timestamp: u64,
397) -> Result<Option<IotaTransactionBlockEvents>, IndexerError> {
398 let mut iota_event_futures = vec![];
399 let tx_events_data_len = tx_events.data.len();
400 for tx_event in tx_events.data.clone() {
401 let package_resolver_clone = package_resolver.clone();
402 iota_event_futures.push(tokio::task::spawn(async move {
403 let resolver = package_resolver_clone;
404 resolver
405 .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
406 .await
407 }));
408 }
409 let event_move_type_layouts = futures::future::join_all(iota_event_futures)
410 .await
411 .into_iter()
412 .collect::<Result<Vec<_>, _>>()?
413 .into_iter()
414 .collect::<Result<Vec<_>, _>>()
415 .map_err(|e| {
416 IndexerError::ResolveMoveStruct(format!(
417 "Failed to convert to iota event with Error: {e}",
418 ))
419 })?;
420 let event_move_datatype_layouts = event_move_type_layouts
421 .into_iter()
422 .filter_map(|move_type_layout| match move_type_layout {
423 MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
424 MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
425 _ => None,
426 })
427 .collect::<Vec<_>>();
428 assert!(tx_events_data_len == event_move_datatype_layouts.len());
429 let iota_events = tx_events
430 .data
431 .into_iter()
432 .enumerate()
433 .zip(event_move_datatype_layouts)
434 .map(|((seq, tx_event), move_datatype_layout)| {
435 IotaEvent::try_from(
436 tx_event,
437 tx_digest,
438 seq as u64,
439 Some(timestamp),
440 move_datatype_layout,
441 )
442 })
443 .collect::<Result<Vec<_>, _>>()?;
444 let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
445 Ok(Some(iota_tx_events))
446}