iota_core/execution_cache/
passthrough_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use futures::{FutureExt, future::BoxFuture};
8use iota_common::sync::notify_read::NotifyRead;
9use iota_storage::package_object_cache::PackageObjectCache;
10use iota_types::{
11    accumulator::Accumulator,
12    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
13    bridge::{Bridge, get_bridge},
14    digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
15    effects::{TransactionEffects, TransactionEvents},
16    error::{IotaError, IotaResult},
17    iota_system_state::{IotaSystemState, get_iota_system_state},
18    message_envelope::Message,
19    messages_checkpoint::CheckpointSequenceNumber,
20    object::Object,
21    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
22    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
23};
24use prometheus::Registry;
25use tap::TapFallible;
26use tracing::instrument;
27use typed_store::Map;
28
29use super::{
30    CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
31    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
32    implement_passthrough_traits,
33};
34use crate::{
35    authority::{
36        AuthorityStore,
37        authority_per_epoch_store::AuthorityPerEpochStore,
38        authority_store::{ExecutionLockWriteGuard, IotaLockResult},
39        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
40    },
41    state_accumulator::AccumulatorStore,
42    transaction_outputs::TransactionOutputs,
43};
44
45pub struct PassthroughCache {
46    store: Arc<AuthorityStore>,
47    metrics: Arc<ExecutionCacheMetrics>,
48    package_cache: Arc<PackageObjectCache>,
49    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
50}
51
52impl PassthroughCache {
53    pub fn new(store: Arc<AuthorityStore>, metrics: Arc<ExecutionCacheMetrics>) -> Self {
54        Self {
55            store,
56            metrics,
57            package_cache: PackageObjectCache::new(),
58            executed_effects_digests_notify_read: NotifyRead::new(),
59        }
60    }
61
62    pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
63        let metrics = Arc::new(ExecutionCacheMetrics::new(registry));
64        Self::new(store, metrics)
65    }
66
67    pub fn store_for_testing(&self) -> &Arc<AuthorityStore> {
68        &self.store
69    }
70
71    fn revert_state_update_impl(&self, digest: &TransactionDigest) -> IotaResult {
72        self.store.revert_state_update(digest)
73    }
74
75    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard) {
76        self.store
77            .clear_object_per_epoch_marker_table(execution_guard)
78            .tap_err(|e| {
79                tracing::error!(?e, "Failed to clear object per-epoch marker table");
80            })
81            .ok();
82    }
83}
84
85impl ObjectCacheRead for PassthroughCache {
86    fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
87        self.package_cache
88            .get_package_object(package_id, &*self.store)
89    }
90
91    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]) {
92        self.package_cache
93            .force_reload_system_packages(system_package_ids.iter().cloned(), self);
94    }
95
96    fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
97        self.store.get_object(id).map_err(Into::into)
98    }
99
100    fn get_object_by_key(
101        &self,
102        object_id: &ObjectID,
103        version: SequenceNumber,
104    ) -> IotaResult<Option<Object>> {
105        Ok(self.store.get_object_by_key(object_id, version)?)
106    }
107
108    fn multi_get_objects_by_key(
109        &self,
110        object_keys: &[ObjectKey],
111    ) -> Result<Vec<Option<Object>>, IotaError> {
112        Ok(self.store.multi_get_objects_by_key(object_keys)?)
113    }
114
115    fn object_exists_by_key(
116        &self,
117        object_id: &ObjectID,
118        version: SequenceNumber,
119    ) -> IotaResult<bool> {
120        self.store.object_exists_by_key(object_id, version)
121    }
122
123    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
124        self.store.multi_object_exists_by_key(object_keys)
125    }
126
127    fn get_latest_object_ref_or_tombstone(
128        &self,
129        object_id: ObjectID,
130    ) -> IotaResult<Option<ObjectRef>> {
131        self.store.get_latest_object_ref_or_tombstone(object_id)
132    }
133
134    fn get_latest_object_or_tombstone(
135        &self,
136        object_id: ObjectID,
137    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
138        self.store.get_latest_object_or_tombstone(object_id)
139    }
140
141    fn find_object_lt_or_eq_version(
142        &self,
143        object_id: ObjectID,
144        version: SequenceNumber,
145    ) -> IotaResult<Option<Object>> {
146        self.store.find_object_lt_or_eq_version(object_id, version)
147    }
148
149    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
150        self.store.get_lock(obj_ref, epoch_store)
151    }
152
153    fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
154        self.store.get_latest_live_version_for_object_id(object_id)
155    }
156
157    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
158        self.store.check_owned_objects_are_live(owned_object_refs)
159    }
160
161    fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
162        get_iota_system_state(self)
163    }
164
165    fn get_bridge_object_unsafe(&self) -> IotaResult<Bridge> {
166        get_bridge(self)
167    }
168
169    fn get_marker_value(
170        &self,
171        object_id: &ObjectID,
172        version: SequenceNumber,
173        epoch_id: EpochId,
174    ) -> IotaResult<Option<MarkerValue>> {
175        self.store.get_marker_value(object_id, &version, epoch_id)
176    }
177
178    fn get_latest_marker(
179        &self,
180        object_id: &ObjectID,
181        epoch_id: EpochId,
182    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
183        self.store.get_latest_marker(object_id, epoch_id)
184    }
185
186    fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
187        self.store.perpetual_tables.get_highest_pruned_checkpoint()
188    }
189}
190
191impl TransactionCacheRead for PassthroughCache {
192    fn multi_get_transaction_blocks(
193        &self,
194        digests: &[TransactionDigest],
195    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
196        Ok(self
197            .store
198            .multi_get_transaction_blocks(digests)?
199            .into_iter()
200            .map(|o| o.map(Arc::new))
201            .collect())
202    }
203
204    fn multi_get_executed_effects_digests(
205        &self,
206        digests: &[TransactionDigest],
207    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
208        self.store.multi_get_executed_effects_digests(digests)
209    }
210
211    fn multi_get_effects(
212        &self,
213        digests: &[TransactionEffectsDigest],
214    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
215        Ok(self.store.perpetual_tables.effects.multi_get(digests)?)
216    }
217
218    fn notify_read_executed_effects_digests<'a>(
219        &'a self,
220        digests: &'a [TransactionDigest],
221    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
222        self.executed_effects_digests_notify_read
223            .read(digests, |digests| {
224                self.multi_get_executed_effects_digests(digests)
225            })
226            .boxed()
227    }
228
229    fn multi_get_events(
230        &self,
231        event_digests: &[TransactionEventsDigest],
232    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
233        self.store.multi_get_events(event_digests)
234    }
235}
236
237impl ExecutionCacheWrite for PassthroughCache {
238    #[instrument(level = "debug", skip_all)]
239    fn write_transaction_outputs<'a>(
240        &'a self,
241        epoch_id: EpochId,
242        tx_outputs: Arc<TransactionOutputs>,
243    ) -> BoxFuture<'a, IotaResult> {
244        async move {
245            let tx_digest = *tx_outputs.transaction.digest();
246            let effects_digest = tx_outputs.effects.digest();
247
248            // NOTE: We just check here that live markers exist, not that they are locked to
249            // a specific TX. Why?
250            // 1. Live markers existence prevents re-execution of old certs when objects
251            //    have been upgraded
252            // 2. Not all validators lock, just 2f+1, so transaction should proceed
253            //    regardless (But the live markers should exist which means previous
254            //    transactions finished)
255            // 3. Equivocation possible (different TX) but as long as 2f+1 approves current
256            //    TX its fine
257            // 4. Live markers may have existed when we started processing this tx, but
258            //    could have since been deleted by a concurrent tx that finished first. In
259            //    that case, check if the tx effects exist.
260            self.store
261                .check_owned_objects_are_live(&tx_outputs.live_object_markers_to_delete)?;
262
263            self.store
264                .write_transaction_outputs(epoch_id, &[tx_outputs])
265                .await?;
266
267            self.executed_effects_digests_notify_read
268                .notify(&tx_digest, &effects_digest);
269
270            self.metrics
271                .pending_notify_read
272                .set(self.executed_effects_digests_notify_read.num_pending() as i64);
273
274            Ok(())
275        }
276        .boxed()
277    }
278
279    fn acquire_transaction_locks<'a>(
280        &'a self,
281        epoch_store: &'a AuthorityPerEpochStore,
282        owned_input_objects: &'a [ObjectRef],
283        transaction: VerifiedSignedTransaction,
284    ) -> BoxFuture<'a, IotaResult> {
285        self.store
286            .acquire_transaction_locks(epoch_store, owned_input_objects, transaction)
287            .boxed()
288    }
289}
290
291impl AccumulatorStore for PassthroughCache {
292    fn get_root_state_accumulator_for_epoch(
293        &self,
294        epoch: EpochId,
295    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
296        self.store.get_root_state_accumulator_for_epoch(epoch)
297    }
298
299    fn get_root_state_accumulator_for_highest_epoch(
300        &self,
301    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
302        self.store.get_root_state_accumulator_for_highest_epoch()
303    }
304
305    fn insert_state_accumulator_for_epoch(
306        &self,
307        epoch: EpochId,
308        checkpoint_seq_num: &CheckpointSequenceNumber,
309        acc: &Accumulator,
310    ) -> IotaResult {
311        self.store
312            .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
313    }
314
315    fn iter_live_object_set(
316        &self,
317    ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
318        self.store.iter_live_object_set()
319    }
320}
321
322impl ExecutionCacheCommit for PassthroughCache {
323    fn commit_transaction_outputs<'a>(
324        &'a self,
325        _epoch: EpochId,
326        _digests: &'a [TransactionDigest],
327    ) -> BoxFuture<'a, IotaResult> {
328        // Nothing needs to be done since they were already committed in
329        // write_transaction_outputs
330        async { Ok(()) }.boxed()
331    }
332
333    fn persist_transactions(&self, _digests: &[TransactionDigest]) -> BoxFuture<'_, IotaResult> {
334        // Nothing needs to be done since they were already committed in
335        // write_transaction_outputs
336        async { Ok(()) }.boxed()
337    }
338}
339
340implement_passthrough_traits!(PassthroughCache);