iota_analytics_indexer/handlers/
object_handler.rs1use std::{path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_json_rpc_types::IotaMoveStruct;
11use iota_package_resolver::Resolver;
12use iota_rest_api::{CheckpointData, CheckpointTransaction};
13use iota_types::{SYSTEM_PACKAGE_ADDRESSES, effects::TransactionEffects, object::Object};
14use tokio::sync::Mutex;
15
16use crate::{
17 FileType,
18 handlers::{
19 AnalyticsHandler, ObjectStatusTracker, get_move_struct, get_owner_address, get_owner_type,
20 initial_shared_version,
21 },
22 package_store::{LocalDBPackageStore, PackageCache},
23 tables::{ObjectEntry, ObjectStatus},
24};
25
26pub struct ObjectHandler {
27 state: Mutex<State>,
28}
29
30struct State {
31 objects: Vec<ObjectEntry>,
32 package_store: LocalDBPackageStore,
33 resolver: Resolver<PackageCache>,
34}
35
36#[async_trait::async_trait]
37impl Worker for ObjectHandler {
38 type Message = ();
39 type Error = anyhow::Error;
40
41 async fn process_checkpoint(
42 &self,
43 checkpoint_data: Arc<CheckpointData>,
44 ) -> Result<Self::Message, Self::Error> {
45 let CheckpointData {
46 checkpoint_summary,
47 transactions: checkpoint_transactions,
48 ..
49 } = checkpoint_data.as_ref();
50 let mut state = self.state.lock().await;
51 for checkpoint_transaction in checkpoint_transactions {
52 for object in checkpoint_transaction.output_objects.iter() {
53 state.package_store.update(object)?;
54 }
55 self.process_transaction(
56 checkpoint_summary.epoch,
57 checkpoint_summary.sequence_number,
58 checkpoint_summary.timestamp_ms,
59 checkpoint_transaction,
60 &checkpoint_transaction.effects,
61 &mut state,
62 )
63 .await?;
64 if checkpoint_summary.end_of_epoch_data.is_some() {
65 state
66 .resolver
67 .package_store()
68 .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
69 }
70 }
71 Ok(())
72 }
73}
74
75#[async_trait::async_trait]
76impl AnalyticsHandler<ObjectEntry> for ObjectHandler {
77 async fn read(&self) -> Result<Vec<ObjectEntry>> {
78 let mut state = self.state.lock().await;
79 let cloned = state.objects.clone();
80 state.objects.clear();
81 Ok(cloned)
82 }
83
84 fn file_type(&self) -> Result<FileType> {
85 Ok(FileType::Object)
86 }
87
88 fn name(&self) -> &str {
89 "object"
90 }
91}
92
93impl ObjectHandler {
94 pub fn new(store_path: &Path, rest_uri: &str) -> Self {
95 let package_store = LocalDBPackageStore::new(&store_path.join("object"), rest_uri);
96 let state = State {
97 objects: vec![],
98 package_store: package_store.clone(),
99 resolver: Resolver::new(PackageCache::new(package_store)),
100 };
101 Self {
102 state: Mutex::new(state),
103 }
104 }
105 async fn process_transaction(
106 &self,
107 epoch: u64,
108 checkpoint: u64,
109 timestamp_ms: u64,
110 checkpoint_transaction: &CheckpointTransaction,
111 effects: &TransactionEffects,
112 state: &mut State,
113 ) -> Result<()> {
114 let object_status_tracker = ObjectStatusTracker::new(effects);
115 for object in checkpoint_transaction.output_objects.iter() {
116 self.process_object(
117 epoch,
118 checkpoint,
119 timestamp_ms,
120 object,
121 &object_status_tracker,
122 state,
123 )
124 .await?;
125 }
126 for (object_ref, _) in effects.all_removed_objects().iter() {
127 let entry = ObjectEntry {
128 object_id: object_ref.0.to_string(),
129 digest: object_ref.2.to_string(),
130 version: u64::from(object_ref.1),
131 type_: None,
132 checkpoint,
133 epoch,
134 timestamp_ms,
135 owner_type: None,
136 owner_address: None,
137 object_status: ObjectStatus::Deleted,
138 initial_shared_version: None,
139 previous_transaction: checkpoint_transaction.transaction.digest().base58_encode(),
140 storage_rebate: None,
141 bcs: None,
142 coin_type: None,
143 coin_balance: None,
144 struct_tag: None,
145 object_json: None,
146 };
147 state.objects.push(entry);
148 }
149 Ok(())
150 }
151 async fn process_object(
154 &self,
155 epoch: u64,
156 checkpoint: u64,
157 timestamp_ms: u64,
158 object: &Object,
159 object_status_tracker: &ObjectStatusTracker,
160 state: &mut State,
161 ) -> Result<()> {
162 let move_obj_opt = object.data.try_as_move();
163 let move_struct = if let Some((tag, contents)) = object
164 .struct_tag()
165 .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
166 {
167 let move_struct = get_move_struct(&tag, contents, &state.resolver).await?;
168 Some(move_struct)
169 } else {
170 None
171 };
172 let (struct_tag, iota_move_struct) = if let Some(move_struct) = move_struct {
173 match move_struct.into() {
174 IotaMoveStruct::WithTypes { type_, fields } => {
175 (Some(type_), Some(IotaMoveStruct::WithFields(fields)))
176 }
177 fields => (object.struct_tag(), Some(fields)),
178 }
179 } else {
180 (None, None)
181 };
182 let object_type = move_obj_opt.map(|o| o.type_().to_string());
183 let object_id = object.id();
184 let entry = ObjectEntry {
185 object_id: object_id.to_string(),
186 digest: object.digest().to_string(),
187 version: object.version().value(),
188 type_: object_type,
189 checkpoint,
190 epoch,
191 timestamp_ms,
192 owner_type: Some(get_owner_type(object)),
193 owner_address: get_owner_address(object),
194 object_status: object_status_tracker
195 .get_object_status(&object_id)
196 .expect("Object must be in output objects"),
197 initial_shared_version: initial_shared_version(object),
198 previous_transaction: object.previous_transaction.base58_encode(),
199 storage_rebate: Some(object.storage_rebate),
200 bcs: Some(Base64::encode(bcs::to_bytes(object).unwrap())),
201 coin_type: object.coin_type_maybe().map(|t| t.to_string()),
202 coin_balance: if object.coin_type_maybe().is_some() {
203 Some(object.get_coin_value_unsafe())
204 } else {
205 None
206 },
207 struct_tag: struct_tag.map(|x| x.to_string()),
208 object_json: iota_move_struct.map(|x| x.to_json_value().to_string()),
209 };
210 state.objects.push(entry);
211 Ok(())
212 }
213}