iota_core/execution_cache/
proxy_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use futures::{FutureExt, future::BoxFuture};
8use iota_types::{
9    accumulator::Accumulator,
10    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
11    bridge::Bridge,
12    digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
13    effects::{TransactionEffects, TransactionEvents},
14    error::{IotaError, IotaResult},
15    iota_system_state::IotaSystemState,
16    messages_checkpoint::CheckpointSequenceNumber,
17    object::Object,
18    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, PackageObject},
19    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
20};
21use parking_lot::RwLock;
22
23use super::{
24    CheckpointCache, ExecutionCacheCommit, ExecutionCacheConfigType, ExecutionCacheMetrics,
25    ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, PassthroughCache,
26    StateSyncAPI, TestingAPI, TransactionCacheRead, WritebackCache,
27};
28use crate::{
29    authority::{
30        AuthorityStore,
31        authority_per_epoch_store::AuthorityPerEpochStore,
32        authority_store::{ExecutionLockWriteGuard, IotaLockResult},
33        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
34    },
35    state_accumulator::AccumulatorStore,
36    transaction_outputs::TransactionOutputs,
37};
38
/// Forwards a method call to whichever cache implementation is currently selected.
///
/// Invoked as `delegate_method!(self.method(arg1, arg2))`, where every argument is a plain
/// identifier. The expansion reads `self.mode` and calls `method` with the same arguments on
/// either `self.writeback_cache` or `self.passthrough_cache`, yielding that call's result.
///
/// The guard returned by `mode.read()` is a temporary of the `match` scrutinee, so it is
/// still alive while the selected call runs.
macro_rules! delegate_method {
    ($this:ident.$name:ident($($arg:ident),*)) => {
        match *$this.mode.read() {
            ExecutionCacheConfigType::WritebackCache => {
                $this.writeback_cache.$name($($arg),*)
            }
            ExecutionCacheConfigType::PassthroughCache => {
                $this.passthrough_cache.$name($($arg),*)
            }
        }
    };
}
47
/// Execution cache front-end that owns both cache implementations and records, in `mode`,
/// which of the two calls are currently forwarded to.
pub struct ProxyCache {
    // Note: both caches must be constructed at startup, rather than using ArcSwap
    // (or some similar strategy). This is because we need to proxy iter_live_object_set,
    // which requires that we borrow from a member of the cache. If we used ArcSwap,
    // we would be forced to borrow from a local variable after loading from the ArcSwap.
    //
    // Cache implementations are entirely passive, so the unused one will have no effect.
    passthrough_cache: PassthroughCache,
    writeback_cache: WritebackCache,
    // Which of the two caches `delegate_method!` forwards to. Set in `new` and overwritten
    // by `reconfigure_cache_impl`.
    mode: RwLock<ExecutionCacheConfigType>,
}
59
60impl ProxyCache {
61    pub fn new(
62        epoch_start_config: &EpochStartConfiguration,
63        store: Arc<AuthorityStore>,
64        metrics: Arc<ExecutionCacheMetrics>,
65    ) -> Self {
66        let cache_type = epoch_start_config.execution_cache_type();
67        tracing::info!("using cache impl {:?}", cache_type);
68        let passthrough_cache = PassthroughCache::new(store.clone(), metrics.clone());
69        let writeback_cache = WritebackCache::new(store.clone(), metrics.clone());
70
71        Self {
72            passthrough_cache,
73            writeback_cache,
74            mode: RwLock::new(cache_type),
75        }
76    }
77
78    async fn reconfigure_cache_impl(&self, epoch_start_config: &EpochStartConfiguration) {
79        let cache_type = epoch_start_config.execution_cache_type();
80        tracing::info!("switching to cache impl {:?}", cache_type);
81        if matches!(cache_type, ExecutionCacheConfigType::PassthroughCache) {
82            // we may switch back to the writeback cache next epoch, at which point its
83            // caches will be stale if not cleared now
84
85            // When we call invalidate_all on Moka caches, it sets the valid after time
86            // stamp to the current time. Upon retrieval, it ignores entries
87            // whose insertion time is strictly less than the valid-after
88            // time. In the simulator, time remains constant for the duration of a single
89            // task poll, so it is possible that entries have been inserted in
90            // the same tick, and will therefore not be invalidated
91            // properly. So, this sleep is necessary for passing tests, and it also is some
92            // insurance against hitting the same issue in production. (It
93            // should be more or less impossible for two consecutive
94            // calls to Instant::now() to return the same value in production, but there is
95            // no harm in having a short sleep here just to be sure).
96            tokio::time::sleep(Duration::from_nanos(100)).await;
97            self.writeback_cache.clear_caches_and_assert_empty();
98        }
99        *self.mode.write() = cache_type;
100    }
101}
102
// `ObjectCacheRead` for the proxy. Every method below is a one-line forward through
// `delegate_method!`: the call goes to `passthrough_cache` or `writeback_cache`, whichever
// `self.mode` selects at the time of the call, with arguments and return value passed through
// unchanged. The proxy adds no logic of its own here.
impl ObjectCacheRead for ProxyCache {
    fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
        delegate_method!(self.get_package_object(package_id))
    }

    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]) {
        delegate_method!(self.force_reload_system_packages(system_package_ids))
    }

    fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
        delegate_method!(self.get_object(id))
    }

    fn get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        delegate_method!(self.get_object_by_key(object_id, version))
    }

    fn multi_get_objects_by_key(
        &self,
        object_keys: &[ObjectKey],
    ) -> Result<Vec<Option<Object>>, IotaError> {
        delegate_method!(self.multi_get_objects_by_key(object_keys))
    }

    fn object_exists_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<bool> {
        delegate_method!(self.object_exists_by_key(object_id, version))
    }

    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
        delegate_method!(self.multi_object_exists_by_key(object_keys))
    }

    fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        delegate_method!(self.get_latest_object_ref_or_tombstone(object_id))
    }

    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
        delegate_method!(self.get_latest_object_or_tombstone(object_id))
    }

    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        delegate_method!(self.find_object_lt_or_eq_version(object_id, version))
    }

    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
        delegate_method!(self.get_lock(obj_ref, epoch_store))
    }

    fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
        delegate_method!(self._get_live_objref(object_id))
    }

    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        delegate_method!(self.check_owned_objects_are_live(owned_object_refs))
    }

    fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        delegate_method!(self.get_iota_system_state_object_unsafe())
    }

    fn get_bridge_object_unsafe(&self) -> IotaResult<Bridge> {
        delegate_method!(self.get_bridge_object_unsafe())
    }

    fn get_marker_value(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> IotaResult<Option<MarkerValue>> {
        delegate_method!(self.get_marker_value(object_id, version, epoch_id))
    }

    fn get_latest_marker(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
        delegate_method!(self.get_latest_marker(object_id, epoch_id))
    }

    fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        delegate_method!(self.get_highest_pruned_checkpoint())
    }
}
206
// `TransactionCacheRead` for the proxy: each method forwards, via `delegate_method!`, to the
// cache that `self.mode` selects when the method is called.
impl TransactionCacheRead for ProxyCache {
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
        delegate_method!(self.multi_get_transaction_blocks(digests))
    }

    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
        delegate_method!(self.multi_get_executed_effects_digests(digests))
    }

    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
        delegate_method!(self.multi_get_effects(digests))
    }

    // The cache is chosen when this method is called, not when the future is polled: the
    // returned future is the selected cache's own future, so a later change of `mode` does
    // not redirect it.
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
        delegate_method!(self.notify_read_executed_effects_digests(digests))
    }

    fn multi_get_events(
        &self,
        event_digests: &[TransactionEventsDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        delegate_method!(self.multi_get_events(event_digests))
    }
}
243
// `ExecutionCacheWrite` for the proxy. Both methods return a boxed future: the cache is
// selected from `self.mode` at call time via `delegate_method!`, and the future handed back
// is the one produced by that cache.
impl ExecutionCacheWrite for ProxyCache {
    fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> BoxFuture<'_, IotaResult> {
        delegate_method!(self.write_transaction_outputs(epoch_id, tx_outputs))
    }

    fn acquire_transaction_locks<'a>(
        &'a self,
        epoch_store: &'a AuthorityPerEpochStore,
        owned_input_objects: &'a [ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> BoxFuture<'a, IotaResult> {
        delegate_method!(self.acquire_transaction_locks(
            epoch_store,
            owned_input_objects,
            transaction
        ))
    }
}
266
// `AccumulatorStore` for the proxy: each method forwards, via `delegate_method!`, to the
// cache currently selected by `self.mode`.
impl AccumulatorStore for ProxyCache {
    fn get_root_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        delegate_method!(self.get_root_state_accumulator_for_epoch(epoch))
    }

    fn get_root_state_accumulator_for_highest_epoch(
        &self,
    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
        delegate_method!(self.get_root_state_accumulator_for_highest_epoch())
    }

    fn insert_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &Accumulator,
    ) -> IotaResult {
        delegate_method!(self.insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc))
    }

    // The returned iterator carries the `'_` lifetime, i.e. it borrows from `self`. The
    // selected cache is a field of `self`, so the borrow remains valid after this returns.
    fn iter_live_object_set(
        &self,
    ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
        delegate_method!(self.iter_live_object_set())
    }

    // Same borrowing shape as `iter_live_object_set`.
    fn iter_cached_live_object_set_for_testing(
        &self,
    ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
        delegate_method!(self.iter_cached_live_object_set_for_testing())
    }
}
302
// `ExecutionCacheCommit` for the proxy. Both methods return a boxed future obtained from the
// cache that `self.mode` selects at call time (see `delegate_method!`).
impl ExecutionCacheCommit for ProxyCache {
    fn commit_transaction_outputs<'a>(
        &'a self,
        epoch: EpochId,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult> {
        delegate_method!(self.commit_transaction_outputs(epoch, digests))
    }

    fn persist_transactions<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult> {
        delegate_method!(self.persist_transactions(digests))
    }
}
319
// `CheckpointCache` for the proxy: each method forwards, via `delegate_method!`, to the cache
// currently selected by `self.mode`.
impl CheckpointCache for ProxyCache {
    fn get_transaction_perpetual_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        delegate_method!(self.get_transaction_perpetual_checkpoint(digest))
    }

    fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
        delegate_method!(self.multi_get_transactions_perpetual_checkpoints(digests))
    }

    fn insert_finalized_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
        epoch: EpochId,
        sequence: CheckpointSequenceNumber,
    ) -> IotaResult {
        delegate_method!(
            self.insert_finalized_transactions_perpetual_checkpoints(digests, epoch, sequence)
        )
    }
}
346
// `ExecutionCacheReconfigAPI` for the proxy. Every method except `reconfigure_cache` forwards,
// via `delegate_method!`, to the cache currently selected by `self.mode`.
impl ExecutionCacheReconfigAPI for ProxyCache {
    fn insert_genesis_object(&self, object: Object) -> IotaResult {
        delegate_method!(self.insert_genesis_object(object))
    }

    fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult {
        delegate_method!(self.bulk_insert_genesis_objects(objects))
    }

    fn revert_state_update(&self, digest: &TransactionDigest) -> IotaResult {
        delegate_method!(self.revert_state_update(digest))
    }

    // Forwarded like any other call; this method does not itself touch `self.mode`.
    fn set_epoch_start_configuration(
        &self,
        epoch_start_config: &EpochStartConfiguration,
    ) -> IotaResult {
        delegate_method!(self.set_epoch_start_configuration(epoch_start_config))
    }

    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
        delegate_method!(self.update_epoch_flags_metrics(old, new))
    }

    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        delegate_method!(self.clear_state_end_of_epoch(execution_guard))
    }

    fn expensive_check_iota_conservation(
        &self,
        old_epoch_store: &AuthorityPerEpochStore,
        epoch_supply_change: Option<i64>,
    ) -> IotaResult {
        delegate_method!(
            self.expensive_check_iota_conservation(old_epoch_store, epoch_supply_change)
        )
    }

    fn checkpoint_db(&self, path: &std::path::Path) -> IotaResult {
        delegate_method!(self.checkpoint_db(path))
    }

    // Not forwarded: switching between implementations is done by the proxy itself in
    // `reconfigure_cache_impl`. This wrapper only boxes that async fn's future to match the
    // trait's `BoxFuture` return type.
    fn reconfigure_cache<'a>(
        &'a self,
        epoch_start_config: &'a EpochStartConfiguration,
    ) -> BoxFuture<'a, ()> {
        self.reconfigure_cache_impl(epoch_start_config).boxed()
    }
}
396
// `StateSyncAPI` for the proxy: both methods forward, via `delegate_method!`, to the cache
// currently selected by `self.mode`.
impl StateSyncAPI for ProxyCache {
    fn insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) -> IotaResult {
        delegate_method!(self.insert_transaction_and_effects(transaction, transaction_effects))
    }

    fn multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) -> IotaResult {
        delegate_method!(self.multi_insert_transaction_and_effects(transactions_and_effects))
    }
}
413
// `TestingAPI` for the proxy: forwards to the cache currently selected by `self.mode`.
impl TestingAPI for ProxyCache {
    // Returns whatever store handle the selected cache reports. `new` gives both caches
    // clones of the same `Arc<AuthorityStore>`.
    fn database_for_testing(&self) -> Arc<AuthorityStore> {
        delegate_method!(self.database_for_testing())
    }
}
418}