iota_core/execution_cache/
proxy_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use futures::{FutureExt, future::BoxFuture};
8use iota_config::node::ExecutionCacheTypeAtomicU8;
9use iota_types::{
10    accumulator::Accumulator,
11    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
12    digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
13    effects::{TransactionEffects, TransactionEvents},
14    error::{IotaError, IotaResult},
15    executable_transaction::VerifiedExecutableTransaction,
16    iota_system_state::IotaSystemState,
17    messages_checkpoint::CheckpointSequenceNumber,
18    object::Object,
19    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, PackageObject},
20    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
21};
22use tracing::instrument;
23
24use super::{
25    CheckpointCache, ExecutionCacheCommit, ExecutionCacheConfig, ExecutionCacheMetrics,
26    ExecutionCacheReconfigAPI, ExecutionCacheType, ExecutionCacheWrite, ObjectCacheRead,
27    PassthroughCache, StateSyncAPI, TestingAPI, TransactionCacheRead, WritebackCache,
28};
29use crate::{
30    authority::{
31        AuthorityStore,
32        authority_per_epoch_store::AuthorityPerEpochStore,
33        authority_store::{ExecutionLockWriteGuard, IotaLockResult},
34        backpressure::BackpressureManager,
35        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
36    },
37    state_accumulator::AccumulatorStore,
38    transaction_outputs::TransactionOutputs,
39};
40
/// Forwards a method call to whichever cache backend is currently selected.
///
/// Usage: `delegate_method!(self.method(arg1, arg2))`.
///
/// The macro loads `$self.mode` with `Acquire` ordering, converts the loaded
/// value into an `ExecutionCacheType`, and then calls `$method` with the given
/// arguments on either `$self.passthrough_cache` or `$self.writeback_cache`.
/// The match has no wildcard arm, so adding a variant to `ExecutionCacheType`
/// turns every delegation into a compile error until it is handled here.
///
/// Arguments may be arbitrary expressions and a trailing comma is accepted, so
/// multi-line invocations can be written like ordinary method calls. Each
/// argument expression appears once per match arm and only the selected arm
/// runs, so it is evaluated at most once per invocation.
macro_rules! delegate_method {
    ($self:ident.$method:ident($($args:expr),* $(,)?)) => {
        match ExecutionCacheType::from(&$self.mode.load(::std::sync::atomic::Ordering::Acquire)) {
            ExecutionCacheType::PassthroughCache => $self.passthrough_cache.$method($($args),*),
            ExecutionCacheType::WritebackCache => $self.writeback_cache.$method($($args),*),
        }
    };
}
49
50pub struct ProxyCache {
51    // Note: both caches must be constructed at startup, rather than using ArcSwap
52    // (or some similar strategy). This is because we need to proxy iter_live_object_set,
53    // which requires that we borrow from a member of the cache. If we used ArcSwap,
54    // we would be forced to borrow from a local variable after loading from the ArcSwap.
55    //
56    // Cache implementations are entirely passive, so the unused one will have no effect.
57    passthrough_cache: PassthroughCache,
58    writeback_cache: WritebackCache,
59    mode: ExecutionCacheTypeAtomicU8,
60}
61
62impl ProxyCache {
63    pub fn new(
64        cache_config: &ExecutionCacheConfig,
65        epoch_start_config: &EpochStartConfiguration,
66        store: Arc<AuthorityStore>,
67        metrics: Arc<ExecutionCacheMetrics>,
68        backpressure_manager: Arc<BackpressureManager>,
69    ) -> Self {
70        let cache_type = epoch_start_config.execution_cache_type();
71        tracing::info!("using cache impl {:?}", cache_type);
72        let passthrough_cache = PassthroughCache::new(store.clone(), metrics.clone());
73
74        let writeback_cache = WritebackCache::new(
75            &cache_config.writeback_cache,
76            store.clone(),
77            metrics.clone(),
78            backpressure_manager,
79        );
80
81        Self {
82            passthrough_cache,
83            writeback_cache,
84            mode: cache_type.into(),
85        }
86    }
87
88    async fn reconfigure_cache_impl(&self, epoch_start_config: &EpochStartConfiguration) {
89        let cache_type = epoch_start_config.execution_cache_type();
90        tracing::info!("switching to cache impl {:?}", cache_type);
91        if matches!(cache_type, ExecutionCacheType::PassthroughCache) {
92            // we may switch back to the writeback cache next epoch, at which point its
93            // caches will be stale if not cleared now
94
95            // When we call invalidate_all on Moka caches, it sets the valid after time
96            // stamp to the current time. Upon retrieval, it ignores entries
97            // whose insertion time is strictly less than the valid-after
98            // time. In the simulator, time remains constant for the duration of a single
99            // task poll, so it is possible that entries have been inserted in
100            // the same tick, and will therefore not be invalidated
101            // properly. So, this sleep is necessary for passing tests, and it also is some
102            // insurance against hitting the same issue in production. (It
103            // should be more or less impossible for two consecutive
104            // calls to Instant::now() to return the same value in production, but there is
105            // no harm in having a short sleep here just to be sure).
106            tokio::time::sleep(Duration::from_nanos(100)).await;
107            self.writeback_cache.clear_caches_and_assert_empty();
108        }
109        self.mode
110            .store(cache_type.into(), std::sync::atomic::Ordering::Release);
111    }
112}
113
114impl ObjectCacheRead for ProxyCache {
115    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
116        delegate_method!(self.try_get_package_object(package_id))
117    }
118
119    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]) {
120        delegate_method!(self.force_reload_system_packages(system_package_ids))
121    }
122
123    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
124        delegate_method!(self.try_get_object(id))
125    }
126
127    fn try_get_object_by_key(
128        &self,
129        object_id: &ObjectID,
130        version: SequenceNumber,
131    ) -> IotaResult<Option<Object>> {
132        delegate_method!(self.try_get_object_by_key(object_id, version))
133    }
134
135    fn try_multi_get_objects_by_key(
136        &self,
137        object_keys: &[ObjectKey],
138    ) -> Result<Vec<Option<Object>>, IotaError> {
139        delegate_method!(self.try_multi_get_objects_by_key(object_keys))
140    }
141
142    fn try_object_exists_by_key(
143        &self,
144        object_id: &ObjectID,
145        version: SequenceNumber,
146    ) -> IotaResult<bool> {
147        delegate_method!(self.try_object_exists_by_key(object_id, version))
148    }
149
150    fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
151        delegate_method!(self.try_multi_object_exists_by_key(object_keys))
152    }
153
154    fn try_get_latest_object_ref_or_tombstone(
155        &self,
156        object_id: ObjectID,
157    ) -> IotaResult<Option<ObjectRef>> {
158        delegate_method!(self.try_get_latest_object_ref_or_tombstone(object_id))
159    }
160
161    fn try_get_latest_object_or_tombstone(
162        &self,
163        object_id: ObjectID,
164    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
165        delegate_method!(self.try_get_latest_object_or_tombstone(object_id))
166    }
167
168    fn try_find_object_lt_or_eq_version(
169        &self,
170        object_id: ObjectID,
171        version: SequenceNumber,
172    ) -> IotaResult<Option<Object>> {
173        delegate_method!(self.try_find_object_lt_or_eq_version(object_id, version))
174    }
175
176    fn try_get_lock(
177        &self,
178        obj_ref: ObjectRef,
179        epoch_store: &AuthorityPerEpochStore,
180    ) -> IotaLockResult {
181        delegate_method!(self.try_get_lock(obj_ref, epoch_store))
182    }
183
184    fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
185        delegate_method!(self._try_get_live_objref(object_id))
186    }
187
188    fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
189        delegate_method!(self.try_check_owned_objects_are_live(owned_object_refs))
190    }
191
192    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
193        delegate_method!(self.try_get_iota_system_state_object_unsafe())
194    }
195
196    fn try_get_marker_value(
197        &self,
198        object_id: &ObjectID,
199        version: SequenceNumber,
200        epoch_id: EpochId,
201    ) -> IotaResult<Option<MarkerValue>> {
202        delegate_method!(self.try_get_marker_value(object_id, version, epoch_id))
203    }
204
205    fn try_get_latest_marker(
206        &self,
207        object_id: &ObjectID,
208        epoch_id: EpochId,
209    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
210        delegate_method!(self.try_get_latest_marker(object_id, epoch_id))
211    }
212
213    fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
214        delegate_method!(self.try_get_highest_pruned_checkpoint())
215    }
216}
217
218impl TransactionCacheRead for ProxyCache {
219    #[instrument(level = "trace", skip_all)]
220    fn try_multi_get_transaction_blocks(
221        &self,
222        digests: &[TransactionDigest],
223    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
224        delegate_method!(self.try_multi_get_transaction_blocks(digests))
225    }
226
227    #[instrument(level = "trace", skip_all)]
228    fn try_multi_get_executed_effects_digests(
229        &self,
230        digests: &[TransactionDigest],
231    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
232        delegate_method!(self.try_multi_get_executed_effects_digests(digests))
233    }
234
235    #[instrument(level = "trace", skip_all)]
236    fn try_multi_get_effects(
237        &self,
238        digests: &[TransactionEffectsDigest],
239    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
240        delegate_method!(self.try_multi_get_effects(digests))
241    }
242
243    #[instrument(level = "trace", skip_all)]
244    fn try_notify_read_executed_effects_digests<'a>(
245        &'a self,
246        digests: &'a [TransactionDigest],
247    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
248        delegate_method!(self.try_notify_read_executed_effects_digests(digests))
249    }
250
251    #[instrument(level = "trace", skip_all)]
252    fn try_multi_get_events(
253        &self,
254        event_digests: &[TransactionEventsDigest],
255    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
256        delegate_method!(self.try_multi_get_events(event_digests))
257    }
258}
259
260impl ExecutionCacheWrite for ProxyCache {
261    fn try_write_transaction_outputs(
262        &self,
263        epoch_id: EpochId,
264        tx_outputs: Arc<TransactionOutputs>,
265    ) -> IotaResult {
266        delegate_method!(self.try_write_transaction_outputs(epoch_id, tx_outputs))
267    }
268
269    #[instrument(level = "trace", skip_all)]
270    fn try_acquire_transaction_locks<'a>(
271        &'a self,
272        epoch_store: &'a AuthorityPerEpochStore,
273        owned_input_objects: &'a [ObjectRef],
274        transaction: VerifiedSignedTransaction,
275    ) -> IotaResult {
276        delegate_method!(self.try_acquire_transaction_locks(
277            epoch_store,
278            owned_input_objects,
279            transaction
280        ))
281    }
282}
283
284impl AccumulatorStore for ProxyCache {
285    fn get_root_state_accumulator_for_epoch(
286        &self,
287        epoch: EpochId,
288    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
289        delegate_method!(self.get_root_state_accumulator_for_epoch(epoch))
290    }
291
292    fn get_root_state_accumulator_for_highest_epoch(
293        &self,
294    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
295        delegate_method!(self.get_root_state_accumulator_for_highest_epoch())
296    }
297
298    fn insert_state_accumulator_for_epoch(
299        &self,
300        epoch: EpochId,
301        checkpoint_seq_num: &CheckpointSequenceNumber,
302        acc: &Accumulator,
303    ) -> IotaResult {
304        delegate_method!(self.insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc))
305    }
306
307    fn iter_live_object_set(
308        &self,
309    ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
310        delegate_method!(self.iter_live_object_set())
311    }
312
313    fn iter_cached_live_object_set_for_testing(
314        &self,
315    ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
316        delegate_method!(self.iter_cached_live_object_set_for_testing())
317    }
318}
319
320impl ExecutionCacheCommit for ProxyCache {
321    fn try_commit_transaction_outputs(
322        &self,
323        epoch: EpochId,
324        digests: &[TransactionDigest],
325    ) -> IotaResult {
326        delegate_method!(self.try_commit_transaction_outputs(epoch, digests))
327    }
328
329    fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
330        delegate_method!(self.try_persist_transaction(tx))
331    }
332
333    fn approximate_pending_transaction_count(&self) -> u64 {
334        delegate_method!(self.approximate_pending_transaction_count())
335    }
336}
337
338impl CheckpointCache for ProxyCache {
339    fn try_get_transaction_perpetual_checkpoint(
340        &self,
341        digest: &TransactionDigest,
342    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
343        delegate_method!(self.try_get_transaction_perpetual_checkpoint(digest))
344    }
345
346    fn try_multi_get_transactions_perpetual_checkpoints(
347        &self,
348        digests: &[TransactionDigest],
349    ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
350        delegate_method!(self.try_multi_get_transactions_perpetual_checkpoints(digests))
351    }
352
353    fn try_insert_finalized_transactions_perpetual_checkpoints(
354        &self,
355        digests: &[TransactionDigest],
356        epoch: EpochId,
357        sequence: CheckpointSequenceNumber,
358    ) -> IotaResult {
359        delegate_method!(
360            self.try_insert_finalized_transactions_perpetual_checkpoints(digests, epoch, sequence)
361        )
362    }
363}
364
365impl ExecutionCacheReconfigAPI for ProxyCache {
366    fn try_insert_genesis_object(&self, object: Object) -> IotaResult {
367        delegate_method!(self.try_insert_genesis_object(object))
368    }
369
370    fn try_bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult {
371        delegate_method!(self.try_bulk_insert_genesis_objects(objects))
372    }
373
374    fn try_revert_state_update(&self, digest: &TransactionDigest) -> IotaResult {
375        delegate_method!(self.try_revert_state_update(digest))
376    }
377
378    fn try_set_epoch_start_configuration(
379        &self,
380        epoch_start_config: &EpochStartConfiguration,
381    ) -> IotaResult {
382        delegate_method!(self.try_set_epoch_start_configuration(epoch_start_config))
383    }
384
385    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
386        delegate_method!(self.update_epoch_flags_metrics(old, new))
387    }
388
389    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
390        delegate_method!(self.clear_state_end_of_epoch(execution_guard))
391    }
392
393    fn try_expensive_check_iota_conservation(
394        &self,
395        old_epoch_store: &AuthorityPerEpochStore,
396        epoch_supply_change: Option<i64>,
397    ) -> IotaResult {
398        delegate_method!(
399            self.try_expensive_check_iota_conservation(old_epoch_store, epoch_supply_change)
400        )
401    }
402
403    fn try_checkpoint_db(&self, path: &std::path::Path) -> IotaResult {
404        delegate_method!(self.try_checkpoint_db(path))
405    }
406
407    fn reconfigure_cache<'a>(
408        &'a self,
409        epoch_start_config: &'a EpochStartConfiguration,
410    ) -> BoxFuture<'a, ()> {
411        self.reconfigure_cache_impl(epoch_start_config).boxed()
412    }
413}
414
415impl StateSyncAPI for ProxyCache {
416    fn try_insert_transaction_and_effects(
417        &self,
418        transaction: &VerifiedTransaction,
419        transaction_effects: &TransactionEffects,
420    ) -> IotaResult {
421        delegate_method!(self.try_insert_transaction_and_effects(transaction, transaction_effects))
422    }
423
424    fn try_multi_insert_transaction_and_effects(
425        &self,
426        transactions_and_effects: &[VerifiedExecutionData],
427    ) -> IotaResult {
428        delegate_method!(self.try_multi_insert_transaction_and_effects(transactions_and_effects))
429    }
430}
431
432impl TestingAPI for ProxyCache {
433    fn database_for_testing(&self) -> Arc<AuthorityStore> {
434        delegate_method!(self.database_for_testing())
435    }
436}