iota_analytics_indexer/handlers/
event_handler.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_json_rpc_types::type_and_fields_from_move_event_data;
11use iota_package_resolver::Resolver;
12use iota_rest_api::CheckpointData;
13use iota_types::{
14    SYSTEM_PACKAGE_ADDRESSES, digests::TransactionDigest, effects::TransactionEvents, event::Event,
15};
16use move_core_types::annotated_value::MoveValue;
17use tokio::sync::Mutex;
18
19use crate::{
20    FileType,
21    handlers::AnalyticsHandler,
22    package_store::{LocalDBPackageStore, PackageCache},
23    tables::EventEntry,
24};
25
26pub struct EventHandler {
27    state: Mutex<State>,
28}
29
30struct State {
31    events: Vec<EventEntry>,
32    package_store: LocalDBPackageStore,
33    resolver: Resolver<PackageCache>,
34}
35
36#[async_trait::async_trait]
37impl Worker for EventHandler {
38    type Message = ();
39    type Error = anyhow::Error;
40
41    async fn process_checkpoint(
42        &self,
43        checkpoint_data: Arc<CheckpointData>,
44    ) -> Result<Self::Message, Self::Error> {
45        let CheckpointData {
46            checkpoint_summary,
47            transactions: checkpoint_transactions,
48            ..
49        } = checkpoint_data.as_ref();
50        let mut state = self.state.lock().await;
51        for checkpoint_transaction in checkpoint_transactions {
52            for object in checkpoint_transaction.output_objects.iter() {
53                state.package_store.update(object)?;
54            }
55            if let Some(events) = &checkpoint_transaction.events {
56                self.process_events(
57                    checkpoint_summary.epoch,
58                    checkpoint_summary.sequence_number,
59                    checkpoint_transaction.transaction.digest(),
60                    checkpoint_summary.timestamp_ms,
61                    events,
62                    &mut state,
63                )
64                .await?;
65            }
66            if checkpoint_summary.end_of_epoch_data.is_some() {
67                state
68                    .resolver
69                    .package_store()
70                    .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
71            }
72        }
73        Ok(())
74    }
75}
76
77#[async_trait::async_trait]
78impl AnalyticsHandler<EventEntry> for EventHandler {
79    async fn read(&self) -> Result<Vec<EventEntry>> {
80        let mut state = self.state.lock().await;
81        let cloned = state.events.clone();
82        state.events.clear();
83        Ok(cloned)
84    }
85
86    fn file_type(&self) -> Result<FileType> {
87        Ok(FileType::Event)
88    }
89
90    fn name(&self) -> &str {
91        "event"
92    }
93}
94
95impl EventHandler {
96    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
97        let package_store = LocalDBPackageStore::new(&store_path.join("event"), rest_uri);
98        let state = State {
99            events: vec![],
100            package_store: package_store.clone(),
101            resolver: Resolver::new(PackageCache::new(package_store)),
102        };
103        Self {
104            state: Mutex::new(state),
105        }
106    }
107    async fn process_events(
108        &self,
109        epoch: u64,
110        checkpoint: u64,
111        digest: &TransactionDigest,
112        timestamp_ms: u64,
113        events: &TransactionEvents,
114        state: &mut State,
115    ) -> Result<()> {
116        for (idx, event) in events.data.iter().enumerate() {
117            let Event {
118                package_id,
119                transaction_module,
120                sender,
121                type_,
122                contents,
123            } = event;
124            let layout = state
125                .resolver
126                .type_layout(move_core_types::language_storage::TypeTag::Struct(
127                    Box::new(type_.clone()),
128                ))
129                .await?;
130            let move_value = MoveValue::simple_deserialize(contents, &layout)?;
131            let (_, event_json) = type_and_fields_from_move_event_data(move_value)?;
132            let entry = EventEntry {
133                transaction_digest: digest.base58_encode(),
134                event_index: idx as u64,
135                checkpoint,
136                epoch,
137                timestamp_ms,
138                sender: sender.to_string(),
139                package: package_id.to_string(),
140                module: transaction_module.to_string(),
141                event_type: type_.to_string(),
142                bcs: Base64::encode(contents.clone()),
143                event_json: event_json.to_string(),
144            };
145
146            state.events.push(entry);
147        }
148        Ok(())
149    }
150}