iota_indexer/handlers/
tx_processor.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::HashMap;
6
7use async_trait::async_trait;
8use iota_json_rpc::{ObjectProvider, get_balance_changes_from_effect, get_object_changes};
9use iota_rest_api::CheckpointData;
10use iota_types::{
11    base_types::{ObjectID, SequenceNumber},
12    digests::TransactionDigest,
13    effects::{TransactionEffects, TransactionEffectsAPI},
14    object::Object,
15    transaction::{TransactionData, TransactionDataAPI},
16};
17
18use crate::{
19    errors::IndexerError,
20    metrics::IndexerMetrics,
21    types::{IndexedObjectChange, IndexerResult},
22};
23
24pub struct InMemObjectCache {
25    id_map: HashMap<ObjectID, Object>,
26    seq_map: HashMap<(ObjectID, SequenceNumber), Object>,
27}
28
29impl InMemObjectCache {
30    pub fn new() -> Self {
31        Self {
32            id_map: HashMap::new(),
33            seq_map: HashMap::new(),
34        }
35    }
36
37    pub fn insert_object(&mut self, obj: Object) {
38        self.id_map.insert(obj.id(), obj.clone());
39        self.seq_map.insert((obj.id(), obj.version()), obj);
40    }
41
42    pub fn get(&self, id: &ObjectID, version: Option<&SequenceNumber>) -> Option<&Object> {
43        if let Some(version) = version {
44            self.seq_map.get(&(*id, *version))
45        } else {
46            self.id_map.get(id)
47        }
48    }
49}
50
51impl Default for InMemObjectCache {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57/// Along with InMemObjectCache, TxChangesProcessor implements ObjectProvider
58/// so it can be used in indexing write path to get object/balance changes.
59/// Its lifetime is per checkpoint.
60pub struct TxChangesProcessor {
61    object_cache: InMemObjectCache,
62    metrics: IndexerMetrics,
63}
64
65impl TxChangesProcessor {
66    pub fn new(objects: &[&Object], metrics: IndexerMetrics) -> Self {
67        let mut object_cache = InMemObjectCache::new();
68        for obj in objects {
69            object_cache.insert_object(<&Object>::clone(obj).clone());
70        }
71        Self {
72            object_cache,
73            metrics,
74        }
75    }
76
77    pub(crate) async fn get_changes(
78        &self,
79        tx: &TransactionData,
80        effects: &TransactionEffects,
81        tx_digest: &TransactionDigest,
82    ) -> IndexerResult<(
83        Vec<iota_json_rpc_types::BalanceChange>,
84        Vec<IndexedObjectChange>,
85    )> {
86        let _timer = self
87            .metrics
88            .indexing_tx_object_changes_latency
89            .start_timer();
90        let object_change: Vec<_> = get_object_changes(
91            self,
92            tx.sender(),
93            effects.modified_at_versions(),
94            effects.all_changed_objects(),
95            effects.all_removed_objects(),
96        )
97        .await?
98        .into_iter()
99        .map(IndexedObjectChange::from)
100        .collect();
101        let balance_change = get_balance_changes_from_effect(
102            self,
103            effects,
104            tx.input_objects().unwrap_or_else(|e| {
105                panic!(
106                    "Checkpointed tx {:?} has invalid input objects: {e}",
107                    tx_digest,
108                )
109            }),
110            None,
111        )
112        .await?;
113        Ok((balance_change, object_change))
114    }
115}
116
117#[async_trait]
118impl ObjectProvider for TxChangesProcessor {
119    type Error = IndexerError;
120
121    async fn get_object(
122        &self,
123        id: &ObjectID,
124        version: &SequenceNumber,
125    ) -> Result<Object, Self::Error> {
126        let object = self
127            .object_cache
128            .get(id, Some(version))
129            .as_ref()
130            .map(|o| <&Object>::clone(o).clone());
131        if let Some(o) = object {
132            self.metrics.indexing_get_object_in_mem_hit.inc();
133            return Ok(o);
134        }
135
136        panic!(
137            "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn get_object)",
138            id
139        );
140    }
141
142    async fn find_object_lt_or_eq_version(
143        &self,
144        id: &ObjectID,
145        version: &SequenceNumber,
146    ) -> Result<Option<Object>, Self::Error> {
147        // First look up the exact version in object_cache.
148        let object = self
149            .object_cache
150            .get(id, Some(version))
151            .as_ref()
152            .map(|o| <&Object>::clone(o).clone());
153        if let Some(o) = object {
154            self.metrics.indexing_get_object_in_mem_hit.inc();
155            return Ok(Some(o));
156        }
157
158        // Second look up the latest version in object_cache. This may be
159        // called when the object is deleted hence the version at deletion
160        // is given.
161        let object = self
162            .object_cache
163            .get(id, None)
164            .as_ref()
165            .map(|o| <&Object>::clone(o).clone());
166        if let Some(o) = object {
167            if o.version() > *version {
168                panic!(
169                    "Found a higher version {} for object {}, expected lt_or_eq {}",
170                    o.version(),
171                    id,
172                    *version
173                );
174            }
175            if o.version() <= *version {
176                self.metrics.indexing_get_object_in_mem_hit.inc();
177                return Ok(Some(o));
178            }
179        }
180
181        panic!(
182            "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn find_object_lt_or_eq_version)",
183            id
184        );
185    }
186}
187
188// This is a struct that is used to extract IotaSystemState and its dynamic
189// children for end-of-epoch indexing.
190pub(crate) struct EpochEndIndexingObjectStore<'a> {
191    objects: Vec<&'a Object>,
192}
193
194impl<'a> EpochEndIndexingObjectStore<'a> {
195    pub fn new(data: &'a CheckpointData) -> Self {
196        Self {
197            objects: data.latest_live_output_objects(),
198        }
199    }
200}
201
202impl iota_types::storage::ObjectStore for EpochEndIndexingObjectStore<'_> {
203    fn get_object(
204        &self,
205        object_id: &ObjectID,
206    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
207        Ok(self
208            .objects
209            .iter()
210            .find(|o| o.id() == *object_id)
211            .cloned()
212            .cloned())
213    }
214
215    fn get_object_by_key(
216        &self,
217        object_id: &ObjectID,
218        version: iota_types::base_types::VersionNumber,
219    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
220        Ok(self
221            .objects
222            .iter()
223            .find(|o| o.id() == *object_id && o.version() == version)
224            .cloned()
225            .cloned())
226    }
227}