iota_analytics_indexer/handlers/
transaction_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::BTreeSet, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_rest_api::{CheckpointData, CheckpointTransaction};
11use iota_types::{
12    effects::{TransactionEffects, TransactionEffectsAPI},
13    transaction::{Command, TransactionDataAPI, TransactionKind},
14};
15use tokio::sync::Mutex;
16use tracing::error;
17
18use crate::{FileType, handlers::AnalyticsHandler, tables::TransactionEntry};
19
20pub struct TransactionHandler {
21    pub(crate) state: Mutex<State>,
22}
23
24pub(crate) struct State {
25    pub(crate) transactions: Vec<TransactionEntry>,
26}
27
28#[async_trait::async_trait]
29impl Worker for TransactionHandler {
30    type Message = ();
31    type Error = anyhow::Error;
32
33    async fn process_checkpoint(
34        &self,
35        checkpoint_data: Arc<CheckpointData>,
36    ) -> Result<Self::Message, Self::Error> {
37        let CheckpointData {
38            checkpoint_summary,
39            transactions: checkpoint_transactions,
40            ..
41        } = checkpoint_data.as_ref();
42        let mut state = self.state.lock().await;
43        for checkpoint_transaction in checkpoint_transactions {
44            self.process_transaction(
45                checkpoint_summary.epoch,
46                checkpoint_summary.sequence_number,
47                checkpoint_summary.timestamp_ms,
48                checkpoint_transaction,
49                &checkpoint_transaction.effects,
50                &mut state,
51            )?;
52        }
53        Ok(())
54    }
55}
56
57#[async_trait::async_trait]
58impl AnalyticsHandler<TransactionEntry> for TransactionHandler {
59    async fn read(&self) -> Result<Vec<TransactionEntry>> {
60        let mut state = self.state.lock().await;
61        let cloned = state.transactions.clone();
62        state.transactions.clear();
63        Ok(cloned)
64    }
65
66    fn file_type(&self) -> Result<FileType> {
67        Ok(FileType::Transaction)
68    }
69
70    fn name(&self) -> &str {
71        "transaction"
72    }
73}
74
75impl TransactionHandler {
76    pub fn new() -> Self {
77        let state = Mutex::new(State {
78            transactions: vec![],
79        });
80        TransactionHandler { state }
81    }
82    fn process_transaction(
83        &self,
84        epoch: u64,
85        checkpoint: u64,
86        timestamp_ms: u64,
87        checkpoint_transaction: &CheckpointTransaction,
88        effects: &TransactionEffects,
89        state: &mut State,
90    ) -> Result<()> {
91        let transaction = &checkpoint_transaction.transaction;
92        let txn_data = transaction.transaction_data();
93        let gas_object = effects.gas_object();
94        let gas_summary = effects.gas_cost_summary();
95        let move_calls_vec = txn_data.move_calls();
96        let packages: BTreeSet<_> = move_calls_vec
97            .iter()
98            .map(|(package, _, _)| package.to_canonical_string(/* with_prefix */ false))
99            .collect();
100        let packages = packages
101            .iter()
102            .map(|s| s.as_str())
103            .collect::<Vec<_>>()
104            .join("-");
105        let transaction_digest = transaction.digest().base58_encode();
106
107        let mut transfers: u64 = 0;
108        let mut split_coins: u64 = 0;
109        let mut merge_coins: u64 = 0;
110        let mut publish: u64 = 0;
111        let mut upgrade: u64 = 0;
112        let mut others: u64 = 0;
113        let mut move_calls_count = 0;
114        let move_calls = move_calls_vec.len() as u64;
115
116        let is_sponsored_tx = txn_data.is_sponsored_tx();
117        let is_system_txn = txn_data.is_system_tx();
118        if !is_system_txn {
119            let kind = txn_data.kind();
120            if let TransactionKind::ProgrammableTransaction(pt) = txn_data.kind() {
121                for cmd in &pt.commands {
122                    match cmd {
123                        Command::MoveCall(_) => move_calls_count += 1,
124                        Command::TransferObjects(_, _) => transfers += 1,
125                        Command::SplitCoins(_, _) => split_coins += 1,
126                        Command::MergeCoins(_, _) => merge_coins += 1,
127                        Command::Publish(_, _) => publish += 1,
128                        Command::Upgrade(_, _, _, _) => upgrade += 1,
129                        _ => others += 1,
130                    }
131                }
132            } else {
133                error!(
134                    "Transaction kind [{kind}] is not programmable transaction and not a system transaction"
135                );
136            }
137            if move_calls_count != move_calls {
138                error!(
139                    "Mismatch in move calls count: commands {move_calls_count} != {move_calls} calls"
140                );
141            }
142        }
143        let transaction_json = serde_json::to_string(&transaction)?;
144        let effects_json = serde_json::to_string(&checkpoint_transaction.effects)?;
145        let entry = TransactionEntry {
146            transaction_digest,
147            checkpoint,
148            epoch,
149            timestamp_ms,
150
151            sender: txn_data.sender().to_string(),
152            transaction_kind: txn_data.kind().name().to_owned(),
153            is_system_txn,
154            is_sponsored_tx,
155            transaction_count: txn_data.kind().num_commands() as u64,
156            execution_success: effects.status().is_ok(),
157            input: txn_data
158                .input_objects()
159                .expect("Input objects must be valid")
160                .len() as u64,
161            shared_input: txn_data.shared_input_objects().len() as u64,
162            gas_coins: txn_data.gas().len() as u64,
163            created: effects.created().len() as u64,
164            mutated: (effects.mutated().len() + effects.unwrapped().len()) as u64,
165            deleted: (effects.deleted().len()
166                + effects.unwrapped_then_deleted().len()
167                + effects.wrapped().len()) as u64,
168            transfers,
169            split_coins,
170            merge_coins,
171            publish,
172            upgrade,
173            others,
174            move_calls,
175            packages,
176            gas_owner: txn_data.gas_owner().to_string(),
177            gas_object_id: gas_object.0.0.to_string(),
178            gas_object_sequence: gas_object.0.1.value(),
179            gas_object_digest: gas_object.0.2.to_string(),
180            gas_budget: txn_data.gas_budget(),
181            total_gas_cost: gas_summary.net_gas_usage(),
182            computation_cost: gas_summary.computation_cost,
183            computation_cost_burned: gas_summary.computation_cost_burned,
184            storage_cost: gas_summary.storage_cost,
185            storage_rebate: gas_summary.storage_rebate,
186            non_refundable_storage_fee: gas_summary.non_refundable_storage_fee,
187
188            gas_price: txn_data.gas_price(),
189
190            raw_transaction: Base64::encode(bcs::to_bytes(&txn_data).unwrap()),
191
192            has_zklogin_sig: transaction.has_zklogin_sig(),
193            has_upgraded_multisig: transaction.has_upgraded_multisig(),
194            transaction_json: Some(transaction_json),
195            effects_json: Some(effects_json),
196        };
197        state.transactions.push(entry);
198        Ok(())
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use std::sync::Arc;
205
206    use fastcrypto::encoding::{Base64, Encoding};
207    use iota_data_ingestion_core::Worker;
208    use iota_types::{base_types::IotaAddress, storage::ReadStore};
209    use simulacrum::Simulacrum;
210
211    use crate::handlers::transaction_handler::TransactionHandler;
212
213    #[tokio::test]
214    pub async fn test_transaction_handler() -> anyhow::Result<()> {
215        let mut sim = Simulacrum::new();
216
217        // Execute a simple transaction.
218        let transfer_recipient = IotaAddress::random_for_testing_only();
219        let (transaction, _) = sim.transfer_txn(transfer_recipient);
220        let (_effects, err) = sim.execute_transaction(transaction.clone()).unwrap();
221        assert!(err.is_none());
222
223        // Create a checkpoint which should include the transaction we executed.
224        let checkpoint = sim.create_checkpoint();
225        let checkpoint_data = sim.get_checkpoint_data(
226            checkpoint.clone(),
227            sim.get_checkpoint_contents_by_digest(&checkpoint.content_digest)?
228                .unwrap(),
229        )?;
230        let shared_checkpoint_data = Arc::new(checkpoint_data);
231        let txn_handler = TransactionHandler::new();
232        txn_handler
233            .process_checkpoint(shared_checkpoint_data)
234            .await?;
235        let transaction_entries = txn_handler.state.lock().await.transactions.clone();
236        assert_eq!(transaction_entries.len(), 1);
237        let db_txn = transaction_entries.first().unwrap();
238
239        // Check that the transaction was stored correctly.
240        assert_eq!(db_txn.transaction_digest, transaction.digest().to_string());
241        assert_eq!(
242            db_txn.raw_transaction,
243            Base64::encode(bcs::to_bytes(&transaction.transaction_data()).unwrap())
244        );
245        assert_eq!(db_txn.epoch, checkpoint.epoch);
246        assert_eq!(db_txn.timestamp_ms, checkpoint.timestamp_ms);
247        assert_eq!(db_txn.checkpoint, checkpoint.sequence_number);
248        Ok(())
249    }
250}