iota_core/execution_cache/
passthrough_cache.rs1use std::sync::Arc;
6
7use futures::{FutureExt, future::BoxFuture};
8use iota_common::sync::notify_read::NotifyRead;
9use iota_storage::package_object_cache::PackageObjectCache;
10use iota_types::{
11 accumulator::Accumulator,
12 base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
13 bridge::{Bridge, get_bridge},
14 digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
15 effects::{TransactionEffects, TransactionEvents},
16 error::{IotaError, IotaResult},
17 iota_system_state::{IotaSystemState, get_iota_system_state},
18 message_envelope::Message,
19 messages_checkpoint::CheckpointSequenceNumber,
20 object::Object,
21 storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
22 transaction::{VerifiedSignedTransaction, VerifiedTransaction},
23};
24use prometheus::Registry;
25use tap::TapFallible;
26use tracing::instrument;
27use typed_store::Map;
28
29use super::{
30 CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
31 ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
32 implement_passthrough_traits,
33};
34use crate::{
35 authority::{
36 AuthorityStore,
37 authority_per_epoch_store::AuthorityPerEpochStore,
38 authority_store::{ExecutionLockWriteGuard, IotaLockResult},
39 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
40 },
41 state_accumulator::AccumulatorStore,
42 transaction_outputs::TransactionOutputs,
43};
44
45pub struct PassthroughCache {
46 store: Arc<AuthorityStore>,
47 metrics: Arc<ExecutionCacheMetrics>,
48 package_cache: Arc<PackageObjectCache>,
49 executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
50}
51
52impl PassthroughCache {
53 pub fn new(store: Arc<AuthorityStore>, metrics: Arc<ExecutionCacheMetrics>) -> Self {
54 Self {
55 store,
56 metrics,
57 package_cache: PackageObjectCache::new(),
58 executed_effects_digests_notify_read: NotifyRead::new(),
59 }
60 }
61
62 pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
63 let metrics = Arc::new(ExecutionCacheMetrics::new(registry));
64 Self::new(store, metrics)
65 }
66
67 pub fn store_for_testing(&self) -> &Arc<AuthorityStore> {
68 &self.store
69 }
70
71 fn revert_state_update_impl(&self, digest: &TransactionDigest) -> IotaResult {
72 self.store.revert_state_update(digest)
73 }
74
75 fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard) {
76 self.store
77 .clear_object_per_epoch_marker_table(execution_guard)
78 .tap_err(|e| {
79 tracing::error!(?e, "Failed to clear object per-epoch marker table");
80 })
81 .ok();
82 }
83}
84
85impl ObjectCacheRead for PassthroughCache {
86 fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
87 self.package_cache
88 .get_package_object(package_id, &*self.store)
89 }
90
91 fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]) {
92 self.package_cache
93 .force_reload_system_packages(system_package_ids.iter().cloned(), self);
94 }
95
96 fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
97 self.store.get_object(id).map_err(Into::into)
98 }
99
100 fn get_object_by_key(
101 &self,
102 object_id: &ObjectID,
103 version: SequenceNumber,
104 ) -> IotaResult<Option<Object>> {
105 Ok(self.store.get_object_by_key(object_id, version)?)
106 }
107
108 fn multi_get_objects_by_key(
109 &self,
110 object_keys: &[ObjectKey],
111 ) -> Result<Vec<Option<Object>>, IotaError> {
112 Ok(self.store.multi_get_objects_by_key(object_keys)?)
113 }
114
115 fn object_exists_by_key(
116 &self,
117 object_id: &ObjectID,
118 version: SequenceNumber,
119 ) -> IotaResult<bool> {
120 self.store.object_exists_by_key(object_id, version)
121 }
122
123 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
124 self.store.multi_object_exists_by_key(object_keys)
125 }
126
127 fn get_latest_object_ref_or_tombstone(
128 &self,
129 object_id: ObjectID,
130 ) -> IotaResult<Option<ObjectRef>> {
131 self.store.get_latest_object_ref_or_tombstone(object_id)
132 }
133
134 fn get_latest_object_or_tombstone(
135 &self,
136 object_id: ObjectID,
137 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
138 self.store.get_latest_object_or_tombstone(object_id)
139 }
140
141 fn find_object_lt_or_eq_version(
142 &self,
143 object_id: ObjectID,
144 version: SequenceNumber,
145 ) -> IotaResult<Option<Object>> {
146 self.store.find_object_lt_or_eq_version(object_id, version)
147 }
148
149 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
150 self.store.get_lock(obj_ref, epoch_store)
151 }
152
153 fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
154 self.store.get_latest_live_version_for_object_id(object_id)
155 }
156
157 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
158 self.store.check_owned_objects_are_live(owned_object_refs)
159 }
160
161 fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
162 get_iota_system_state(self)
163 }
164
165 fn get_bridge_object_unsafe(&self) -> IotaResult<Bridge> {
166 get_bridge(self)
167 }
168
169 fn get_marker_value(
170 &self,
171 object_id: &ObjectID,
172 version: SequenceNumber,
173 epoch_id: EpochId,
174 ) -> IotaResult<Option<MarkerValue>> {
175 self.store.get_marker_value(object_id, &version, epoch_id)
176 }
177
178 fn get_latest_marker(
179 &self,
180 object_id: &ObjectID,
181 epoch_id: EpochId,
182 ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
183 self.store.get_latest_marker(object_id, epoch_id)
184 }
185
186 fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
187 self.store.perpetual_tables.get_highest_pruned_checkpoint()
188 }
189}
190
191impl TransactionCacheRead for PassthroughCache {
192 fn multi_get_transaction_blocks(
193 &self,
194 digests: &[TransactionDigest],
195 ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
196 Ok(self
197 .store
198 .multi_get_transaction_blocks(digests)?
199 .into_iter()
200 .map(|o| o.map(Arc::new))
201 .collect())
202 }
203
204 fn multi_get_executed_effects_digests(
205 &self,
206 digests: &[TransactionDigest],
207 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
208 self.store.multi_get_executed_effects_digests(digests)
209 }
210
211 fn multi_get_effects(
212 &self,
213 digests: &[TransactionEffectsDigest],
214 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
215 Ok(self.store.perpetual_tables.effects.multi_get(digests)?)
216 }
217
218 fn notify_read_executed_effects_digests<'a>(
219 &'a self,
220 digests: &'a [TransactionDigest],
221 ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
222 self.executed_effects_digests_notify_read
223 .read(digests, |digests| {
224 self.multi_get_executed_effects_digests(digests)
225 })
226 .boxed()
227 }
228
229 fn multi_get_events(
230 &self,
231 event_digests: &[TransactionEventsDigest],
232 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
233 self.store.multi_get_events(event_digests)
234 }
235}
236
237impl ExecutionCacheWrite for PassthroughCache {
238 #[instrument(level = "debug", skip_all)]
239 fn write_transaction_outputs<'a>(
240 &'a self,
241 epoch_id: EpochId,
242 tx_outputs: Arc<TransactionOutputs>,
243 ) -> BoxFuture<'a, IotaResult> {
244 async move {
245 let tx_digest = *tx_outputs.transaction.digest();
246 let effects_digest = tx_outputs.effects.digest();
247
248 self.store
261 .check_owned_objects_are_live(&tx_outputs.live_object_markers_to_delete)?;
262
263 self.store
264 .write_transaction_outputs(epoch_id, &[tx_outputs])
265 .await?;
266
267 self.executed_effects_digests_notify_read
268 .notify(&tx_digest, &effects_digest);
269
270 self.metrics
271 .pending_notify_read
272 .set(self.executed_effects_digests_notify_read.num_pending() as i64);
273
274 Ok(())
275 }
276 .boxed()
277 }
278
279 fn acquire_transaction_locks<'a>(
280 &'a self,
281 epoch_store: &'a AuthorityPerEpochStore,
282 owned_input_objects: &'a [ObjectRef],
283 transaction: VerifiedSignedTransaction,
284 ) -> BoxFuture<'a, IotaResult> {
285 self.store
286 .acquire_transaction_locks(epoch_store, owned_input_objects, transaction)
287 .boxed()
288 }
289}
290
291impl AccumulatorStore for PassthroughCache {
292 fn get_root_state_accumulator_for_epoch(
293 &self,
294 epoch: EpochId,
295 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
296 self.store.get_root_state_accumulator_for_epoch(epoch)
297 }
298
299 fn get_root_state_accumulator_for_highest_epoch(
300 &self,
301 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
302 self.store.get_root_state_accumulator_for_highest_epoch()
303 }
304
305 fn insert_state_accumulator_for_epoch(
306 &self,
307 epoch: EpochId,
308 checkpoint_seq_num: &CheckpointSequenceNumber,
309 acc: &Accumulator,
310 ) -> IotaResult {
311 self.store
312 .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
313 }
314
315 fn iter_live_object_set(
316 &self,
317 ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
318 self.store.iter_live_object_set()
319 }
320}
321
322impl ExecutionCacheCommit for PassthroughCache {
323 fn commit_transaction_outputs<'a>(
324 &'a self,
325 _epoch: EpochId,
326 _digests: &'a [TransactionDigest],
327 ) -> BoxFuture<'a, IotaResult> {
328 async { Ok(()) }.boxed()
331 }
332
333 fn persist_transactions(&self, _digests: &[TransactionDigest]) -> BoxFuture<'_, IotaResult> {
334 async { Ok(()) }.boxed()
337 }
338}
339
340implement_passthrough_traits!(PassthroughCache);