iota_analytics_indexer/handlers/
wrapped_object_handler.rs1use std::{collections::BTreeMap, path::Path, sync::Arc};
6
7use anyhow::Result;
8use iota_data_ingestion_core::Worker;
9use iota_package_resolver::Resolver;
10use iota_rest_api::{CheckpointData, CheckpointTransaction};
11use iota_types::{SYSTEM_PACKAGE_ADDRESSES, object::Object};
12use tokio::sync::Mutex;
13
14use crate::{
15 FileType,
16 handlers::{AnalyticsHandler, get_move_struct, parse_struct},
17 package_store::{LocalDBPackageStore, PackageCache},
18 tables::WrappedObjectEntry,
19};
20
21pub struct WrappedObjectHandler {
22 state: Mutex<State>,
23}
24
25struct State {
26 wrapped_objects: Vec<WrappedObjectEntry>,
27 package_store: LocalDBPackageStore,
28 resolver: Resolver<PackageCache>,
29}
30
31#[async_trait::async_trait]
32impl Worker for WrappedObjectHandler {
33 type Message = ();
34 type Error = anyhow::Error;
35
36 async fn process_checkpoint(
37 &self,
38 checkpoint_data: Arc<CheckpointData>,
39 ) -> Result<Self::Message, Self::Error> {
40 let CheckpointData {
41 checkpoint_summary,
42 transactions: checkpoint_transactions,
43 ..
44 } = checkpoint_data.as_ref();
45 let mut state = self.state.lock().await;
46 for checkpoint_transaction in checkpoint_transactions {
47 for object in checkpoint_transaction.output_objects.iter() {
48 state.package_store.update(object)?;
49 }
50 self.process_transaction(
51 checkpoint_summary.epoch,
52 checkpoint_summary.sequence_number,
53 checkpoint_summary.timestamp_ms,
54 checkpoint_transaction,
55 &mut state,
56 )
57 .await?;
58 if checkpoint_summary.end_of_epoch_data.is_some() {
59 state
60 .resolver
61 .package_store()
62 .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
63 }
64 }
65 Ok(())
66 }
67}
68
69#[async_trait::async_trait]
70impl AnalyticsHandler<WrappedObjectEntry> for WrappedObjectHandler {
71 async fn read(&self) -> Result<Vec<WrappedObjectEntry>> {
72 let mut state = self.state.lock().await;
73 let cloned = state.wrapped_objects.clone();
74 state.wrapped_objects.clear();
75 Ok(cloned)
76 }
77
78 fn file_type(&self) -> Result<FileType> {
79 Ok(FileType::WrappedObject)
80 }
81
82 fn name(&self) -> &str {
83 "wrapped_object"
84 }
85}
86
87impl WrappedObjectHandler {
88 pub fn new(store_path: &Path, rest_uri: &str) -> Self {
89 let package_store = LocalDBPackageStore::new(&store_path.join("wrapped_object"), rest_uri);
90 let state = Mutex::new(State {
91 wrapped_objects: vec![],
92 package_store: package_store.clone(),
93 resolver: Resolver::new(PackageCache::new(package_store)),
94 });
95 WrappedObjectHandler { state }
96 }
97 async fn process_transaction(
98 &self,
99 epoch: u64,
100 checkpoint: u64,
101 timestamp_ms: u64,
102 checkpoint_transaction: &CheckpointTransaction,
103 state: &mut State,
104 ) -> Result<()> {
105 for object in checkpoint_transaction.output_objects.iter() {
106 self.process_object(epoch, checkpoint, timestamp_ms, object, state)
107 .await?;
108 }
109 Ok(())
110 }
111
112 async fn process_object(
113 &self,
114 epoch: u64,
115 checkpoint: u64,
116 timestamp_ms: u64,
117 object: &Object,
118 state: &mut State,
119 ) -> Result<()> {
120 let move_struct = if let Some((tag, contents)) = object
121 .struct_tag()
122 .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
123 {
124 let move_struct = get_move_struct(&tag, contents, &state.resolver).await?;
125 Some(move_struct)
126 } else {
127 None
128 };
129 let mut wrapped_structs = BTreeMap::new();
130 if let Some(move_struct) = move_struct {
131 parse_struct("$", move_struct, &mut wrapped_structs);
132 }
133 for (json_path, wrapped_struct) in wrapped_structs.iter() {
134 let entry = WrappedObjectEntry {
135 object_id: wrapped_struct.object_id.map(|id| id.to_string()),
136 root_object_id: object.id().to_string(),
137 root_object_version: object.version().value(),
138 checkpoint,
139 epoch,
140 timestamp_ms,
141 json_path: json_path.to_string(),
142 struct_tag: wrapped_struct.struct_tag.clone().map(|tag| tag.to_string()),
143 };
144 state.wrapped_objects.push(entry);
145 }
146 Ok(())
147 }
148}