iota_analytics_indexer/handlers/
checkpoint_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anyhow::Result;
8use fastcrypto::traits::EncodeDecodeBase64;
9use iota_data_ingestion_core::Worker;
10use iota_rest_api::{CheckpointData, CheckpointTransaction};
11use iota_types::{
12    effects::TransactionEffectsAPI,
13    messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSummary},
14    transaction::TransactionDataAPI,
15};
16use tokio::sync::Mutex;
17
18use crate::{FileType, handlers::AnalyticsHandler, tables::CheckpointEntry};
19
20pub struct CheckpointHandler {
21    state: Mutex<State>,
22}
23
24struct State {
25    checkpoints: Vec<CheckpointEntry>,
26}
27
28#[async_trait::async_trait]
29impl Worker for CheckpointHandler {
30    type Message = ();
31    type Error = anyhow::Error;
32
33    async fn process_checkpoint(
34        &self,
35        checkpoint_data: Arc<CheckpointData>,
36    ) -> Result<Self::Message, Self::Error> {
37        let CheckpointData {
38            checkpoint_summary,
39            transactions: checkpoint_transactions,
40            ..
41        } = checkpoint_data.as_ref();
42        self.process_checkpoint_transactions(checkpoint_summary, checkpoint_transactions)
43            .await;
44        Ok(())
45    }
46}
47
48#[async_trait::async_trait]
49impl AnalyticsHandler<CheckpointEntry> for CheckpointHandler {
50    async fn read(&self) -> Result<Vec<CheckpointEntry>> {
51        let mut state = self.state.lock().await;
52        let cloned = state.checkpoints.clone();
53        state.checkpoints.clear();
54        Ok(cloned)
55    }
56
57    fn file_type(&self) -> Result<FileType> {
58        Ok(FileType::Checkpoint)
59    }
60
61    fn name(&self) -> &str {
62        "checkpoint"
63    }
64}
65
66impl CheckpointHandler {
67    pub fn new() -> Self {
68        CheckpointHandler {
69            state: Mutex::new(State {
70                checkpoints: vec![],
71            }),
72        }
73    }
74    async fn process_checkpoint_transactions(
75        &self,
76        summary: &CertifiedCheckpointSummary,
77        checkpoint_transactions: &[CheckpointTransaction],
78    ) {
79        let CheckpointSummary {
80            epoch,
81            sequence_number,
82            network_total_transactions,
83            previous_digest,
84            epoch_rolling_gas_cost_summary,
85            timestamp_ms,
86            end_of_epoch_data,
87            ..
88        } = summary.data();
89
90        let total_gas_cost = epoch_rolling_gas_cost_summary.computation_cost as i64
91            + epoch_rolling_gas_cost_summary.storage_cost as i64
92            - epoch_rolling_gas_cost_summary.storage_rebate as i64;
93        let total_transaction_blocks = checkpoint_transactions.len() as u64;
94        let mut total_transactions: u64 = 0;
95        let mut total_successful_transaction_blocks: u64 = 0;
96        let mut total_successful_transactions: u64 = 0;
97        for checkpoint_transaction in checkpoint_transactions {
98            let txn_data = checkpoint_transaction.transaction.transaction_data();
99            let cmds = txn_data.kind().num_commands() as u64;
100            total_transactions += cmds;
101            if checkpoint_transaction.effects.status().is_ok() {
102                total_successful_transaction_blocks += 1;
103                total_successful_transactions += cmds;
104            }
105        }
106
107        let checkpoint_entry = CheckpointEntry {
108            sequence_number: *sequence_number,
109            checkpoint_digest: summary.digest().base58_encode(),
110            previous_checkpoint_digest: previous_digest.map(|d| d.base58_encode()),
111            epoch: *epoch,
112            end_of_epoch: end_of_epoch_data.is_some(),
113            total_gas_cost,
114            computation_cost: epoch_rolling_gas_cost_summary.computation_cost,
115            computation_cost_burned: epoch_rolling_gas_cost_summary.computation_cost_burned,
116            storage_cost: epoch_rolling_gas_cost_summary.storage_cost,
117            storage_rebate: epoch_rolling_gas_cost_summary.storage_rebate,
118            non_refundable_storage_fee: epoch_rolling_gas_cost_summary.non_refundable_storage_fee,
119            total_transaction_blocks,
120            total_transactions,
121            total_successful_transaction_blocks,
122            total_successful_transactions,
123            network_total_transaction: *network_total_transactions,
124            timestamp_ms: *timestamp_ms,
125            validator_signature: summary.auth_sig().signature.encode_base64(),
126        };
127        let mut state = self.state.lock().await;
128        state.checkpoints.push(checkpoint_entry);
129    }
130}