1use std::sync::Arc;
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{
9 BalanceChange, IotaEvent, IotaTransactionBlock, IotaTransactionBlockEffects,
10 IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11 ObjectChange,
12};
13use iota_package_resolver::{PackageStore, Resolver};
14use iota_types::{
15 digests::TransactionDigest,
16 effects::{TransactionEffects, TransactionEvents},
17 event::Event,
18 transaction::SenderSignedData,
19};
20use move_core_types::{
21 annotated_value::{MoveDatatypeLayout, MoveTypeLayout},
22 language_storage::TypeTag,
23};
24
25use crate::{
26 errors::IndexerError,
27 schema::{optimistic_transactions, transactions, tx_insertion_order},
28 types::{IndexedObjectChange, IndexedTransaction, IndexerResult},
29};
30
/// Row of the `tx_insertion_order` table: a transaction digest together with
/// its insertion order.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = tx_insertion_order)]
pub struct TxInsertionOrder {
    /// Insertion-order value of the transaction. Marked `skip_insertion`, so
    /// it is left out of generated `INSERT` statements (presumably because
    /// the database assigns it; confirm against the schema).
    #[diesel(skip_insertion)]
    pub insertion_order: i64,
    /// Raw bytes of the transaction digest.
    pub tx_digest: Vec<u8>,
}
44
/// Row of the `transactions` table.
///
/// Field order matters: `Queryable` maps columns to fields positionally.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = transactions)]
pub struct StoredTransaction {
    /// Sequence number of the transaction; `0` is treated as genesis by
    /// `is_genesis`.
    pub tx_sequence_number: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// BCS-encoded `SenderSignedData`.
    pub raw_transaction: Vec<u8>,
    /// BCS-encoded `TransactionEffects`.
    pub raw_effects: Vec<u8>,
    /// Checkpoint containing the transaction. A negative value marks a
    /// transaction that is not checkpointed (see
    /// `is_checkpointed_transaction`).
    pub checkpoint_sequence_number: i64,
    /// Timestamp in milliseconds; `-1` when converted from an
    /// `OptimisticTransaction`.
    pub timestamp_ms: i64,
    /// One BCS-encoded `IndexedObjectChange` per entry. `None` entries are
    /// reported as data corruption when building a response.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// One BCS-encoded `BalanceChange` per entry. `None` entries are reported
    /// as data corruption when building a response.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// One BCS-encoded `Event` per entry. `None` entries are reported as data
    /// corruption when building a response.
    pub events: Vec<Option<Vec<u8>>>,
    /// Numeric transaction kind, as stored by the indexer.
    pub transaction_kind: i16,
    /// Number of successfully executed commands, as stored by the indexer.
    pub success_command_count: i16,
}
62
/// Row of the `optimistic_transactions` table.
///
/// Carries the same payload columns as `StoredTransaction` but has no
/// checkpoint sequence number or timestamp, and is keyed by
/// `insertion_order` instead of `tx_sequence_number`.
///
/// Field order matters: `Queryable` maps columns to fields positionally.
#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
#[diesel(table_name = optimistic_transactions)]
pub struct OptimisticTransaction {
    /// Insertion-order value; becomes `tx_sequence_number` when converted
    /// into a `StoredTransaction`.
    pub insertion_order: i64,
    /// Raw bytes of the transaction digest.
    pub transaction_digest: Vec<u8>,
    /// Serialized transaction bytes (same encoding as
    /// `StoredTransaction::raw_transaction`).
    pub raw_transaction: Vec<u8>,
    /// Serialized effects bytes (same encoding as
    /// `StoredTransaction::raw_effects`).
    pub raw_effects: Vec<u8>,
    /// Serialized object changes, one optional blob per change.
    pub object_changes: Vec<Option<Vec<u8>>>,
    /// Serialized balance changes, one optional blob per change.
    pub balance_changes: Vec<Option<Vec<u8>>>,
    /// Serialized events, one optional blob per event.
    pub events: Vec<Option<Vec<u8>>>,
    /// Numeric transaction kind, as stored by the indexer.
    pub transaction_kind: i16,
    /// Number of successfully executed commands, as stored by the indexer.
    pub success_command_count: i16,
}
76
77impl From<OptimisticTransaction> for StoredTransaction {
78 fn from(tx: OptimisticTransaction) -> Self {
79 StoredTransaction {
80 tx_sequence_number: tx.insertion_order,
81 transaction_digest: tx.transaction_digest,
82 raw_transaction: tx.raw_transaction,
83 raw_effects: tx.raw_effects,
84 checkpoint_sequence_number: -1,
85 timestamp_ms: -1,
86 object_changes: tx.object_changes,
87 balance_changes: tx.balance_changes,
88 events: tx.events,
89 transaction_kind: tx.transaction_kind,
90 success_command_count: tx.success_command_count,
91 }
92 }
93}
94
95impl From<StoredTransaction> for OptimisticTransaction {
96 fn from(tx: StoredTransaction) -> Self {
97 OptimisticTransaction {
98 insertion_order: tx.tx_sequence_number,
99 transaction_digest: tx.transaction_digest,
100 raw_transaction: tx.raw_transaction,
101 raw_effects: tx.raw_effects,
102 object_changes: tx.object_changes,
103 balance_changes: tx.balance_changes,
104 events: tx.events,
105 transaction_kind: tx.transaction_kind,
106 success_command_count: tx.success_command_count,
107 }
108 }
109}
110
/// Stored form of a transaction's events: one optional byte blob per event.
/// `stored_events_to_events` decodes each blob with BCS into an `Event` and
/// treats a `None` entry as data corruption.
pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
112
/// Single-column query result holding a sequence number (presumably a
/// transaction sequence number, going by the type name; confirm against the
/// queries that load it).
#[derive(Debug, Queryable)]
pub struct TxSeq {
    /// The sequence number; the `Default` value is `-1`.
    pub seq: i64,
}
117
118impl Default for TxSeq {
119 fn default() -> Self {
120 Self { seq: -1 }
121 }
122}
123
/// Two-column query result pairing a transaction sequence number with its
/// timestamp. Columns are mapped positionally by `Queryable`.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionTimestamp {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Timestamp of the transaction in milliseconds.
    pub timestamp_ms: i64,
}
129
/// Two-column query result pairing a transaction sequence number with its
/// checkpoint sequence number. Columns are mapped positionally by
/// `Queryable`.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionCheckpoint {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint associated with the transaction.
    pub checkpoint_sequence_number: i64,
}
135
/// Four-column query result carrying a transaction's successful command
/// count together with its sequence number, checkpoint and timestamp.
/// Columns are mapped positionally by `Queryable`, so the field order must
/// match the query's select order.
#[derive(Clone, Debug, Queryable)]
pub struct StoredTransactionSuccessCommandCount {
    /// Sequence number of the transaction.
    pub tx_sequence_number: i64,
    /// Sequence number of the checkpoint associated with the transaction.
    pub checkpoint_sequence_number: i64,
    /// Number of successfully executed commands, as stored by the indexer.
    pub success_command_count: i16,
    /// Timestamp of the transaction in milliseconds.
    pub timestamp_ms: i64,
}
143
144impl From<&IndexedTransaction> for StoredTransaction {
145 fn from(tx: &IndexedTransaction) -> Self {
146 StoredTransaction {
147 tx_sequence_number: tx.tx_sequence_number as i64,
148 transaction_digest: tx.tx_digest.into_inner().to_vec(),
149 raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
150 raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
151 checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
152 object_changes: tx
153 .object_changes
154 .iter()
155 .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
156 .collect(),
157 balance_changes: tx
158 .balance_change
159 .iter()
160 .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
161 .collect(),
162 events: tx
163 .events
164 .iter()
165 .map(|e| Some(bcs::to_bytes(&e).unwrap()))
166 .collect(),
167 timestamp_ms: tx.timestamp_ms as i64,
168 transaction_kind: tx.transaction_kind as i16,
169 success_command_count: tx.successful_tx_num as i16,
170 }
171 }
172}
173
174impl StoredTransaction {
175 pub fn get_balance_len(&self) -> usize {
176 self.balance_changes.len()
177 }
178
179 pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
180 self.balance_changes.get(idx).cloned().flatten()
181 }
182
183 pub fn get_object_len(&self) -> usize {
184 self.object_changes.len()
185 }
186
187 pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
188 self.object_changes.get(idx).cloned().flatten()
189 }
190
191 pub fn get_event_len(&self) -> usize {
192 self.events.len()
193 }
194
195 pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
196 self.events.get(idx).cloned().flatten()
197 }
198
199 pub fn is_checkpointed_transaction(&self) -> bool {
202 self.checkpoint_sequence_number >= 0
203 }
204
205 pub async fn try_into_iota_transaction_block_response(
206 self,
207 options: IotaTransactionBlockResponseOptions,
208 package_resolver: Arc<Resolver<impl PackageStore>>,
209 ) -> IndexerResult<IotaTransactionBlockResponse> {
210 let options = options.clone();
211 let tx_digest =
212 TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
213 IndexerError::PersistentStorageDataCorruption(format!(
214 "Can't convert {:?} as tx_digest. Error: {e}",
215 self.transaction_digest
216 ))
217 })?;
218
219 let timestamp_ms = self
220 .is_checkpointed_transaction()
221 .then_some(self.timestamp_ms as u64);
222 let checkpoint = self
223 .is_checkpointed_transaction()
224 .then_some(self.checkpoint_sequence_number as u64);
225
226 let transaction = if options.show_input {
227 let sender_signed_data = self.try_into_sender_signed_data()?;
228 let tx_block = IotaTransactionBlock::try_from_with_package_resolver(
229 sender_signed_data,
230 package_resolver.clone(),
231 tx_digest,
232 )
233 .await?;
234 Some(tx_block)
235 } else {
236 None
237 };
238
239 let effects = options
240 .show_effects
241 .then(|| self.try_into_iota_transaction_effects())
242 .transpose()?;
243
244 let raw_transaction = options
245 .show_raw_input
246 .then_some(self.raw_transaction)
247 .unwrap_or_default();
248
249 let events = if options.show_events {
250 let events = {
251 self
252 .events
253 .into_iter()
254 .map(|event| match event {
255 Some(event) => {
256 let event: Event = bcs::from_bytes(&event).map_err(|e| {
257 IndexerError::PersistentStorageDataCorruption(format!(
258 "Can't convert event bytes into Event. tx_digest={:?} Error: {e}",
259 tx_digest
260 ))
261 })?;
262 Ok(event)
263 }
264 None => Err(IndexerError::PersistentStorageDataCorruption(format!(
265 "Event should not be null, tx_digest={:?}",
266 tx_digest
267 ))),
268 })
269 .collect::<Result<Vec<Event>, IndexerError>>()?
270 };
271 let timestamp = self.timestamp_ms as u64;
272 let tx_events = TransactionEvents { data: events };
273
274 tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, timestamp).await?
275 } else {
276 None
277 };
278
279 let object_changes = if options.show_object_changes {
280 let object_changes = {
281 self.object_changes.into_iter().map(|object_change| {
282 match object_change {
283 Some(object_change) => {
284 let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
285 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
286 format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest)
287 ))?;
288 Ok(ObjectChange::from(object_change))
289 }
290 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
291 }
292 }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
293 };
294 Some(object_changes)
295 } else {
296 None
297 };
298
299 let balance_changes = if options.show_balance_changes {
300 let balance_changes = {
301 self.balance_changes.into_iter().map(|balance_change| {
302 match balance_change {
303 Some(balance_change) => {
304 let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
305 .map_err(|e| IndexerError::PersistentStorageDataCorruption(
306 format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest)
307 ))?;
308 Ok(balance_change)
309 }
310 None => Err(IndexerError::PersistentStorageDataCorruption(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
311 }
312 }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
313 };
314 Some(balance_changes)
315 } else {
316 None
317 };
318
319 let raw_effects = options
320 .show_raw_effects
321 .then_some(self.raw_effects)
322 .unwrap_or_default();
323
324 Ok(IotaTransactionBlockResponse {
325 digest: tx_digest,
326 transaction,
327 raw_transaction,
328 effects,
329 events,
330 object_changes,
331 balance_changes,
332 timestamp_ms,
333 checkpoint,
334 confirmed_local_execution: None,
335 errors: vec![],
336 raw_effects,
337 })
338 }
339
340 fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
341 let sender_signed_data: SenderSignedData =
342 bcs::from_bytes(&self.raw_transaction).map_err(|e| {
343 IndexerError::PersistentStorageDataCorruption(format!(
344 "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
345 self.tx_sequence_number
346 ))
347 })?;
348 Ok(sender_signed_data)
349 }
350
351 pub fn try_into_iota_transaction_effects(&self) -> IndexerResult<IotaTransactionBlockEffects> {
352 let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
353 IndexerError::PersistentStorageDataCorruption(format!(
354 "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
355 self.tx_sequence_number
356 ))
357 })?;
358 let effects = IotaTransactionBlockEffects::try_from(effects)?;
359 Ok(effects)
360 }
361
362 pub fn is_genesis(&self) -> bool {
364 self.tx_sequence_number == 0
365 }
366}
367
368pub fn stored_events_to_events(
369 stored_events: StoredTransactionEvents,
370) -> Result<Vec<Event>, IndexerError> {
371 stored_events
372 .into_iter()
373 .map(|event| match event {
374 Some(event) => {
375 let event: Event = bcs::from_bytes(&event).map_err(|e| {
376 IndexerError::PersistentStorageDataCorruption(format!(
377 "Can't convert event bytes into Event. Error: {e}",
378 ))
379 })?;
380 Ok(event)
381 }
382 None => Err(IndexerError::PersistentStorageDataCorruption(
383 "Event should not be null".to_string(),
384 )),
385 })
386 .collect::<Result<Vec<Event>, IndexerError>>()
387}
388
389pub async fn tx_events_to_iota_tx_events(
390 tx_events: TransactionEvents,
391 package_resolver: Arc<Resolver<impl PackageStore>>,
392 tx_digest: TransactionDigest,
393 timestamp: u64,
394) -> Result<Option<IotaTransactionBlockEvents>, IndexerError> {
395 let mut iota_event_futures = vec![];
396 let tx_events_data_len = tx_events.data.len();
397 for tx_event in tx_events.data.clone() {
398 let package_resolver_clone = package_resolver.clone();
399 iota_event_futures.push(tokio::task::spawn(async move {
400 let resolver = package_resolver_clone;
401 resolver
402 .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
403 .await
404 }));
405 }
406 let event_move_type_layouts = futures::future::join_all(iota_event_futures)
407 .await
408 .into_iter()
409 .collect::<Result<Vec<_>, _>>()?
410 .into_iter()
411 .collect::<Result<Vec<_>, _>>()
412 .map_err(|e| {
413 IndexerError::ResolveMoveStruct(format!(
414 "Failed to convert to iota event with Error: {e}",
415 ))
416 })?;
417 let event_move_datatype_layouts = event_move_type_layouts
418 .into_iter()
419 .filter_map(|move_type_layout| match move_type_layout {
420 MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
421 MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
422 _ => None,
423 })
424 .collect::<Vec<_>>();
425 assert!(tx_events_data_len == event_move_datatype_layouts.len());
426 let iota_events = tx_events
427 .data
428 .into_iter()
429 .enumerate()
430 .zip(event_move_datatype_layouts)
431 .map(|((seq, tx_event), move_datatype_layout)| {
432 IotaEvent::try_from(
433 tx_event,
434 tx_digest,
435 seq as u64,
436 Some(timestamp),
437 move_datatype_layout,
438 )
439 })
440 .collect::<Result<Vec<_>, _>>()?;
441 let iota_tx_events = IotaTransactionBlockEvents { data: iota_events };
442 Ok(Some(iota_tx_events))
443}