iota_analytics_indexer/handlers/
wrapped_object_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::BTreeMap, path::Path, sync::Arc};
6
7use anyhow::Result;
8use iota_data_ingestion_core::Worker;
9use iota_package_resolver::Resolver;
10use iota_rest_api::{CheckpointData, CheckpointTransaction};
11use iota_types::{SYSTEM_PACKAGE_ADDRESSES, object::Object};
12use tokio::sync::Mutex;
13
14use crate::{
15    FileType,
16    handlers::{AnalyticsHandler, get_move_struct, parse_struct},
17    package_store::{LocalDBPackageStore, PackageCache},
18    tables::WrappedObjectEntry,
19};
20
21pub struct WrappedObjectHandler {
22    state: Mutex<State>,
23}
24
25struct State {
26    wrapped_objects: Vec<WrappedObjectEntry>,
27    package_store: LocalDBPackageStore,
28    resolver: Resolver<PackageCache>,
29}
30
31#[async_trait::async_trait]
32impl Worker for WrappedObjectHandler {
33    type Message = ();
34    type Error = anyhow::Error;
35
36    async fn process_checkpoint(
37        &self,
38        checkpoint_data: Arc<CheckpointData>,
39    ) -> Result<Self::Message, Self::Error> {
40        let CheckpointData {
41            checkpoint_summary,
42            transactions: checkpoint_transactions,
43            ..
44        } = checkpoint_data.as_ref();
45        let mut state = self.state.lock().await;
46        for checkpoint_transaction in checkpoint_transactions {
47            for object in checkpoint_transaction.output_objects.iter() {
48                state.package_store.update(object)?;
49            }
50            self.process_transaction(
51                checkpoint_summary.epoch,
52                checkpoint_summary.sequence_number,
53                checkpoint_summary.timestamp_ms,
54                checkpoint_transaction,
55                &mut state,
56            )
57            .await?;
58            if checkpoint_summary.end_of_epoch_data.is_some() {
59                state
60                    .resolver
61                    .package_store()
62                    .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
63            }
64        }
65        Ok(())
66    }
67}
68
69#[async_trait::async_trait]
70impl AnalyticsHandler<WrappedObjectEntry> for WrappedObjectHandler {
71    async fn read(&self) -> Result<Vec<WrappedObjectEntry>> {
72        let mut state = self.state.lock().await;
73        let cloned = state.wrapped_objects.clone();
74        state.wrapped_objects.clear();
75        Ok(cloned)
76    }
77
78    fn file_type(&self) -> Result<FileType> {
79        Ok(FileType::WrappedObject)
80    }
81
82    fn name(&self) -> &str {
83        "wrapped_object"
84    }
85}
86
87impl WrappedObjectHandler {
88    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
89        let package_store = LocalDBPackageStore::new(&store_path.join("wrapped_object"), rest_uri);
90        let state = Mutex::new(State {
91            wrapped_objects: vec![],
92            package_store: package_store.clone(),
93            resolver: Resolver::new(PackageCache::new(package_store)),
94        });
95        WrappedObjectHandler { state }
96    }
97    async fn process_transaction(
98        &self,
99        epoch: u64,
100        checkpoint: u64,
101        timestamp_ms: u64,
102        checkpoint_transaction: &CheckpointTransaction,
103        state: &mut State,
104    ) -> Result<()> {
105        for object in checkpoint_transaction.output_objects.iter() {
106            self.process_object(epoch, checkpoint, timestamp_ms, object, state)
107                .await?;
108        }
109        Ok(())
110    }
111
112    async fn process_object(
113        &self,
114        epoch: u64,
115        checkpoint: u64,
116        timestamp_ms: u64,
117        object: &Object,
118        state: &mut State,
119    ) -> Result<()> {
120        let move_struct = if let Some((tag, contents)) = object
121            .struct_tag()
122            .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
123        {
124            let move_struct = get_move_struct(&tag, contents, &state.resolver).await?;
125            Some(move_struct)
126        } else {
127            None
128        };
129        let mut wrapped_structs = BTreeMap::new();
130        if let Some(move_struct) = move_struct {
131            parse_struct("$", move_struct, &mut wrapped_structs);
132        }
133        for (json_path, wrapped_struct) in wrapped_structs.iter() {
134            let entry = WrappedObjectEntry {
135                object_id: wrapped_struct.object_id.map(|id| id.to_string()),
136                root_object_id: object.id().to_string(),
137                root_object_version: object.version().value(),
138                checkpoint,
139                epoch,
140                timestamp_ms,
141                json_path: json_path.to_string(),
142                struct_tag: wrapped_struct.struct_tag.clone().map(|tag| tag.to_string()),
143            };
144            state.wrapped_objects.push(entry);
145        }
146        Ok(())
147    }
148}