iota_analytics_indexer/handlers/
checkpoint_handler.rs1use std::sync::Arc;
6
7use anyhow::Result;
8use fastcrypto::traits::EncodeDecodeBase64;
9use iota_data_ingestion_core::Worker;
10use iota_rest_api::{CheckpointData, CheckpointTransaction};
11use iota_types::{
12 effects::TransactionEffectsAPI,
13 messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSummary},
14 transaction::TransactionDataAPI,
15};
16use tokio::sync::Mutex;
17
18use crate::{FileType, handlers::AnalyticsHandler, tables::CheckpointEntry};
19
/// Analytics handler that turns ingested checkpoints into `CheckpointEntry` rows.
///
/// Rows are accumulated in `state` until a reader drains them through the
/// `AnalyticsHandler::read` implementation below.
20pub struct CheckpointHandler {
// Buffered rows, guarded by the async `tokio::sync::Mutex` imported above.
21 state: Mutex<State>,
22}
23
/// Mutable part of `CheckpointHandler`, kept behind its mutex.
24struct State {
// Entries produced by `process_checkpoint_transactions` and not yet handed out by `read`.
25 checkpoints: Vec<CheckpointEntry>,
26}
27
28#[async_trait::async_trait]
29impl Worker for CheckpointHandler {
30 type Message = ();
31 type Error = anyhow::Error;
32
33 async fn process_checkpoint(
34 &self,
35 checkpoint_data: Arc<CheckpointData>,
36 ) -> Result<Self::Message, Self::Error> {
37 let CheckpointData {
38 checkpoint_summary,
39 transactions: checkpoint_transactions,
40 ..
41 } = checkpoint_data.as_ref();
42 self.process_checkpoint_transactions(checkpoint_summary, checkpoint_transactions)
43 .await;
44 Ok(())
45 }
46}
47
48#[async_trait::async_trait]
49impl AnalyticsHandler<CheckpointEntry> for CheckpointHandler {
50 async fn read(&self) -> Result<Vec<CheckpointEntry>> {
51 let mut state = self.state.lock().await;
52 let cloned = state.checkpoints.clone();
53 state.checkpoints.clear();
54 Ok(cloned)
55 }
56
57 fn file_type(&self) -> Result<FileType> {
58 Ok(FileType::Checkpoint)
59 }
60
61 fn name(&self) -> &str {
62 "checkpoint"
63 }
64}
65
66impl CheckpointHandler {
67 pub fn new() -> Self {
68 CheckpointHandler {
69 state: Mutex::new(State {
70 checkpoints: vec![],
71 }),
72 }
73 }
74 async fn process_checkpoint_transactions(
75 &self,
76 summary: &CertifiedCheckpointSummary,
77 checkpoint_transactions: &[CheckpointTransaction],
78 ) {
79 let CheckpointSummary {
80 epoch,
81 sequence_number,
82 network_total_transactions,
83 previous_digest,
84 epoch_rolling_gas_cost_summary,
85 timestamp_ms,
86 end_of_epoch_data,
87 ..
88 } = summary.data();
89
90 let total_gas_cost = epoch_rolling_gas_cost_summary.computation_cost as i64
91 + epoch_rolling_gas_cost_summary.storage_cost as i64
92 - epoch_rolling_gas_cost_summary.storage_rebate as i64;
93 let total_transaction_blocks = checkpoint_transactions.len() as u64;
94 let mut total_transactions: u64 = 0;
95 let mut total_successful_transaction_blocks: u64 = 0;
96 let mut total_successful_transactions: u64 = 0;
97 for checkpoint_transaction in checkpoint_transactions {
98 let txn_data = checkpoint_transaction.transaction.transaction_data();
99 let cmds = txn_data.kind().num_commands() as u64;
100 total_transactions += cmds;
101 if checkpoint_transaction.effects.status().is_ok() {
102 total_successful_transaction_blocks += 1;
103 total_successful_transactions += cmds;
104 }
105 }
106
107 let checkpoint_entry = CheckpointEntry {
108 sequence_number: *sequence_number,
109 checkpoint_digest: summary.digest().base58_encode(),
110 previous_checkpoint_digest: previous_digest.map(|d| d.base58_encode()),
111 epoch: *epoch,
112 end_of_epoch: end_of_epoch_data.is_some(),
113 total_gas_cost,
114 computation_cost: epoch_rolling_gas_cost_summary.computation_cost,
115 computation_cost_burned: epoch_rolling_gas_cost_summary.computation_cost_burned,
116 storage_cost: epoch_rolling_gas_cost_summary.storage_cost,
117 storage_rebate: epoch_rolling_gas_cost_summary.storage_rebate,
118 non_refundable_storage_fee: epoch_rolling_gas_cost_summary.non_refundable_storage_fee,
119 total_transaction_blocks,
120 total_transactions,
121 total_successful_transaction_blocks,
122 total_successful_transactions,
123 network_total_transaction: *network_total_transactions,
124 timestamp_ms: *timestamp_ms,
125 validator_signature: summary.auth_sig().signature.encode_base64(),
126 };
127 let mut state = self.state.lock().await;
128 state.checkpoints.push(checkpoint_entry);
129 }
130}