iota_analytics_indexer/handlers/
package_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_rest_api::CheckpointData;
11use iota_types::{full_checkpoint_content::CheckpointTransaction, object::Object};
12use tokio::sync::Mutex;
13
14use crate::{FileType, handlers::AnalyticsHandler, tables::MovePackageEntry};
15
16pub struct PackageHandler {
17    state: Mutex<State>,
18}
19
20struct State {
21    packages: Vec<MovePackageEntry>,
22}
23
24#[async_trait::async_trait]
25impl Worker for PackageHandler {
26    type Message = ();
27    type Error = anyhow::Error;
28
29    async fn process_checkpoint(
30        &self,
31        checkpoint_data: Arc<CheckpointData>,
32    ) -> Result<Self::Message, Self::Error> {
33        let CheckpointData {
34            checkpoint_summary,
35            transactions: checkpoint_transactions,
36            ..
37        } = checkpoint_data.as_ref();
38        let mut state = self.state.lock().await;
39        for checkpoint_transaction in checkpoint_transactions {
40            self.process_transaction(
41                checkpoint_summary.epoch,
42                checkpoint_summary.sequence_number,
43                checkpoint_summary.timestamp_ms,
44                checkpoint_transaction,
45                &mut state,
46            )?;
47        }
48        Ok(())
49    }
50}
51
52#[async_trait::async_trait]
53impl AnalyticsHandler<MovePackageEntry> for PackageHandler {
54    async fn read(&self) -> Result<Vec<MovePackageEntry>> {
55        let mut state = self.state.lock().await;
56        let cloned = state.packages.clone();
57        state.packages.clear();
58        Ok(cloned)
59    }
60
61    fn file_type(&self) -> Result<FileType> {
62        Ok(FileType::MovePackage)
63    }
64
65    fn name(&self) -> &str {
66        "package"
67    }
68}
69
70impl PackageHandler {
71    pub fn new() -> Self {
72        let state = Mutex::new(State { packages: vec![] });
73        PackageHandler { state }
74    }
75    fn process_transaction(
76        &self,
77        epoch: u64,
78        checkpoint: u64,
79        timestamp_ms: u64,
80        checkpoint_transaction: &CheckpointTransaction,
81        state: &mut State,
82    ) -> Result<()> {
83        for object in checkpoint_transaction.output_objects.iter() {
84            self.process_package(epoch, checkpoint, timestamp_ms, object, state)?;
85        }
86        Ok(())
87    }
88    fn process_package(
89        &self,
90        epoch: u64,
91        checkpoint: u64,
92        timestamp_ms: u64,
93        object: &Object,
94        state: &mut State,
95    ) -> Result<()> {
96        if let iota_types::object::Data::Package(p) = &object.data {
97            let package_id = p.id();
98            let package_version = p.version().value();
99            let original_package_id = p.original_package_id();
100            let package = MovePackageEntry {
101                package_id: package_id.to_string(),
102                package_version: Some(package_version),
103                checkpoint,
104                epoch,
105                timestamp_ms,
106                bcs: Base64::encode(bcs::to_bytes(p).unwrap()),
107                transaction_digest: object.previous_transaction.to_string(),
108                original_package_id: Some(original_package_id.to_string()),
109            };
110            state.packages.push(package)
111        }
112        Ok(())
113    }
114}