iota_analytics_indexer/handlers/
package_handler.rs1use std::sync::Arc;
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_rest_api::CheckpointData;
11use iota_types::{full_checkpoint_content::CheckpointTransaction, object::Object};
12use tokio::sync::Mutex;
13
14use crate::{FileType, handlers::AnalyticsHandler, tables::MovePackageEntry};
15
16pub struct PackageHandler {
17 state: Mutex<State>,
18}
19
20struct State {
21 packages: Vec<MovePackageEntry>,
22}
23
24#[async_trait::async_trait]
25impl Worker for PackageHandler {
26 type Message = ();
27 type Error = anyhow::Error;
28
29 async fn process_checkpoint(
30 &self,
31 checkpoint_data: Arc<CheckpointData>,
32 ) -> Result<Self::Message, Self::Error> {
33 let CheckpointData {
34 checkpoint_summary,
35 transactions: checkpoint_transactions,
36 ..
37 } = checkpoint_data.as_ref();
38 let mut state = self.state.lock().await;
39 for checkpoint_transaction in checkpoint_transactions {
40 self.process_transaction(
41 checkpoint_summary.epoch,
42 checkpoint_summary.sequence_number,
43 checkpoint_summary.timestamp_ms,
44 checkpoint_transaction,
45 &mut state,
46 )?;
47 }
48 Ok(())
49 }
50}
51
52#[async_trait::async_trait]
53impl AnalyticsHandler<MovePackageEntry> for PackageHandler {
54 async fn read(&self) -> Result<Vec<MovePackageEntry>> {
55 let mut state = self.state.lock().await;
56 let cloned = state.packages.clone();
57 state.packages.clear();
58 Ok(cloned)
59 }
60
61 fn file_type(&self) -> Result<FileType> {
62 Ok(FileType::MovePackage)
63 }
64
65 fn name(&self) -> &str {
66 "package"
67 }
68}
69
70impl PackageHandler {
71 pub fn new() -> Self {
72 let state = Mutex::new(State { packages: vec![] });
73 PackageHandler { state }
74 }
75 fn process_transaction(
76 &self,
77 epoch: u64,
78 checkpoint: u64,
79 timestamp_ms: u64,
80 checkpoint_transaction: &CheckpointTransaction,
81 state: &mut State,
82 ) -> Result<()> {
83 for object in checkpoint_transaction.output_objects.iter() {
84 self.process_package(epoch, checkpoint, timestamp_ms, object, state)?;
85 }
86 Ok(())
87 }
88 fn process_package(
89 &self,
90 epoch: u64,
91 checkpoint: u64,
92 timestamp_ms: u64,
93 object: &Object,
94 state: &mut State,
95 ) -> Result<()> {
96 if let iota_types::object::Data::Package(p) = &object.data {
97 let package_id = p.id();
98 let package_version = p.version().value();
99 let original_package_id = p.original_package_id();
100 let package = MovePackageEntry {
101 package_id: package_id.to_string(),
102 package_version: Some(package_version),
103 checkpoint,
104 epoch,
105 timestamp_ms,
106 bcs: Base64::encode(bcs::to_bytes(p).unwrap()),
107 transaction_digest: object.previous_transaction.to_string(),
108 original_package_id: Some(original_package_id.to_string()),
109 };
110 state.packages.push(package)
111 }
112 Ok(())
113 }
114}