iota_analytics_indexer/handlers/
object_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_json_rpc_types::IotaMoveStruct;
11use iota_package_resolver::Resolver;
12use iota_rest_api::{CheckpointData, CheckpointTransaction};
13use iota_types::{SYSTEM_PACKAGE_ADDRESSES, effects::TransactionEffects, object::Object};
14use tokio::sync::Mutex;
15
16use crate::{
17    FileType,
18    handlers::{
19        AnalyticsHandler, ObjectStatusTracker, get_move_struct, get_owner_address, get_owner_type,
20        initial_shared_version,
21    },
22    package_store::{LocalDBPackageStore, PackageCache},
23    tables::{ObjectEntry, ObjectStatus},
24};
25
26pub struct ObjectHandler {
27    state: Mutex<State>,
28}
29
30struct State {
31    objects: Vec<ObjectEntry>,
32    package_store: LocalDBPackageStore,
33    resolver: Resolver<PackageCache>,
34}
35
36#[async_trait::async_trait]
37impl Worker for ObjectHandler {
38    type Message = ();
39    type Error = anyhow::Error;
40
41    async fn process_checkpoint(
42        &self,
43        checkpoint_data: Arc<CheckpointData>,
44    ) -> Result<Self::Message, Self::Error> {
45        let CheckpointData {
46            checkpoint_summary,
47            transactions: checkpoint_transactions,
48            ..
49        } = checkpoint_data.as_ref();
50        let mut state = self.state.lock().await;
51        for checkpoint_transaction in checkpoint_transactions {
52            for object in checkpoint_transaction.output_objects.iter() {
53                state.package_store.update(object)?;
54            }
55            self.process_transaction(
56                checkpoint_summary.epoch,
57                checkpoint_summary.sequence_number,
58                checkpoint_summary.timestamp_ms,
59                checkpoint_transaction,
60                &checkpoint_transaction.effects,
61                &mut state,
62            )
63            .await?;
64            if checkpoint_summary.end_of_epoch_data.is_some() {
65                state
66                    .resolver
67                    .package_store()
68                    .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
69            }
70        }
71        Ok(())
72    }
73}
74
75#[async_trait::async_trait]
76impl AnalyticsHandler<ObjectEntry> for ObjectHandler {
77    async fn read(&self) -> Result<Vec<ObjectEntry>> {
78        let mut state = self.state.lock().await;
79        let cloned = state.objects.clone();
80        state.objects.clear();
81        Ok(cloned)
82    }
83
84    fn file_type(&self) -> Result<FileType> {
85        Ok(FileType::Object)
86    }
87
88    fn name(&self) -> &str {
89        "object"
90    }
91}
92
93impl ObjectHandler {
94    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
95        let package_store = LocalDBPackageStore::new(&store_path.join("object"), rest_uri);
96        let state = State {
97            objects: vec![],
98            package_store: package_store.clone(),
99            resolver: Resolver::new(PackageCache::new(package_store)),
100        };
101        Self {
102            state: Mutex::new(state),
103        }
104    }
105    async fn process_transaction(
106        &self,
107        epoch: u64,
108        checkpoint: u64,
109        timestamp_ms: u64,
110        checkpoint_transaction: &CheckpointTransaction,
111        effects: &TransactionEffects,
112        state: &mut State,
113    ) -> Result<()> {
114        let object_status_tracker = ObjectStatusTracker::new(effects);
115        for object in checkpoint_transaction.output_objects.iter() {
116            self.process_object(
117                epoch,
118                checkpoint,
119                timestamp_ms,
120                object,
121                &object_status_tracker,
122                state,
123            )
124            .await?;
125        }
126        for (object_ref, _) in effects.all_removed_objects().iter() {
127            let entry = ObjectEntry {
128                object_id: object_ref.0.to_string(),
129                digest: object_ref.2.to_string(),
130                version: u64::from(object_ref.1),
131                type_: None,
132                checkpoint,
133                epoch,
134                timestamp_ms,
135                owner_type: None,
136                owner_address: None,
137                object_status: ObjectStatus::Deleted,
138                initial_shared_version: None,
139                previous_transaction: checkpoint_transaction.transaction.digest().base58_encode(),
140                storage_rebate: None,
141                bcs: None,
142                coin_type: None,
143                coin_balance: None,
144                struct_tag: None,
145                object_json: None,
146            };
147            state.objects.push(entry);
148        }
149        Ok(())
150    }
151    // Object data. Only called if there are objects in the transaction.
152    // Responsible to build the live object table.
153    async fn process_object(
154        &self,
155        epoch: u64,
156        checkpoint: u64,
157        timestamp_ms: u64,
158        object: &Object,
159        object_status_tracker: &ObjectStatusTracker,
160        state: &mut State,
161    ) -> Result<()> {
162        let move_obj_opt = object.data.try_as_move();
163        let move_struct = if let Some((tag, contents)) = object
164            .struct_tag()
165            .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
166        {
167            let move_struct = get_move_struct(&tag, contents, &state.resolver).await?;
168            Some(move_struct)
169        } else {
170            None
171        };
172        let (struct_tag, iota_move_struct) = if let Some(move_struct) = move_struct {
173            match move_struct.into() {
174                IotaMoveStruct::WithTypes { type_, fields } => {
175                    (Some(type_), Some(IotaMoveStruct::WithFields(fields)))
176                }
177                fields => (object.struct_tag(), Some(fields)),
178            }
179        } else {
180            (None, None)
181        };
182        let object_type = move_obj_opt.map(|o| o.type_().to_string());
183        let object_id = object.id();
184        let entry = ObjectEntry {
185            object_id: object_id.to_string(),
186            digest: object.digest().to_string(),
187            version: object.version().value(),
188            type_: object_type,
189            checkpoint,
190            epoch,
191            timestamp_ms,
192            owner_type: Some(get_owner_type(object)),
193            owner_address: get_owner_address(object),
194            object_status: object_status_tracker
195                .get_object_status(&object_id)
196                .expect("Object must be in output objects"),
197            initial_shared_version: initial_shared_version(object),
198            previous_transaction: object.previous_transaction.base58_encode(),
199            storage_rebate: Some(object.storage_rebate),
200            bcs: Some(Base64::encode(bcs::to_bytes(object).unwrap())),
201            coin_type: object.coin_type_maybe().map(|t| t.to_string()),
202            coin_balance: if object.coin_type_maybe().is_some() {
203                Some(object.get_coin_value_unsafe())
204            } else {
205                None
206            },
207            struct_tag: struct_tag.map(|x| x.to_string()),
208            object_json: iota_move_struct.map(|x| x.to_json_value().to_string()),
209        };
210        state.objects.push(entry);
211        Ok(())
212    }
213}