iota_analytics_indexer/handlers/
object_handler.rs1use std::{path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_json_rpc_types::IotaMoveStruct;
11use iota_package_resolver::Resolver;
12use iota_types::{
13 SYSTEM_PACKAGE_ADDRESSES,
14 effects::TransactionEffects,
15 full_checkpoint_content::{CheckpointData, CheckpointTransaction},
16 object::Object,
17};
18use tokio::sync::Mutex;
19
20use crate::{
21 FileType,
22 handlers::{
23 AnalyticsHandler, ObjectStatusTracker, get_move_struct, get_owner_address, get_owner_type,
24 initial_shared_version,
25 },
26 package_store::{LocalDBPackageStore, PackageCache},
27 tables::{ObjectEntry, ObjectStatus},
28};
29
30pub struct ObjectHandler {
31 state: Mutex<State>,
32}
33
34struct State {
35 objects: Vec<ObjectEntry>,
36 package_store: LocalDBPackageStore,
37 resolver: Resolver<PackageCache>,
38}
39
40#[async_trait::async_trait]
41impl Worker for ObjectHandler {
42 type Message = ();
43 type Error = anyhow::Error;
44
45 async fn process_checkpoint(
46 &self,
47 checkpoint_data: Arc<CheckpointData>,
48 ) -> Result<Self::Message, Self::Error> {
49 let CheckpointData {
50 checkpoint_summary,
51 transactions: checkpoint_transactions,
52 ..
53 } = checkpoint_data.as_ref();
54 let mut state = self.state.lock().await;
55 for checkpoint_transaction in checkpoint_transactions {
56 for object in checkpoint_transaction.output_objects.iter() {
57 state.package_store.update(object)?;
58 }
59 self.process_transaction(
60 checkpoint_summary.epoch,
61 checkpoint_summary.sequence_number,
62 checkpoint_summary.timestamp_ms,
63 checkpoint_transaction,
64 &checkpoint_transaction.effects,
65 &mut state,
66 )
67 .await?;
68 if checkpoint_summary.end_of_epoch_data.is_some() {
69 state
70 .resolver
71 .package_store()
72 .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
73 }
74 }
75 Ok(())
76 }
77}
78
79#[async_trait::async_trait]
80impl AnalyticsHandler<ObjectEntry> for ObjectHandler {
81 async fn read(&self) -> Result<Vec<ObjectEntry>> {
82 let mut state = self.state.lock().await;
83 let cloned = state.objects.clone();
84 state.objects.clear();
85 Ok(cloned)
86 }
87
88 fn file_type(&self) -> Result<FileType> {
89 Ok(FileType::Object)
90 }
91
92 fn name(&self) -> &str {
93 "object"
94 }
95}
96
97impl ObjectHandler {
98 pub fn new(store_path: &Path, rest_uri: &str) -> Self {
99 let package_store = LocalDBPackageStore::new(&store_path.join("object"), rest_uri);
100 let state = State {
101 objects: vec![],
102 package_store: package_store.clone(),
103 resolver: Resolver::new(PackageCache::new(package_store)),
104 };
105 Self {
106 state: Mutex::new(state),
107 }
108 }
109 async fn process_transaction(
110 &self,
111 epoch: u64,
112 checkpoint: u64,
113 timestamp_ms: u64,
114 checkpoint_transaction: &CheckpointTransaction,
115 effects: &TransactionEffects,
116 state: &mut State,
117 ) -> Result<()> {
118 let object_status_tracker = ObjectStatusTracker::new(effects);
119 for object in checkpoint_transaction.output_objects.iter() {
120 self.process_object(
121 epoch,
122 checkpoint,
123 timestamp_ms,
124 object,
125 &object_status_tracker,
126 state,
127 )
128 .await?;
129 }
130 for (object_ref, _) in effects.all_removed_objects().iter() {
131 let entry = ObjectEntry {
132 object_id: object_ref.0.to_string(),
133 digest: object_ref.2.to_string(),
134 version: u64::from(object_ref.1),
135 type_: None,
136 checkpoint,
137 epoch,
138 timestamp_ms,
139 owner_type: None,
140 owner_address: None,
141 object_status: ObjectStatus::Deleted,
142 initial_shared_version: None,
143 previous_transaction: checkpoint_transaction.transaction.digest().base58_encode(),
144 storage_rebate: None,
145 bcs: None,
146 coin_type: None,
147 coin_balance: None,
148 struct_tag: None,
149 object_json: None,
150 };
151 state.objects.push(entry);
152 }
153 Ok(())
154 }
155 async fn process_object(
158 &self,
159 epoch: u64,
160 checkpoint: u64,
161 timestamp_ms: u64,
162 object: &Object,
163 object_status_tracker: &ObjectStatusTracker,
164 state: &mut State,
165 ) -> Result<()> {
166 let move_obj_opt = object.data.try_as_move();
167 let move_struct = if let Some((tag, contents)) = object
168 .struct_tag()
169 .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
170 {
171 let move_struct = get_move_struct(&tag, contents, &state.resolver).await?;
172 Some(move_struct)
173 } else {
174 None
175 };
176 let (struct_tag, iota_move_struct) = if let Some(move_struct) = move_struct {
177 match move_struct.into() {
178 IotaMoveStruct::WithTypes { type_, fields } => {
179 (Some(type_), Some(IotaMoveStruct::WithFields(fields)))
180 }
181 fields => (object.struct_tag(), Some(fields)),
182 }
183 } else {
184 (None, None)
185 };
186 let object_type = move_obj_opt.map(|o| o.type_().to_string());
187 let object_id = object.id();
188 let entry = ObjectEntry {
189 object_id: object_id.to_string(),
190 digest: object.digest().to_string(),
191 version: object.version().value(),
192 type_: object_type,
193 checkpoint,
194 epoch,
195 timestamp_ms,
196 owner_type: Some(get_owner_type(object)),
197 owner_address: get_owner_address(object),
198 object_status: object_status_tracker
199 .get_object_status(&object_id)
200 .expect("object must be in output objects"),
201 initial_shared_version: initial_shared_version(object),
202 previous_transaction: object.previous_transaction.base58_encode(),
203 storage_rebate: Some(object.storage_rebate),
204 bcs: Some(Base64::encode(bcs::to_bytes(object).unwrap())),
205 coin_type: object.coin_type_maybe().map(|t| t.to_string()),
206 coin_balance: if object.coin_type_maybe().is_some() {
207 Some(object.get_coin_value_unsafe())
208 } else {
209 None
210 },
211 struct_tag: struct_tag.map(|x| x.to_string()),
212 object_json: iota_move_struct.map(|x| x.to_json_value().to_string()),
213 };
214 state.objects.push(entry);
215 Ok(())
216 }
217}