iota_analytics_indexer/handlers/
event_handler.rs1use std::{path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_json_rpc_types::type_and_fields_from_move_event_data;
11use iota_package_resolver::Resolver;
12use iota_rest_api::CheckpointData;
13use iota_types::{
14 SYSTEM_PACKAGE_ADDRESSES, digests::TransactionDigest, effects::TransactionEvents, event::Event,
15};
16use move_core_types::annotated_value::MoveValue;
17use tokio::sync::Mutex;
18
19use crate::{
20 FileType,
21 handlers::AnalyticsHandler,
22 package_store::{LocalDBPackageStore, PackageCache},
23 tables::EventEntry,
24};
25
26pub struct EventHandler {
27 state: Mutex<State>,
28}
29
30struct State {
31 events: Vec<EventEntry>,
32 package_store: LocalDBPackageStore,
33 resolver: Resolver<PackageCache>,
34}
35
36#[async_trait::async_trait]
37impl Worker for EventHandler {
38 type Message = ();
39 type Error = anyhow::Error;
40
41 async fn process_checkpoint(
42 &self,
43 checkpoint_data: Arc<CheckpointData>,
44 ) -> Result<Self::Message, Self::Error> {
45 let CheckpointData {
46 checkpoint_summary,
47 transactions: checkpoint_transactions,
48 ..
49 } = checkpoint_data.as_ref();
50 let mut state = self.state.lock().await;
51 for checkpoint_transaction in checkpoint_transactions {
52 for object in checkpoint_transaction.output_objects.iter() {
53 state.package_store.update(object)?;
54 }
55 if let Some(events) = &checkpoint_transaction.events {
56 self.process_events(
57 checkpoint_summary.epoch,
58 checkpoint_summary.sequence_number,
59 checkpoint_transaction.transaction.digest(),
60 checkpoint_summary.timestamp_ms,
61 events,
62 &mut state,
63 )
64 .await?;
65 }
66 if checkpoint_summary.end_of_epoch_data.is_some() {
67 state
68 .resolver
69 .package_store()
70 .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
71 }
72 }
73 Ok(())
74 }
75}
76
77#[async_trait::async_trait]
78impl AnalyticsHandler<EventEntry> for EventHandler {
79 async fn read(&self) -> Result<Vec<EventEntry>> {
80 let mut state = self.state.lock().await;
81 let cloned = state.events.clone();
82 state.events.clear();
83 Ok(cloned)
84 }
85
86 fn file_type(&self) -> Result<FileType> {
87 Ok(FileType::Event)
88 }
89
90 fn name(&self) -> &str {
91 "event"
92 }
93}
94
95impl EventHandler {
96 pub fn new(store_path: &Path, rest_uri: &str) -> Self {
97 let package_store = LocalDBPackageStore::new(&store_path.join("event"), rest_uri);
98 let state = State {
99 events: vec![],
100 package_store: package_store.clone(),
101 resolver: Resolver::new(PackageCache::new(package_store)),
102 };
103 Self {
104 state: Mutex::new(state),
105 }
106 }
107 async fn process_events(
108 &self,
109 epoch: u64,
110 checkpoint: u64,
111 digest: &TransactionDigest,
112 timestamp_ms: u64,
113 events: &TransactionEvents,
114 state: &mut State,
115 ) -> Result<()> {
116 for (idx, event) in events.data.iter().enumerate() {
117 let Event {
118 package_id,
119 transaction_module,
120 sender,
121 type_,
122 contents,
123 } = event;
124 let layout = state
125 .resolver
126 .type_layout(move_core_types::language_storage::TypeTag::Struct(
127 Box::new(type_.clone()),
128 ))
129 .await?;
130 let move_value = MoveValue::simple_deserialize(contents, &layout)?;
131 let (_, event_json) = type_and_fields_from_move_event_data(move_value)?;
132 let entry = EventEntry {
133 transaction_digest: digest.base58_encode(),
134 event_index: idx as u64,
135 checkpoint,
136 epoch,
137 timestamp_ms,
138 sender: sender.to_string(),
139 package: package_id.to_string(),
140 module: transaction_module.to_string(),
141 event_type: type_.to_string(),
142 bcs: Base64::encode(contents.clone()),
143 event_json: event_json.to_string(),
144 };
145
146 state.events.push(entry);
147 }
148 Ok(())
149 }
150}