iota_analytics_indexer/handlers/
move_call_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anyhow::Result;
8use iota_data_ingestion_core::Worker;
9use iota_rest_api::CheckpointData;
10use iota_types::{base_types::ObjectID, transaction::TransactionDataAPI};
11use move_core_types::identifier::IdentStr;
12use tokio::sync::Mutex;
13
14use crate::{FileType, handlers::AnalyticsHandler, tables::MoveCallEntry};
15
16pub struct MoveCallHandler {
17    state: Mutex<State>,
18}
19
20struct State {
21    move_calls: Vec<MoveCallEntry>,
22}
23
24#[async_trait::async_trait]
25impl Worker for MoveCallHandler {
26    type Message = ();
27    type Error = anyhow::Error;
28
29    async fn process_checkpoint(
30        &self,
31        checkpoint_data: Arc<CheckpointData>,
32    ) -> Result<Self::Message, Self::Error> {
33        let CheckpointData {
34            checkpoint_summary,
35            transactions: checkpoint_transactions,
36            ..
37        } = checkpoint_data.as_ref();
38        let mut state = self.state.lock().await;
39        for checkpoint_transaction in checkpoint_transactions {
40            let move_calls = checkpoint_transaction
41                .transaction
42                .transaction_data()
43                .move_calls();
44            self.process_move_calls(
45                checkpoint_summary.epoch,
46                checkpoint_summary.sequence_number,
47                checkpoint_summary.timestamp_ms,
48                checkpoint_transaction.transaction.digest().base58_encode(),
49                &move_calls,
50                &mut state,
51            );
52        }
53        Ok(())
54    }
55}
56
57#[async_trait::async_trait]
58impl AnalyticsHandler<MoveCallEntry> for MoveCallHandler {
59    async fn read(&self) -> Result<Vec<MoveCallEntry>> {
60        let mut state = self.state.lock().await;
61        let cloned = state.move_calls.clone();
62        state.move_calls.clear();
63        Ok(cloned)
64    }
65
66    fn file_type(&self) -> Result<FileType> {
67        Ok(FileType::MoveCall)
68    }
69
70    fn name(&self) -> &str {
71        "move_call"
72    }
73}
74
75impl MoveCallHandler {
76    pub fn new() -> Self {
77        let state = State { move_calls: vec![] };
78        Self {
79            state: Mutex::new(state),
80        }
81    }
82    fn process_move_calls(
83        &self,
84        epoch: u64,
85        checkpoint: u64,
86        timestamp_ms: u64,
87        transaction_digest: String,
88        move_calls: &[(&ObjectID, &IdentStr, &IdentStr)],
89        state: &mut State,
90    ) {
91        for (package, module, function) in move_calls.iter() {
92            let entry = MoveCallEntry {
93                transaction_digest: transaction_digest.clone(),
94                checkpoint,
95                epoch,
96                timestamp_ms,
97                package: package.to_string(),
98                module: module.to_string(),
99                function: function.to_string(),
100            };
101            state.move_calls.push(entry);
102        }
103    }
104}