iota_analytics_indexer/handlers/
transaction_handler.rs1use std::{collections::BTreeSet, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_rest_api::{CheckpointData, CheckpointTransaction};
11use iota_types::{
12 effects::{TransactionEffects, TransactionEffectsAPI},
13 transaction::{Command, TransactionDataAPI, TransactionKind},
14};
15use tokio::sync::Mutex;
16use tracing::error;
17
18use crate::{FileType, handlers::AnalyticsHandler, tables::TransactionEntry};
19
20pub struct TransactionHandler {
21 pub(crate) state: Mutex<State>,
22}
23
24pub(crate) struct State {
25 pub(crate) transactions: Vec<TransactionEntry>,
26}
27
28#[async_trait::async_trait]
29impl Worker for TransactionHandler {
30 type Message = ();
31 type Error = anyhow::Error;
32
33 async fn process_checkpoint(
34 &self,
35 checkpoint_data: Arc<CheckpointData>,
36 ) -> Result<Self::Message, Self::Error> {
37 let CheckpointData {
38 checkpoint_summary,
39 transactions: checkpoint_transactions,
40 ..
41 } = checkpoint_data.as_ref();
42 let mut state = self.state.lock().await;
43 for checkpoint_transaction in checkpoint_transactions {
44 self.process_transaction(
45 checkpoint_summary.epoch,
46 checkpoint_summary.sequence_number,
47 checkpoint_summary.timestamp_ms,
48 checkpoint_transaction,
49 &checkpoint_transaction.effects,
50 &mut state,
51 )?;
52 }
53 Ok(())
54 }
55}
56
57#[async_trait::async_trait]
58impl AnalyticsHandler<TransactionEntry> for TransactionHandler {
59 async fn read(&self) -> Result<Vec<TransactionEntry>> {
60 let mut state = self.state.lock().await;
61 let cloned = state.transactions.clone();
62 state.transactions.clear();
63 Ok(cloned)
64 }
65
66 fn file_type(&self) -> Result<FileType> {
67 Ok(FileType::Transaction)
68 }
69
70 fn name(&self) -> &str {
71 "transaction"
72 }
73}
74
75impl TransactionHandler {
76 pub fn new() -> Self {
77 let state = Mutex::new(State {
78 transactions: vec![],
79 });
80 TransactionHandler { state }
81 }
82 fn process_transaction(
83 &self,
84 epoch: u64,
85 checkpoint: u64,
86 timestamp_ms: u64,
87 checkpoint_transaction: &CheckpointTransaction,
88 effects: &TransactionEffects,
89 state: &mut State,
90 ) -> Result<()> {
91 let transaction = &checkpoint_transaction.transaction;
92 let txn_data = transaction.transaction_data();
93 let gas_object = effects.gas_object();
94 let gas_summary = effects.gas_cost_summary();
95 let move_calls_vec = txn_data.move_calls();
96 let packages: BTreeSet<_> = move_calls_vec
97 .iter()
98 .map(|(package, _, _)| package.to_canonical_string(false))
99 .collect();
100 let packages = packages
101 .iter()
102 .map(|s| s.as_str())
103 .collect::<Vec<_>>()
104 .join("-");
105 let transaction_digest = transaction.digest().base58_encode();
106
107 let mut transfers: u64 = 0;
108 let mut split_coins: u64 = 0;
109 let mut merge_coins: u64 = 0;
110 let mut publish: u64 = 0;
111 let mut upgrade: u64 = 0;
112 let mut others: u64 = 0;
113 let mut move_calls_count = 0;
114 let move_calls = move_calls_vec.len() as u64;
115
116 let is_sponsored_tx = txn_data.is_sponsored_tx();
117 let is_system_txn = txn_data.is_system_tx();
118 if !is_system_txn {
119 let kind = txn_data.kind();
120 if let TransactionKind::ProgrammableTransaction(pt) = txn_data.kind() {
121 for cmd in &pt.commands {
122 match cmd {
123 Command::MoveCall(_) => move_calls_count += 1,
124 Command::TransferObjects(_, _) => transfers += 1,
125 Command::SplitCoins(_, _) => split_coins += 1,
126 Command::MergeCoins(_, _) => merge_coins += 1,
127 Command::Publish(_, _) => publish += 1,
128 Command::Upgrade(_, _, _, _) => upgrade += 1,
129 _ => others += 1,
130 }
131 }
132 } else {
133 error!(
134 "Transaction kind [{kind}] is not programmable transaction and not a system transaction"
135 );
136 }
137 if move_calls_count != move_calls {
138 error!(
139 "Mismatch in move calls count: commands {move_calls_count} != {move_calls} calls"
140 );
141 }
142 }
143 let transaction_json = serde_json::to_string(&transaction)?;
144 let effects_json = serde_json::to_string(&checkpoint_transaction.effects)?;
145 let entry = TransactionEntry {
146 transaction_digest,
147 checkpoint,
148 epoch,
149 timestamp_ms,
150
151 sender: txn_data.sender().to_string(),
152 transaction_kind: txn_data.kind().name().to_owned(),
153 is_system_txn,
154 is_sponsored_tx,
155 transaction_count: txn_data.kind().num_commands() as u64,
156 execution_success: effects.status().is_ok(),
157 input: txn_data
158 .input_objects()
159 .expect("Input objects must be valid")
160 .len() as u64,
161 shared_input: txn_data.shared_input_objects().len() as u64,
162 gas_coins: txn_data.gas().len() as u64,
163 created: effects.created().len() as u64,
164 mutated: (effects.mutated().len() + effects.unwrapped().len()) as u64,
165 deleted: (effects.deleted().len()
166 + effects.unwrapped_then_deleted().len()
167 + effects.wrapped().len()) as u64,
168 transfers,
169 split_coins,
170 merge_coins,
171 publish,
172 upgrade,
173 others,
174 move_calls,
175 packages,
176 gas_owner: txn_data.gas_owner().to_string(),
177 gas_object_id: gas_object.0.0.to_string(),
178 gas_object_sequence: gas_object.0.1.value(),
179 gas_object_digest: gas_object.0.2.to_string(),
180 gas_budget: txn_data.gas_budget(),
181 total_gas_cost: gas_summary.net_gas_usage(),
182 computation_cost: gas_summary.computation_cost,
183 computation_cost_burned: gas_summary.computation_cost_burned,
184 storage_cost: gas_summary.storage_cost,
185 storage_rebate: gas_summary.storage_rebate,
186 non_refundable_storage_fee: gas_summary.non_refundable_storage_fee,
187
188 gas_price: txn_data.gas_price(),
189
190 raw_transaction: Base64::encode(bcs::to_bytes(&txn_data).unwrap()),
191
192 has_zklogin_sig: transaction.has_zklogin_sig(),
193 has_upgraded_multisig: transaction.has_upgraded_multisig(),
194 transaction_json: Some(transaction_json),
195 effects_json: Some(effects_json),
196 };
197 state.transactions.push(entry);
198 Ok(())
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use std::sync::Arc;
205
206 use fastcrypto::encoding::{Base64, Encoding};
207 use iota_data_ingestion_core::Worker;
208 use iota_types::{base_types::IotaAddress, storage::ReadStore};
209 use simulacrum::Simulacrum;
210
211 use crate::handlers::transaction_handler::TransactionHandler;
212
213 #[tokio::test]
214 pub async fn test_transaction_handler() -> anyhow::Result<()> {
215 let mut sim = Simulacrum::new();
216
217 let transfer_recipient = IotaAddress::random_for_testing_only();
219 let (transaction, _) = sim.transfer_txn(transfer_recipient);
220 let (_effects, err) = sim.execute_transaction(transaction.clone()).unwrap();
221 assert!(err.is_none());
222
223 let checkpoint = sim.create_checkpoint();
225 let checkpoint_data = sim.get_checkpoint_data(
226 checkpoint.clone(),
227 sim.get_checkpoint_contents_by_digest(&checkpoint.content_digest)?
228 .unwrap(),
229 )?;
230 let shared_checkpoint_data = Arc::new(checkpoint_data);
231 let txn_handler = TransactionHandler::new();
232 txn_handler
233 .process_checkpoint(shared_checkpoint_data)
234 .await?;
235 let transaction_entries = txn_handler.state.lock().await.transactions.clone();
236 assert_eq!(transaction_entries.len(), 1);
237 let db_txn = transaction_entries.first().unwrap();
238
239 assert_eq!(db_txn.transaction_digest, transaction.digest().to_string());
241 assert_eq!(
242 db_txn.raw_transaction,
243 Base64::encode(bcs::to_bytes(&transaction.transaction_data()).unwrap())
244 );
245 assert_eq!(db_txn.epoch, checkpoint.epoch);
246 assert_eq!(db_txn.timestamp_ms, checkpoint.timestamp_ms);
247 assert_eq!(db_txn.checkpoint, checkpoint.sequence_number);
248 Ok(())
249 }
250}