iota_analytics_indexer/handlers/
df_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_indexer::{errors::IndexerError, types::owner_to_owner_info};
11use iota_json_rpc_types::IotaMoveValue;
12use iota_package_resolver::Resolver;
13use iota_rest_api::{CheckpointData, CheckpointTransaction};
14use iota_types::{
15    SYSTEM_PACKAGE_ADDRESSES, TypeTag,
16    base_types::ObjectID,
17    dynamic_field::{DynamicFieldName, DynamicFieldType, visitor as DFV},
18    object::{Object, bounded_visitor::BoundedVisitor},
19};
20use tap::tap::TapFallible;
21use tokio::sync::Mutex;
22use tracing::error;
23
24use crate::{
25    FileType,
26    handlers::AnalyticsHandler,
27    package_store::{LocalDBPackageStore, PackageCache},
28    tables::DynamicFieldEntry,
29};
30
31pub struct DynamicFieldHandler {
32    state: Mutex<State>,
33}
34
35struct State {
36    dynamic_fields: Vec<DynamicFieldEntry>,
37    package_store: LocalDBPackageStore,
38    resolver: Resolver<PackageCache>,
39}
40
41#[async_trait::async_trait]
42impl Worker for DynamicFieldHandler {
43    type Message = ();
44    type Error = anyhow::Error;
45
46    async fn process_checkpoint(
47        &self,
48        checkpoint_data: Arc<CheckpointData>,
49    ) -> Result<Self::Message, Self::Error> {
50        let CheckpointData {
51            checkpoint_summary,
52            transactions: checkpoint_transactions,
53            ..
54        } = checkpoint_data.as_ref();
55        let mut state = self.state.lock().await;
56        for checkpoint_transaction in checkpoint_transactions {
57            for object in checkpoint_transaction.output_objects.iter() {
58                state.package_store.update(object)?;
59            }
60            self.process_transaction(
61                checkpoint_summary.epoch,
62                checkpoint_summary.sequence_number,
63                checkpoint_summary.timestamp_ms,
64                checkpoint_transaction,
65                &mut state,
66            )
67            .await?;
68            if checkpoint_summary.end_of_epoch_data.is_some() {
69                state
70                    .resolver
71                    .package_store()
72                    .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
73            }
74        }
75        Ok(())
76    }
77}
78
79#[async_trait::async_trait]
80impl AnalyticsHandler<DynamicFieldEntry> for DynamicFieldHandler {
81    async fn read(&self) -> Result<Vec<DynamicFieldEntry>> {
82        let mut state = self.state.lock().await;
83        let cloned = state.dynamic_fields.clone();
84        state.dynamic_fields.clear();
85        Ok(cloned)
86    }
87
88    fn file_type(&self) -> Result<FileType> {
89        Ok(FileType::DynamicField)
90    }
91
92    fn name(&self) -> &str {
93        "dynamic_field"
94    }
95}
96
97impl DynamicFieldHandler {
98    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
99        let package_store = LocalDBPackageStore::new(&store_path.join("dynamic_field"), rest_uri);
100        let state = State {
101            dynamic_fields: vec![],
102            package_store: package_store.clone(),
103            resolver: Resolver::new(PackageCache::new(package_store)),
104        };
105        Self {
106            state: Mutex::new(state),
107        }
108    }
109    async fn process_dynamic_field(
110        &self,
111        epoch: u64,
112        checkpoint: u64,
113        timestamp_ms: u64,
114        object: &Object,
115        all_written_objects: &HashMap<ObjectID, Object>,
116        state: &mut State,
117    ) -> Result<()> {
118        let move_obj_opt = object.data.try_as_move();
119        // Skip if not a move object
120        let Some(move_object) = move_obj_opt else {
121            return Ok(());
122        };
123        if !move_object.type_().is_dynamic_field() {
124            return Ok(());
125        }
126        let layout = state
127            .resolver
128            .type_layout(move_object.type_().clone().into())
129            .await?;
130        let object_id = object.id();
131
132        let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)?;
133
134        let type_ = field.kind;
135        let name_type: TypeTag = field.name_layout.into();
136        let bcs_name = field.name_bytes.to_owned();
137
138        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
139            .tap_err(|e| {
140                error!("{e}");
141            })?;
142        let name = DynamicFieldName {
143            type_: name_type,
144            value: IotaMoveValue::from(name_value).to_json_value(),
145        };
146        let name_json = serde_json::to_string(&name)?;
147        let (_owner_type, owner_id) = owner_to_owner_info(&object.owner);
148        let Some(parent_id) = owner_id else {
149            return Ok(());
150        };
151        let entry = match type_ {
152            DynamicFieldType::DynamicField => DynamicFieldEntry {
153                parent_object_id: parent_id.to_string(),
154                transaction_digest: object.previous_transaction.base58_encode(),
155                checkpoint,
156                epoch,
157                timestamp_ms,
158                name: name_json,
159                bcs_name: Base64::encode(bcs_name),
160                type_,
161                object_id: object.id().to_string(),
162                version: object.version().value(),
163                digest: object.digest().to_string(),
164                object_type: move_object.clone().into_type().into_type_params()[1]
165                    .to_canonical_string(/* with_prefix */ true),
166            },
167            DynamicFieldType::DynamicObject => {
168                let object = all_written_objects.get(&object_id).ok_or(
169                    IndexerError::Uncategorized(anyhow::anyhow!(
170                        "Failed to find object_id {:?} when trying to create dynamic field info",
171                        object_id
172                    )),
173                )?;
174                let version = object.version().value();
175                let digest = object.digest().to_string();
176                let object_type = object.data.type_().unwrap().clone();
177                DynamicFieldEntry {
178                    parent_object_id: parent_id.to_string(),
179                    transaction_digest: object.previous_transaction.base58_encode(),
180                    checkpoint,
181                    epoch,
182                    timestamp_ms,
183                    name: name_json,
184                    bcs_name: Base64::encode(bcs_name),
185                    type_,
186                    object_id: object.id().to_string(),
187                    digest,
188                    version,
189                    object_type: object_type.to_canonical_string(true),
190                }
191            }
192        };
193        state.dynamic_fields.push(entry);
194        Ok(())
195    }
196
197    async fn process_transaction(
198        &self,
199        epoch: u64,
200        checkpoint: u64,
201        timestamp_ms: u64,
202        checkpoint_transaction: &CheckpointTransaction,
203        state: &mut State,
204    ) -> Result<()> {
205        let all_objects: HashMap<_, _> = checkpoint_transaction
206            .output_objects
207            .iter()
208            .map(|x| (x.id(), x.clone()))
209            .collect();
210        for object in checkpoint_transaction.output_objects.iter() {
211            self.process_dynamic_field(
212                epoch,
213                checkpoint,
214                timestamp_ms,
215                object,
216                &all_objects,
217                state,
218            )
219            .await?;
220        }
221        Ok(())
222    }
223}