iota_analytics_indexer/handlers/
object_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::Path, sync::Arc};
6
7use anyhow::Result;
8use fastcrypto::encoding::{Base64, Encoding};
9use iota_data_ingestion_core::Worker;
10use iota_json_rpc_types::IotaMoveStruct;
11use iota_package_resolver::Resolver;
12use iota_types::{
13    SYSTEM_PACKAGE_ADDRESSES,
14    effects::TransactionEffects,
15    full_checkpoint_content::{CheckpointData, CheckpointTransaction},
16    object::Object,
17};
18use tokio::sync::Mutex;
19
20use crate::{
21    FileType,
22    handlers::{
23        AnalyticsHandler, ObjectStatusTracker, get_move_struct, get_owner_address, get_owner_type,
24        initial_shared_version,
25    },
26    package_store::{LocalDBPackageStore, PackageCache},
27    tables::{ObjectEntry, ObjectStatus},
28};
29
30pub struct ObjectHandler {
31    state: Mutex<State>,
32}
33
34struct State {
35    objects: Vec<ObjectEntry>,
36    package_store: LocalDBPackageStore,
37    resolver: Resolver<PackageCache>,
38}
39
40#[async_trait::async_trait]
41impl Worker for ObjectHandler {
42    type Message = ();
43    type Error = anyhow::Error;
44
45    async fn process_checkpoint(
46        &self,
47        checkpoint_data: Arc<CheckpointData>,
48    ) -> Result<Self::Message, Self::Error> {
49        let CheckpointData {
50            checkpoint_summary,
51            transactions: checkpoint_transactions,
52            ..
53        } = checkpoint_data.as_ref();
54        let mut state = self.state.lock().await;
55        for checkpoint_transaction in checkpoint_transactions {
56            for object in checkpoint_transaction.output_objects.iter() {
57                state.package_store.update(object)?;
58            }
59            self.process_transaction(
60                checkpoint_summary.epoch,
61                checkpoint_summary.sequence_number,
62                checkpoint_summary.timestamp_ms,
63                checkpoint_transaction,
64                &checkpoint_transaction.effects,
65                &mut state,
66            )
67            .await?;
68            if checkpoint_summary.end_of_epoch_data.is_some() {
69                state
70                    .resolver
71                    .package_store()
72                    .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
73            }
74        }
75        Ok(())
76    }
77}
78
79#[async_trait::async_trait]
80impl AnalyticsHandler<ObjectEntry> for ObjectHandler {
81    async fn read(&self) -> Result<Vec<ObjectEntry>> {
82        let mut state = self.state.lock().await;
83        let cloned = state.objects.clone();
84        state.objects.clear();
85        Ok(cloned)
86    }
87
88    fn file_type(&self) -> Result<FileType> {
89        Ok(FileType::Object)
90    }
91
92    fn name(&self) -> &str {
93        "object"
94    }
95}
96
97impl ObjectHandler {
98    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
99        let package_store = LocalDBPackageStore::new(&store_path.join("object"), rest_uri);
100        let state = State {
101            objects: vec![],
102            package_store: package_store.clone(),
103            resolver: Resolver::new(PackageCache::new(package_store)),
104        };
105        Self {
106            state: Mutex::new(state),
107        }
108    }
109    async fn process_transaction(
110        &self,
111        epoch: u64,
112        checkpoint: u64,
113        timestamp_ms: u64,
114        checkpoint_transaction: &CheckpointTransaction,
115        effects: &TransactionEffects,
116        state: &mut State,
117    ) -> Result<()> {
118        let object_status_tracker = ObjectStatusTracker::new(effects);
119        for object in checkpoint_transaction.output_objects.iter() {
120            self.process_object(
121                epoch,
122                checkpoint,
123                timestamp_ms,
124                object,
125                &object_status_tracker,
126                state,
127            )
128            .await?;
129        }
130        for (object_ref, _) in effects.all_removed_objects().iter() {
131            let entry = ObjectEntry {
132                object_id: object_ref.0.to_string(),
133                digest: object_ref.2.to_string(),
134                version: u64::from(object_ref.1),
135                type_: None,
136                checkpoint,
137                epoch,
138                timestamp_ms,
139                owner_type: None,
140                owner_address: None,
141                object_status: ObjectStatus::Deleted,
142                initial_shared_version: None,
143                previous_transaction: checkpoint_transaction.transaction.digest().base58_encode(),
144                storage_rebate: None,
145                bcs: None,
146                coin_type: None,
147                coin_balance: None,
148                struct_tag: None,
149                object_json: None,
150            };
151            state.objects.push(entry);
152        }
153        Ok(())
154    }
155    // Object data. Only called if there are objects in the transaction.
156    // Responsible to build the live object table.
157    async fn process_object(
158        &self,
159        epoch: u64,
160        checkpoint: u64,
161        timestamp_ms: u64,
162        object: &Object,
163        object_status_tracker: &ObjectStatusTracker,
164        state: &mut State,
165    ) -> Result<()> {
166        let move_obj_opt = object.data.try_as_move();
167        let move_struct = if let Some((tag, contents)) = object
168            .struct_tag()
169            .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
170        {
171            let move_struct = get_move_struct(&tag, contents, &state.resolver).await?;
172            Some(move_struct)
173        } else {
174            None
175        };
176        let (struct_tag, iota_move_struct) = if let Some(move_struct) = move_struct {
177            match move_struct.into() {
178                IotaMoveStruct::WithTypes { type_, fields } => {
179                    (Some(type_), Some(IotaMoveStruct::WithFields(fields)))
180                }
181                fields => (object.struct_tag(), Some(fields)),
182            }
183        } else {
184            (None, None)
185        };
186        let object_type = move_obj_opt.map(|o| o.type_().to_string());
187        let object_id = object.id();
188        let entry = ObjectEntry {
189            object_id: object_id.to_string(),
190            digest: object.digest().to_string(),
191            version: object.version().value(),
192            type_: object_type,
193            checkpoint,
194            epoch,
195            timestamp_ms,
196            owner_type: Some(get_owner_type(object)),
197            owner_address: get_owner_address(object),
198            object_status: object_status_tracker
199                .get_object_status(&object_id)
200                .expect("object must be in output objects"),
201            initial_shared_version: initial_shared_version(object),
202            previous_transaction: object.previous_transaction.base58_encode(),
203            storage_rebate: Some(object.storage_rebate),
204            bcs: Some(Base64::encode(bcs::to_bytes(object).unwrap())),
205            coin_type: object.coin_type_maybe().map(|t| t.to_string()),
206            coin_balance: if object.coin_type_maybe().is_some() {
207                Some(object.get_coin_value_unsafe())
208            } else {
209                None
210            },
211            struct_tag: struct_tag.map(|x| x.to_string()),
212            object_json: iota_move_struct.map(|x| x.to_json_value().to_string()),
213        };
214        state.objects.push(entry);
215        Ok(())
216    }
217}