iota_indexer/handlers/
tx_processor.rs1use std::collections::HashMap;
6
7use async_trait::async_trait;
8use iota_json_rpc::{ObjectProvider, get_balance_changes_from_effect, get_object_changes};
9use iota_rest_api::CheckpointData;
10use iota_types::{
11 base_types::{ObjectID, SequenceNumber},
12 digests::TransactionDigest,
13 effects::{TransactionEffects, TransactionEffectsAPI},
14 object::Object,
15 transaction::{TransactionData, TransactionDataAPI},
16};
17
18use crate::{
19 errors::IndexerError,
20 metrics::IndexerMetrics,
21 types::{IndexedObjectChange, IndexerResult},
22};
23
24pub struct InMemObjectCache {
25 id_map: HashMap<ObjectID, Object>,
26 seq_map: HashMap<(ObjectID, SequenceNumber), Object>,
27}
28
29impl InMemObjectCache {
30 pub fn new() -> Self {
31 Self {
32 id_map: HashMap::new(),
33 seq_map: HashMap::new(),
34 }
35 }
36
37 pub fn insert_object(&mut self, obj: Object) {
38 self.id_map.insert(obj.id(), obj.clone());
39 self.seq_map.insert((obj.id(), obj.version()), obj);
40 }
41
42 pub fn get(&self, id: &ObjectID, version: Option<&SequenceNumber>) -> Option<&Object> {
43 if let Some(version) = version {
44 self.seq_map.get(&(*id, *version))
45 } else {
46 self.id_map.get(id)
47 }
48 }
49}
50
51impl Default for InMemObjectCache {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57pub struct TxChangesProcessor {
61 object_cache: InMemObjectCache,
62 metrics: IndexerMetrics,
63}
64
65impl TxChangesProcessor {
66 pub fn new(objects: &[&Object], metrics: IndexerMetrics) -> Self {
67 let mut object_cache = InMemObjectCache::new();
68 for obj in objects {
69 object_cache.insert_object(<&Object>::clone(obj).clone());
70 }
71 Self {
72 object_cache,
73 metrics,
74 }
75 }
76
77 pub(crate) async fn get_changes(
78 &self,
79 tx: &TransactionData,
80 effects: &TransactionEffects,
81 tx_digest: &TransactionDigest,
82 ) -> IndexerResult<(
83 Vec<iota_json_rpc_types::BalanceChange>,
84 Vec<IndexedObjectChange>,
85 )> {
86 let _timer = self
87 .metrics
88 .indexing_tx_object_changes_latency
89 .start_timer();
90 let object_change: Vec<_> = get_object_changes(
91 self,
92 tx.sender(),
93 effects.modified_at_versions(),
94 effects.all_changed_objects(),
95 effects.all_removed_objects(),
96 )
97 .await?
98 .into_iter()
99 .map(IndexedObjectChange::from)
100 .collect();
101 let balance_change = get_balance_changes_from_effect(
102 self,
103 effects,
104 tx.input_objects().unwrap_or_else(|e| {
105 panic!(
106 "Checkpointed tx {:?} has invalid input objects: {e}",
107 tx_digest,
108 )
109 }),
110 None,
111 )
112 .await?;
113 Ok((balance_change, object_change))
114 }
115}
116
117#[async_trait]
118impl ObjectProvider for TxChangesProcessor {
119 type Error = IndexerError;
120
121 async fn get_object(
122 &self,
123 id: &ObjectID,
124 version: &SequenceNumber,
125 ) -> Result<Object, Self::Error> {
126 let object = self
127 .object_cache
128 .get(id, Some(version))
129 .as_ref()
130 .map(|o| <&Object>::clone(o).clone());
131 if let Some(o) = object {
132 self.metrics.indexing_get_object_in_mem_hit.inc();
133 return Ok(o);
134 }
135
136 panic!(
137 "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn get_object)",
138 id
139 );
140 }
141
142 async fn find_object_lt_or_eq_version(
143 &self,
144 id: &ObjectID,
145 version: &SequenceNumber,
146 ) -> Result<Option<Object>, Self::Error> {
147 let object = self
149 .object_cache
150 .get(id, Some(version))
151 .as_ref()
152 .map(|o| <&Object>::clone(o).clone());
153 if let Some(o) = object {
154 self.metrics.indexing_get_object_in_mem_hit.inc();
155 return Ok(Some(o));
156 }
157
158 let object = self
162 .object_cache
163 .get(id, None)
164 .as_ref()
165 .map(|o| <&Object>::clone(o).clone());
166 if let Some(o) = object {
167 if o.version() > *version {
168 panic!(
169 "Found a higher version {} for object {}, expected lt_or_eq {}",
170 o.version(),
171 id,
172 *version
173 );
174 }
175 if o.version() <= *version {
176 self.metrics.indexing_get_object_in_mem_hit.inc();
177 return Ok(Some(o));
178 }
179 }
180
181 panic!(
182 "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn find_object_lt_or_eq_version)",
183 id
184 );
185 }
186}
187
188pub(crate) struct EpochEndIndexingObjectStore<'a> {
191 objects: Vec<&'a Object>,
192}
193
194impl<'a> EpochEndIndexingObjectStore<'a> {
195 pub fn new(data: &'a CheckpointData) -> Self {
196 Self {
197 objects: data.latest_live_output_objects(),
198 }
199 }
200}
201
202impl iota_types::storage::ObjectStore for EpochEndIndexingObjectStore<'_> {
203 fn get_object(
204 &self,
205 object_id: &ObjectID,
206 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
207 Ok(self
208 .objects
209 .iter()
210 .find(|o| o.id() == *object_id)
211 .cloned()
212 .cloned())
213 }
214
215 fn get_object_by_key(
216 &self,
217 object_id: &ObjectID,
218 version: iota_types::base_types::VersionNumber,
219 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
220 Ok(self
221 .objects
222 .iter()
223 .find(|o| o.id() == *object_id && o.version() == version)
224 .cloned()
225 .cloned())
226 }
227}