iota_analytics_indexer/handlers/
df_handler.rs1use std::{collections::HashMap, path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_indexer::{errors::IndexerError, types::owner_to_owner_info};
11use iota_json_rpc_types::IotaMoveValue;
12use iota_package_resolver::Resolver;
13use iota_rest_api::{CheckpointData, CheckpointTransaction};
14use iota_types::{
15 SYSTEM_PACKAGE_ADDRESSES, TypeTag,
16 base_types::ObjectID,
17 dynamic_field::{DynamicFieldName, DynamicFieldType, visitor as DFV},
18 object::{Object, bounded_visitor::BoundedVisitor},
19};
20use tap::tap::TapFallible;
21use tokio::sync::Mutex;
22use tracing::error;
23
24use crate::{
25 FileType,
26 handlers::AnalyticsHandler,
27 package_store::{LocalDBPackageStore, PackageCache},
28 tables::DynamicFieldEntry,
29};
30
31pub struct DynamicFieldHandler {
32 state: Mutex<State>,
33}
34
35struct State {
36 dynamic_fields: Vec<DynamicFieldEntry>,
37 package_store: LocalDBPackageStore,
38 resolver: Resolver<PackageCache>,
39}
40
41#[async_trait::async_trait]
42impl Worker for DynamicFieldHandler {
43 type Message = ();
44 type Error = anyhow::Error;
45
46 async fn process_checkpoint(
47 &self,
48 checkpoint_data: Arc<CheckpointData>,
49 ) -> Result<Self::Message, Self::Error> {
50 let CheckpointData {
51 checkpoint_summary,
52 transactions: checkpoint_transactions,
53 ..
54 } = checkpoint_data.as_ref();
55 let mut state = self.state.lock().await;
56 for checkpoint_transaction in checkpoint_transactions {
57 for object in checkpoint_transaction.output_objects.iter() {
58 state.package_store.update(object)?;
59 }
60 self.process_transaction(
61 checkpoint_summary.epoch,
62 checkpoint_summary.sequence_number,
63 checkpoint_summary.timestamp_ms,
64 checkpoint_transaction,
65 &mut state,
66 )
67 .await?;
68 if checkpoint_summary.end_of_epoch_data.is_some() {
69 state
70 .resolver
71 .package_store()
72 .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
73 }
74 }
75 Ok(())
76 }
77}
78
79#[async_trait::async_trait]
80impl AnalyticsHandler<DynamicFieldEntry> for DynamicFieldHandler {
81 async fn read(&self) -> Result<Vec<DynamicFieldEntry>> {
82 let mut state = self.state.lock().await;
83 let cloned = state.dynamic_fields.clone();
84 state.dynamic_fields.clear();
85 Ok(cloned)
86 }
87
88 fn file_type(&self) -> Result<FileType> {
89 Ok(FileType::DynamicField)
90 }
91
92 fn name(&self) -> &str {
93 "dynamic_field"
94 }
95}
96
97impl DynamicFieldHandler {
98 pub fn new(store_path: &Path, rest_uri: &str) -> Self {
99 let package_store = LocalDBPackageStore::new(&store_path.join("dynamic_field"), rest_uri);
100 let state = State {
101 dynamic_fields: vec![],
102 package_store: package_store.clone(),
103 resolver: Resolver::new(PackageCache::new(package_store)),
104 };
105 Self {
106 state: Mutex::new(state),
107 }
108 }
109 async fn process_dynamic_field(
110 &self,
111 epoch: u64,
112 checkpoint: u64,
113 timestamp_ms: u64,
114 object: &Object,
115 all_written_objects: &HashMap<ObjectID, Object>,
116 state: &mut State,
117 ) -> Result<()> {
118 let move_obj_opt = object.data.try_as_move();
119 let Some(move_object) = move_obj_opt else {
121 return Ok(());
122 };
123 if !move_object.type_().is_dynamic_field() {
124 return Ok(());
125 }
126 let layout = state
127 .resolver
128 .type_layout(move_object.type_().clone().into())
129 .await?;
130 let object_id = object.id();
131
132 let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)?;
133
134 let type_ = field.kind;
135 let name_type: TypeTag = field.name_layout.into();
136 let bcs_name = field.name_bytes.to_owned();
137
138 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
139 .tap_err(|e| {
140 error!("{e}");
141 })?;
142 let name = DynamicFieldName {
143 type_: name_type,
144 value: IotaMoveValue::from(name_value).to_json_value(),
145 };
146 let name_json = serde_json::to_string(&name)?;
147 let (_owner_type, owner_id) = owner_to_owner_info(&object.owner);
148 let Some(parent_id) = owner_id else {
149 return Ok(());
150 };
151 let entry = match type_ {
152 DynamicFieldType::DynamicField => DynamicFieldEntry {
153 parent_object_id: parent_id.to_string(),
154 transaction_digest: object.previous_transaction.base58_encode(),
155 checkpoint,
156 epoch,
157 timestamp_ms,
158 name: name_json,
159 bcs_name: Base64::encode(bcs_name),
160 type_,
161 object_id: object.id().to_string(),
162 version: object.version().value(),
163 digest: object.digest().to_string(),
164 object_type: move_object.clone().into_type().into_type_params()[1]
165 .to_canonical_string(true),
166 },
167 DynamicFieldType::DynamicObject => {
168 let object = all_written_objects.get(&object_id).ok_or(
169 IndexerError::Uncategorized(anyhow::anyhow!(
170 "Failed to find object_id {:?} when trying to create dynamic field info",
171 object_id
172 )),
173 )?;
174 let version = object.version().value();
175 let digest = object.digest().to_string();
176 let object_type = object.data.type_().unwrap().clone();
177 DynamicFieldEntry {
178 parent_object_id: parent_id.to_string(),
179 transaction_digest: object.previous_transaction.base58_encode(),
180 checkpoint,
181 epoch,
182 timestamp_ms,
183 name: name_json,
184 bcs_name: Base64::encode(bcs_name),
185 type_,
186 object_id: object.id().to_string(),
187 digest,
188 version,
189 object_type: object_type.to_canonical_string(true),
190 }
191 }
192 };
193 state.dynamic_fields.push(entry);
194 Ok(())
195 }
196
197 async fn process_transaction(
198 &self,
199 epoch: u64,
200 checkpoint: u64,
201 timestamp_ms: u64,
202 checkpoint_transaction: &CheckpointTransaction,
203 state: &mut State,
204 ) -> Result<()> {
205 let all_objects: HashMap<_, _> = checkpoint_transaction
206 .output_objects
207 .iter()
208 .map(|x| (x.id(), x.clone()))
209 .collect();
210 for object in checkpoint_transaction.output_objects.iter() {
211 self.process_dynamic_field(
212 epoch,
213 checkpoint,
214 timestamp_ms,
215 object,
216 &all_objects,
217 state,
218 )
219 .await?;
220 }
221 Ok(())
222 }
223}