iota_analytics_indexer/handlers/
transaction_objects_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anyhow::Result;
8use iota_data_ingestion_core::Worker;
9use iota_rest_api::{CheckpointData, CheckpointTransaction};
10use iota_types::{
11    base_types::ObjectID, effects::TransactionEffects, transaction::TransactionDataAPI,
12};
13use tokio::sync::Mutex;
14
15use crate::{
16    FileType,
17    handlers::{AnalyticsHandler, InputObjectTracker, ObjectStatusTracker},
18    tables::TransactionObjectEntry,
19};
20
21pub struct TransactionObjectsHandler {
22    state: Mutex<State>,
23}
24
25struct State {
26    transaction_objects: Vec<TransactionObjectEntry>,
27}
28
29#[async_trait::async_trait]
30impl Worker for TransactionObjectsHandler {
31    type Message = ();
32    type Error = anyhow::Error;
33
34    async fn process_checkpoint(
35        &self,
36        checkpoint_data: Arc<CheckpointData>,
37    ) -> Result<Self::Message, Self::Error> {
38        let CheckpointData {
39            checkpoint_summary,
40            transactions: checkpoint_transactions,
41            ..
42        } = checkpoint_data.as_ref();
43        let mut state = self.state.lock().await;
44        for checkpoint_transaction in checkpoint_transactions {
45            self.process_transaction(
46                checkpoint_summary.epoch,
47                checkpoint_summary.sequence_number,
48                checkpoint_summary.timestamp_ms,
49                checkpoint_transaction,
50                &checkpoint_transaction.effects,
51                &mut state,
52            );
53        }
54        Ok(())
55    }
56}
57
58#[async_trait::async_trait]
59impl AnalyticsHandler<TransactionObjectEntry> for TransactionObjectsHandler {
60    async fn read(&self) -> Result<Vec<TransactionObjectEntry>> {
61        let mut state = self.state.lock().await;
62        let cloned = state.transaction_objects.clone();
63        state.transaction_objects.clear();
64        Ok(cloned)
65    }
66
67    fn file_type(&self) -> Result<FileType> {
68        Ok(FileType::TransactionObjects)
69    }
70
71    fn name(&self) -> &str {
72        "transaction_objects"
73    }
74}
75
76impl TransactionObjectsHandler {
77    pub fn new() -> Self {
78        TransactionObjectsHandler {
79            state: Mutex::new(State {
80                transaction_objects: vec![],
81            }),
82        }
83    }
84    fn process_transaction(
85        &self,
86        epoch: u64,
87        checkpoint: u64,
88        timestamp_ms: u64,
89        checkpoint_transaction: &CheckpointTransaction,
90        effects: &TransactionEffects,
91        state: &mut State,
92    ) {
93        let transaction = &checkpoint_transaction.transaction;
94        let transaction_digest = transaction.digest().base58_encode();
95        let txn_data = transaction.transaction_data();
96        let input_object_tracker = InputObjectTracker::new(txn_data);
97        let object_status_tracker = ObjectStatusTracker::new(effects);
98        // input
99        txn_data
100            .input_objects()
101            .expect("Input objects must be valid")
102            .iter()
103            .map(|object| (object.object_id(), object.version().map(|v| v.value())))
104            .for_each(|(object_id, version)| {
105                self.process_transaction_object(
106                    epoch,
107                    checkpoint,
108                    timestamp_ms,
109                    transaction_digest.clone(),
110                    &object_id,
111                    version,
112                    &input_object_tracker,
113                    &object_status_tracker,
114                    state,
115                )
116            });
117        // output
118        checkpoint_transaction
119            .output_objects
120            .iter()
121            .map(|object| (object.id(), Some(object.version().value())))
122            .for_each(|(object_id, version)| {
123                self.process_transaction_object(
124                    epoch,
125                    checkpoint,
126                    timestamp_ms,
127                    transaction_digest.clone(),
128                    &object_id,
129                    version,
130                    &input_object_tracker,
131                    &object_status_tracker,
132                    state,
133                )
134            });
135    }
136    // Transaction object data.
137    // Builds a view of the object in input and output of a transaction.
138    fn process_transaction_object(
139        &self,
140        epoch: u64,
141        checkpoint: u64,
142        timestamp_ms: u64,
143        transaction_digest: String,
144        object_id: &ObjectID,
145        version: Option<u64>,
146        input_object_tracker: &InputObjectTracker,
147        object_status_tracker: &ObjectStatusTracker,
148        state: &mut State,
149    ) {
150        let entry = TransactionObjectEntry {
151            object_id: object_id.to_string(),
152            version,
153            transaction_digest,
154            checkpoint,
155            epoch,
156            timestamp_ms,
157            input_kind: input_object_tracker.get_input_object_kind(object_id),
158            object_status: object_status_tracker.get_object_status(object_id),
159        };
160        state.transaction_objects.push(entry);
161    }
162}