1use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9 BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
10 IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11 ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15 digests::TransactionDigest,
16 effects::{TransactionEffects, TransactionEvents},
17 event::Event,
18 transaction::SenderSignedData,
19};
20use move_core_types::{
21 annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
22 language_storage::TypeTag,
23};
24#[cfg(feature = "shared_test_runtime")]
25use serde::Deserialize;
26
27use crate::{
28 errors::IndexerError,
29 schema::{optimistic_transactions, transactions, tx_insertion_order},
30 types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
31};
32
/// Row of the `tx_insertion_order` table, pairing a transaction digest with
/// its insertion order.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = tx_insertion_order)]
pub struct TxInsertionOrder {
    /// Position of the transaction in insertion order.
    ///
    /// Marked `skip_insertion`, so the field is left out of `INSERT`
    /// statements (presumably the database assigns the value — TODO confirm
    /// against the schema/migrations).
    #[diesel(skip_insertion)]
    pub insertion_order: i64,
    /// Raw bytes of the transaction digest.
    pub tx_digest: Vec<u8>,
}
46
/// Row of the `transactions` table.
///
/// Values are produced by the `From<&IndexedTransaction>` and
/// `From<OptimisticTransaction>` conversions in this module.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = transactions)]
#[cfg_attr(feature = "shared_test_runtime", derive(Deserialize))]
pub struct StoredTransaction {
    /// Sequence number of the transaction. When the row is converted from an
    /// [`OptimisticTransaction`] this holds its `insertion_order` instead.
    pub tx_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// BCS-serialized `SenderSignedData`.
    pub raw_transaction: Vec<u8>,
    /// BCS-serialized `TransactionEffects`.
    pub raw_effects: Vec<u8>,
    /// Sequence number of the containing checkpoint. A negative value marks a
    /// transaction that is not checkpointed (`-1` is used when converting from
    /// an [`OptimisticTransaction`]).
    pub checkpoint_sequence_number: i64,
    /// Timestamp in milliseconds; `-1` when converted from an
    /// [`OptimisticTransaction`].
    pub timestamp_ms: i64,
    /// BCS-serialized `IndexedObjectChange` values, one per entry. Null
    /// entries are reported as data corruption when the row is decoded.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// BCS-serialized `BalanceChange` values, one per entry. Null entries are
    /// reported as data corruption when the row is decoded.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// BCS-serialized `Event` values, one per entry. Null entries are
    /// reported as data corruption when the row is decoded.
    pub events: Vec<Option<Vec<u8>>>,
    /// Transaction kind, stored as the `i16` cast of
    /// `IndexedTransaction::transaction_kind`.
    pub transaction_kind: i16,
    /// `i16` cast of `IndexedTransaction::successful_tx_num`.
    pub success_command_count: i16,
}
65
/// Row of the `optimistic_transactions` table.
///
/// Carries the same payload columns as [`StoredTransaction`] but has no
/// checkpoint sequence number or timestamp, and is keyed by `insertion_order`
/// rather than `tx_sequence_number`.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = optimistic_transactions)]
pub struct OptimisticTransaction {
    /// Insertion order of the transaction; mapped to/from
    /// `StoredTransaction::tx_sequence_number` by the `From` conversions.
    pub insertion_order: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// Serialized transaction, same encoding as
    /// `StoredTransaction::raw_transaction`.
    pub raw_transaction: Vec<u8>,
    /// Serialized effects, same encoding as `StoredTransaction::raw_effects`.
    pub raw_effects: Vec<u8>,
    /// Serialized object changes, one optional blob per entry.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// Serialized balance changes, one optional blob per entry.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// Serialized events, one optional blob per entry.
    pub events: Vec<Option<Vec<u8>>>,
    /// Transaction kind discriminant.
    pub transaction_kind: i16,
    /// Success command count, copied verbatim to/from `StoredTransaction`.
    pub success_command_count: i16,
}
79
80impl From<OptimisticTransaction> for StoredTransaction {
81 fn from(tx: OptimisticTransaction) -> Self {
82 StoredTransaction {
83 tx_sequence_number: tx.insertion_order,
84 transaction_digest: tx.transaction_digest,
85 raw_transaction: tx.raw_transaction,
86 raw_effects: tx.raw_effects,
87 checkpoint_sequence_number: -1,
88 timestamp_ms: -1,
89 object_changes: tx.object_changes,
90 balance_changes: tx.balance_changes,
91 events: tx.events,
92 transaction_kind: tx.transaction_kind,
93 success_command_count: tx.success_command_count,
94 }
95 }
96}
97
98impl From<StoredTransaction> for OptimisticTransaction {
99 fn from(tx: StoredTransaction) -> Self {
100 OptimisticTransaction {
101 insertion_order: tx.tx_sequence_number,
102 transaction_digest: tx.transaction_digest,
103 raw_transaction: tx.raw_transaction,
104 raw_effects: tx.raw_effects,
105 object_changes: tx.object_changes,
106 balance_changes: tx.balance_changes,
107 events: tx.events,
108 transaction_kind: tx.transaction_kind,
109 success_command_count: tx.success_command_count,
110 }
111 }
112}
113
/// Stored form of a transaction's events: one optional byte blob per event,
/// the same shape as the `events` field of [`StoredTransaction`].
pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
115
/// Single-column query result holding a sequence number.
#[derive(Debug, Queryable)]
pub struct TxSeq {
    /// The sequence number (presumably a transaction sequence number, going by
    /// the type name — confirm against the queries that load it).
    pub seq: i64,
}
120
121impl Default for TxSeq {
122 fn default() -> Self {
123 Self { seq: -1 }
124 }
125}
126
/// Query result row carrying a transaction sequence number together with its
/// timestamp.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionTimestamp {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Timestamp in milliseconds.
    pub timestamp_ms: i64,
}
132
/// Query result row carrying a transaction sequence number together with the
/// sequence number of its checkpoint.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionCheckpoint {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint.
    pub checkpoint_sequence_number: i64,
}
138
/// Query result row carrying a transaction's success command count along with
/// its sequence number, checkpoint sequence number and timestamp.
///
/// `Queryable` maps columns by position, so the field order here must match
/// the order of the selected columns.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionSuccessCommandCount {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint.
    pub checkpoint_sequence_number: i64,
    /// Success command count of the transaction.
    pub success_command_count: i16,
    /// Timestamp in milliseconds.
    pub timestamp_ms: i64,
}
146
147impl From<&IndexedTransaction> for StoredTransaction {
148 fn from(tx: &IndexedTransaction) -> Self {
149 StoredTransaction {
150 tx_sequence_number: tx.tx_sequence_number as i64,
151 transaction_digest: tx.tx_digest.into_inner().to_vec(),
152 raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
153 raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
154 checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
155 object_changes: tx
156 .object_changes
157 .iter()
158 .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
159 .collect(),
160 balance_changes: tx
161 .balance_change
162 .iter()
163 .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
164 .collect(),
165 events: tx
166 .events
167 .iter()
168 .map(|e| Some(bcs::to_bytes(&e).unwrap()))
169 .collect(),
170 timestamp_ms: tx.timestamp_ms as i64,
171 transaction_kind: tx.transaction_kind as i16,
172 success_command_count: tx.successful_tx_num as i16,
173 }
174 }
175}
176
177impl StoredTransaction {
178 pub fn get_balance_len(&self) -> usize {
179 self.balance_changes.len()
180 }
181
182 pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
183 self.balance_changes.get(idx).cloned().flatten()
184 }
185
186 pub fn get_object_len(&self) -> usize {
187 self.object_changes.len()
188 }
189
190 pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
191 self.object_changes.get(idx).cloned().flatten()
192 }
193
194 pub fn get_event_len(&self) -> usize {
195 self.events.len()
196 }
197
198 pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
199 self.events.get(idx).cloned().flatten()
200 }
201
202 pub fn is_checkpointed_transaction(&self) -> bool {
205 self.checkpoint_sequence_number >= 0
206 }
207
208 pub async fn try_into_iota_transaction_block_response(
209 self,
210 options: IotaTransactionBlockResponseOptions,
211 package_resolver: Arc<Resolver<impl PackageStore>>,
212 ) -> IndexerResult<IotaTransactionBlockResponse> {
213 let options = options.clone();
214 let tx_digest =
215 TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
216 IndexerError::PersistentStorageDataCorruption(format!(
217 "Can't convert {:?} as tx_digest. Error: {e}",
218 self.transaction_digest
219 ))
220 })?;
221
222 let timestamp_ms = self
223 .is_checkpointed_transaction()
224 .then_some(self.timestamp_ms as u64);
225 let checkpoint = self
226 .is_checkpointed_transaction()
227 .then_some(self.checkpoint_sequence_number as u64);
228
229 let transaction = if options.show_input {
230 let sender_signed_data = self.try_into_sender_signed_data()?;
231 let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
232 sender_signed_data,
233 package_resolver.clone(),
234 tx_digest,
235 )
236 .await?;
237 Some(tx_block)
238 } else {
239 None
240 };
241
242 let effects = options
243 .show_effects
244 .then(|| self.try_into_iota_transaction_effects())
245 .transpose()?;
246
247 let raw_transaction = options
248 .show_raw_input
249 .then_some(self.raw_transaction)
250 .unwrap_or_default();
251
252 let events = if options.show_events {
253 let events = {
254 self
255 .events
256 .into_iter()
257 .map(|event| match event {
258 Some(event) => {
259 let event: Event = bcs::from_bytes(&event).map_err(|e| {
260 IndexerError::PersistentStorageDataCorruption(format!(
261 "Can't convert event bytes into Event. tx_digest={tx_digest:?} Error: {e}"
262 ))
263 })?;
264 Ok(event)
265 }
266 None => Err(IndexerError::PersistentStorageDataCorruption(format!(
267 "Event should not be null, tx_digest={tx_digest:?}"
268 ))),
269 })
270 .collect::<Result<Vec<Event>, IndexerError>>()?
271 };
272 let timestamp = self.timestamp_ms as u64;
273 let tx_events = TransactionEvents { data: events };
274
275 tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp).await?
276 } else {
277 None
278 };
279
280 let object_changes = if options.show_object_changes {
281 let object_changes = {
282 self.object_changes.into_iter().map(|object_change| {
283 match object_change {
284 Some(object_change) => {
285 let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
286 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
287 format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={tx_digest:?} Error: {e}")
288 ))?;
289 Ok(ObjectChange::from(object_change))
290 }
291 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
292 }
293 }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
294 };
295 Some(object_changes)
296 } else {
297 None
298 };
299
300 let balance_changes = if options.show_balance_changes {
301 let balance_changes = {
302 self.balance_changes.into_iter().map(|balance_change| {
303 match balance_change {
304 Some(balance_change) => {
305 let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
306 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
307 format!("Can't convert balance_change bytes into BalanceChange. tx_digest={tx_digest:?} Error: {e}")
308 ))?;
309 Ok(balance_change)
310 }
311 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={tx_digest:?}"))),
312 }
313 }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
314 };
315 Some(balance_changes)
316 } else {
317 None
318 };
319
320 let raw_effects = options
321 .show_raw_effects
322 .then_some(self.raw_effects)
323 .unwrap_or_default();
324
325 Ok(IotaTransactionBlockResponse {
326 digest: tx_digest,
327 transaction,
328 raw_transaction,
329 effects,
330 events,
331 object_changes,
332 balance_changes,
333 timestamp_ms,
334 checkpoint,
335 confirmed_local_execution: None,
336 errors: vec![],
337 raw_effects,
338 })
339 }
340
341 fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
342 let sender_signed_data: SenderSignedData =
343 bcs::from_bytes(&self.raw_transaction).map_err(|e| {
344 IndexerError::PersistentStorageDataCorruption(format!(
345 "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
346 self.tx_sequence_number
347 ))
348 })?;
349 Ok(sender_signed_data)
350 }
351
352 pub fn try_into_iota_transaction_effects(&self) -> IndexerResult<IotaTransactionBlockEffects> {
353 let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
354 IndexerError::PersistentStorageDataCorruption(format!(
355 "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
356 self.tx_sequence_number
357 ))
358 })?;
359 let effects = IotaTransactionBlockEffects::try_from(effects)?;
360 Ok(effects)
361 }
362
363 pub fn is_genesis(&self) -> bool {
365 self.tx_sequence_number == 0
366 }
367}
368
369pub fn stored_events_to_events(
370 stored_events: StoredTransactionEvents,
371) -> Result<Vec<Event>, IndexerError> {
372 stored_events
373 .into_iter()
374 .map(|event| match event {
375 Some(event) => {
376 let event: Event = bcs::from_bytes(&event).map_err(|e| {
377 IndexerError::PersistentStorageDataCorruption(format!(
378 "Can't convert event bytes into Event. Error: {e}",
379 ))
380 })?;
381 Ok(event)
382 }
383 None => Err(IndexerError::PersistentStorageDataCorruption(
384 "Event should not be null".to_string(),
385 )),
386 })
387 .collect::<Result<Vec<Event>, IndexerError>>()
388}
389
390pub async fn tx_events_to_iota_tx_events(
391 tx_events: TransactionEvents,
392 package_resolver: Arc<Resolver<impl PackageStore>>,
393 tx_digest: TransactionDigest,
394 timestamp: u64,
395) -> Result<Option<IotaTransactionBlockEvents>, IndexerError> {
396 let mut iota_event_futures = vec![];
397 let tx_events_data_len = tx_events.data.len();
398 for tx_event in tx_events.data.clone() {
399 let package_resolver_clone = package_resolver.clone();
400 iota_event_futures.push(tokio::task::spawn(async move {
401 let resolver = package_resolver_clone;
402 resolver
403 .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
404 .await
405 }));
406 }
407 let event_move_type_layouts = futures::future::join_all(iota_event_futures)
408 .await
409 .into_iter()
410 .collect::<Result<Vec<_>, _>>()?
411 .into_iter()
412 .collect::<Result<Vec<_>, _>>()
413 .map_err(|e| {
414 IndexerError::ResolveMoveStruct(format!(
415 "Failed to convert to iota event with Error: {e}",
416 ))
417 })?;
418 let event_move_datatype_layouts = event_move_type_layouts
419 .into_iter()
420 .filter_map(|move_type_layout| match move_type_layout {
421 MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
422 MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
423 _ => None,
424 })
425 .collect::<Vec<_>>();
426 assert!(tx_events_data_len == event_move_datatype_layouts.len());
427 let iota_events = tx_events
428 .data
429 .into_iter()
430 .enumerate()
431 .zip(event_move_datatype_layouts)
432 .map(|((seq, tx_event), move_datatype_layout)| {
433 IotaEvent::try_from(
434 tx_event,
435 tx_digest,
436 seq as u64,
437 Some(timestamp),
438 move_datatype_layout,
439 )
440 })
441 .collect::<Result<Vec<_>, _>>()?;
442 let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
443 Ok(Some(iota_tx_events))
444}