iota_analytics_indexer/handlers/
transaction_objects_handler.rs1use std::sync::Arc;
6
7use anyhow::Result;
8use iota_data_ingestion_core::Worker;
9use iota_rest_api::{CheckpointData, CheckpointTransaction};
10use iota_types::{
11 base_types::ObjectID, effects::TransactionEffects, transaction::TransactionDataAPI,
12};
13use tokio::sync::Mutex;
14
15use crate::{
16 FileType,
17 handlers::{AnalyticsHandler, InputObjectTracker, ObjectStatusTracker},
18 tables::TransactionObjectEntry,
19};
20
21pub struct TransactionObjectsHandler {
22 state: Mutex<State>,
23}
24
25struct State {
26 transaction_objects: Vec<TransactionObjectEntry>,
27}
28
29#[async_trait::async_trait]
30impl Worker for TransactionObjectsHandler {
31 type Message = ();
32 type Error = anyhow::Error;
33
34 async fn process_checkpoint(
35 &self,
36 checkpoint_data: Arc<CheckpointData>,
37 ) -> Result<Self::Message, Self::Error> {
38 let CheckpointData {
39 checkpoint_summary,
40 transactions: checkpoint_transactions,
41 ..
42 } = checkpoint_data.as_ref();
43 let mut state = self.state.lock().await;
44 for checkpoint_transaction in checkpoint_transactions {
45 self.process_transaction(
46 checkpoint_summary.epoch,
47 checkpoint_summary.sequence_number,
48 checkpoint_summary.timestamp_ms,
49 checkpoint_transaction,
50 &checkpoint_transaction.effects,
51 &mut state,
52 );
53 }
54 Ok(())
55 }
56}
57
58#[async_trait::async_trait]
59impl AnalyticsHandler<TransactionObjectEntry> for TransactionObjectsHandler {
60 async fn read(&self) -> Result<Vec<TransactionObjectEntry>> {
61 let mut state = self.state.lock().await;
62 let cloned = state.transaction_objects.clone();
63 state.transaction_objects.clear();
64 Ok(cloned)
65 }
66
67 fn file_type(&self) -> Result<FileType> {
68 Ok(FileType::TransactionObjects)
69 }
70
71 fn name(&self) -> &str {
72 "transaction_objects"
73 }
74}
75
76impl TransactionObjectsHandler {
77 pub fn new() -> Self {
78 TransactionObjectsHandler {
79 state: Mutex::new(State {
80 transaction_objects: vec![],
81 }),
82 }
83 }
84 fn process_transaction(
85 &self,
86 epoch: u64,
87 checkpoint: u64,
88 timestamp_ms: u64,
89 checkpoint_transaction: &CheckpointTransaction,
90 effects: &TransactionEffects,
91 state: &mut State,
92 ) {
93 let transaction = &checkpoint_transaction.transaction;
94 let transaction_digest = transaction.digest().base58_encode();
95 let txn_data = transaction.transaction_data();
96 let input_object_tracker = InputObjectTracker::new(txn_data);
97 let object_status_tracker = ObjectStatusTracker::new(effects);
98 txn_data
100 .input_objects()
101 .expect("Input objects must be valid")
102 .iter()
103 .map(|object| (object.object_id(), object.version().map(|v| v.value())))
104 .for_each(|(object_id, version)| {
105 self.process_transaction_object(
106 epoch,
107 checkpoint,
108 timestamp_ms,
109 transaction_digest.clone(),
110 &object_id,
111 version,
112 &input_object_tracker,
113 &object_status_tracker,
114 state,
115 )
116 });
117 checkpoint_transaction
119 .output_objects
120 .iter()
121 .map(|object| (object.id(), Some(object.version().value())))
122 .for_each(|(object_id, version)| {
123 self.process_transaction_object(
124 epoch,
125 checkpoint,
126 timestamp_ms,
127 transaction_digest.clone(),
128 &object_id,
129 version,
130 &input_object_tracker,
131 &object_status_tracker,
132 state,
133 )
134 });
135 }
136 fn process_transaction_object(
139 &self,
140 epoch: u64,
141 checkpoint: u64,
142 timestamp_ms: u64,
143 transaction_digest: String,
144 object_id: &ObjectID,
145 version: Option<u64>,
146 input_object_tracker: &InputObjectTracker,
147 object_status_tracker: &ObjectStatusTracker,
148 state: &mut State,
149 ) {
150 let entry = TransactionObjectEntry {
151 object_id: object_id.to_string(),
152 version,
153 transaction_digest,
154 checkpoint,
155 epoch,
156 timestamp_ms,
157 input_kind: input_object_tracker.get_input_object_kind(object_id),
158 object_status: object_status_tracker.get_object_status(object_id),
159 };
160 state.transaction_objects.push(entry);
161 }
162}