iota_core/
execution_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, path::Path, sync::Arc};
6
7use futures::{FutureExt, future::BoxFuture};
8use iota_common::fatal;
9use iota_config::ExecutionCacheConfig;
10use iota_types::{
11    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
12    bridge::Bridge,
13    digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
14    effects::{TransactionEffects, TransactionEvents},
15    error::{IotaError, IotaResult, UserInputError},
16    iota_system_state::IotaSystemState,
17    messages_checkpoint::CheckpointSequenceNumber,
18    object::{Object, Owner},
19    storage::{
20        BackingPackageStore, BackingStore, ChildObjectResolver, InputKey, MarkerValue, ObjectKey,
21        ObjectOrTombstone, ObjectStore, PackageObject,
22        error::{Error as StorageError, Result as StorageResult},
23    },
24    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
25};
26use prometheus::Registry;
27use tracing::instrument;
28
29use crate::{
30    authority::{
31        AuthorityStore,
32        authority_per_epoch_store::AuthorityPerEpochStore,
33        authority_store::{ExecutionLockWriteGuard, IotaLockResult},
34        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
35    },
36    state_accumulator::AccumulatorStore,
37    transaction_outputs::TransactionOutputs,
38};
39
40pub(crate) mod cache_types;
41pub mod metrics;
42mod object_locks;
43pub mod passthrough_cache;
44pub mod proxy_cache;
45pub mod writeback_cache;
46
47use metrics::ExecutionCacheMetrics;
48pub use passthrough_cache::PassthroughCache;
49pub use proxy_cache::ProxyCache;
50pub use writeback_cache::WritebackCache;
51
52// If you have Arc<ExecutionCache>, you cannot return a reference to it as
53// an &Arc<dyn ExecutionCacheRead> (for example), because the trait object is a
54// fat pointer. So, in order to be able to return &Arc<dyn T>, we create all the
55// converted trait objects (aka fat pointers) up front and return references to
56// them.
57#[derive(Clone)]
58pub struct ExecutionCacheTraitPointers {
59    pub object_cache_reader: Arc<dyn ObjectCacheRead>,
60    pub transaction_cache_reader: Arc<dyn TransactionCacheRead>,
61    pub cache_writer: Arc<dyn ExecutionCacheWrite>,
62    pub backing_store: Arc<dyn BackingStore + Send + Sync>,
63    pub backing_package_store: Arc<dyn BackingPackageStore + Send + Sync>,
64    pub object_store: Arc<dyn ObjectStore + Send + Sync>,
65    pub reconfig_api: Arc<dyn ExecutionCacheReconfigAPI>,
66    pub accumulator_store: Arc<dyn AccumulatorStore>,
67    pub checkpoint_cache: Arc<dyn CheckpointCache>,
68    pub state_sync_store: Arc<dyn StateSyncAPI>,
69    pub cache_commit: Arc<dyn ExecutionCacheCommit>,
70    pub testing_api: Arc<dyn TestingAPI>,
71}
72
73impl ExecutionCacheTraitPointers {
74    pub fn new<T>(cache: Arc<T>) -> Self
75    where
76        T: ObjectCacheRead
77            + TransactionCacheRead
78            + ExecutionCacheWrite
79            + BackingStore
80            + BackingPackageStore
81            + ObjectStore
82            + ExecutionCacheReconfigAPI
83            + AccumulatorStore
84            + CheckpointCache
85            + StateSyncAPI
86            + ExecutionCacheCommit
87            + TestingAPI
88            + 'static,
89    {
90        Self {
91            object_cache_reader: cache.clone(),
92            transaction_cache_reader: cache.clone(),
93            cache_writer: cache.clone(),
94            backing_store: cache.clone(),
95            backing_package_store: cache.clone(),
96            object_store: cache.clone(),
97            reconfig_api: cache.clone(),
98            accumulator_store: cache.clone(),
99            checkpoint_cache: cache.clone(),
100            state_sync_store: cache.clone(),
101            cache_commit: cache.clone(),
102            testing_api: cache.clone(),
103        }
104    }
105}
106
107static ENABLE_WRITEBACK_CACHE_ENV_VAR: &str = "ENABLE_WRITEBACK_CACHE";
108
109#[derive(Debug)]
110pub enum ExecutionCacheConfigType {
111    WritebackCache,
112    PassthroughCache,
113}
114
115pub fn choose_execution_cache(config: &ExecutionCacheConfig) -> ExecutionCacheConfigType {
116    #[cfg(msim)]
117    {
118        let mut use_random_cache = None;
119        iota_macros::fail_point_if!("select-random-cache", || {
120            let random = rand::random::<bool>();
121            tracing::info!("Randomly selecting cache: {}", random);
122            use_random_cache = Some(random);
123        });
124        if let Some(random) = use_random_cache {
125            if random {
126                return ExecutionCacheConfigType::PassthroughCache;
127            } else {
128                return ExecutionCacheConfigType::WritebackCache;
129            }
130        }
131    }
132
133    if std::env::var(ENABLE_WRITEBACK_CACHE_ENV_VAR).is_ok()
134        || matches!(config, ExecutionCacheConfig::WritebackCache { .. })
135    {
136        ExecutionCacheConfigType::WritebackCache
137    } else {
138        ExecutionCacheConfigType::PassthroughCache
139    }
140}
141
142pub fn build_execution_cache(
143    epoch_start_config: &EpochStartConfiguration,
144    prometheus_registry: &Registry,
145    store: &Arc<AuthorityStore>,
146) -> ExecutionCacheTraitPointers {
147    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
148    ExecutionCacheTraitPointers::new(
149        ProxyCache::new(epoch_start_config, store.clone(), execution_cache_metrics).into(),
150    )
151}
152
153/// Should only be used for iota-tool or tests. Nodes must use
154/// build_execution_cache which uses the epoch_start_config to prevent cache
155/// impl from switching except at epoch boundaries.
156pub fn build_execution_cache_from_env(
157    prometheus_registry: &Registry,
158    store: &Arc<AuthorityStore>,
159) -> ExecutionCacheTraitPointers {
160    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
161
162    if std::env::var(ENABLE_WRITEBACK_CACHE_ENV_VAR).is_ok() {
163        ExecutionCacheTraitPointers::new(
164            WritebackCache::new(store.clone(), execution_cache_metrics).into(),
165        )
166    } else {
167        ExecutionCacheTraitPointers::new(
168            PassthroughCache::new(store.clone(), execution_cache_metrics).into(),
169        )
170    }
171}
172
173pub trait ExecutionCacheCommit: Send + Sync {
174    /// Durably commit the outputs of the given transactions to the database.
175    /// Will be called by CheckpointExecutor to ensure that transaction outputs
176    /// are written durably before marking a checkpoint as finalized.
177    fn commit_transaction_outputs<'a>(
178        &'a self,
179        epoch: EpochId,
180        digests: &'a [TransactionDigest],
181    ) -> BoxFuture<'a, IotaResult>;
182
183    /// Durably commit transactions (but not their outputs) to the database.
184    /// Called before writing a locally built checkpoint to the CheckpointStore,
185    /// so that the inputs of the checkpoint cannot be lost.
186    /// These transactions are guaranteed to be final unless this validator
187    /// forks (i.e. constructs a checkpoint which will never be certified). In
188    /// this case some non-final transactions could be left in the database.
189    ///
190    /// This is an intermediate solution until we delay commits to the epoch db.
191    /// After we have done that, crash recovery will be done by
192    /// re-processing consensus commits and pending_consensus_transactions,
193    /// and this method can be removed.
194    fn persist_transactions<'a>(
195        &'a self,
196        digests: &'a [TransactionDigest],
197    ) -> BoxFuture<'a, IotaResult>;
198}
199
200pub trait ObjectCacheRead: Send + Sync {
201    fn get_package_object(&self, id: &ObjectID) -> IotaResult<Option<PackageObject>>;
202    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]);
203
204    fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>>;
205
206    fn get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
207        let mut ret = Vec::with_capacity(objects.len());
208        for object_id in objects {
209            ret.push(self.get_object(object_id)?);
210        }
211        Ok(ret)
212    }
213
214    fn get_latest_object_ref_or_tombstone(
215        &self,
216        object_id: ObjectID,
217    ) -> IotaResult<Option<ObjectRef>>;
218
219    fn get_latest_object_or_tombstone(
220        &self,
221        object_id: ObjectID,
222    ) -> IotaResult<Option<(ObjectKey, ObjectOrTombstone)>>;
223
224    fn get_object_by_key(
225        &self,
226        object_id: &ObjectID,
227        version: SequenceNumber,
228    ) -> IotaResult<Option<Object>>;
229
230    fn multi_get_objects_by_key(
231        &self,
232        object_keys: &[ObjectKey],
233    ) -> IotaResult<Vec<Option<Object>>>;
234
235    fn object_exists_by_key(
236        &self,
237        object_id: &ObjectID,
238        version: SequenceNumber,
239    ) -> IotaResult<bool>;
240
241    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>>;
242
243    /// Load a list of objects from the store by object reference.
244    /// If they exist in the store, they are returned directly.
245    /// If any object missing, we try to figure out the best error to return.
246    /// If the object we are asking is currently locked at a future version, we
247    /// know this transaction is out-of-date and we return a
248    /// ObjectVersionUnavailableForConsumption, which indicates this is not
249    /// retriable. Otherwise, we return a ObjectNotFound error, which
250    /// indicates this is retriable.
251    fn multi_get_objects_with_more_accurate_error_return(
252        &self,
253        object_refs: &[ObjectRef],
254    ) -> Result<Vec<Object>, IotaError> {
255        let objects = self.multi_get_objects_by_key(
256            &object_refs.iter().map(ObjectKey::from).collect::<Vec<_>>(),
257        )?;
258        let mut result = Vec::new();
259        for (object_opt, object_ref) in objects.into_iter().zip(object_refs) {
260            match object_opt {
261                None => {
262                    let live_objref = self._get_live_objref(object_ref.0)?;
263                    let error = if live_objref.1 >= object_ref.1 {
264                        UserInputError::ObjectVersionUnavailableForConsumption {
265                            provided_obj_ref: *object_ref,
266                            current_version: live_objref.1,
267                        }
268                    } else {
269                        UserInputError::ObjectNotFound {
270                            object_id: object_ref.0,
271                            version: Some(object_ref.1),
272                        }
273                    };
274                    return Err(IotaError::UserInput { error });
275                }
276                Some(object) => {
277                    result.push(object);
278                }
279            }
280        }
281        assert_eq!(result.len(), object_refs.len());
282        Ok(result)
283    }
284
285    /// Used by transaction manager to determine if input objects are ready.
286    /// Distinct from multi_get_object_by_key because it also consults
287    /// markers to handle the case where an object will never become available
288    /// (e.g. because it has been received by some other transaction
289    /// already).
290    fn multi_input_objects_available(
291        &self,
292        keys: &[InputKey],
293        receiving_objects: HashSet<InputKey>,
294        epoch: EpochId,
295    ) -> Result<Vec<bool>, IotaError> {
296        let (keys_with_version, keys_without_version): (Vec<_>, Vec<_>) = keys
297            .iter()
298            .enumerate()
299            .partition(|(_, key)| key.version().is_some());
300
301        let mut versioned_results = vec![];
302        for ((idx, input_key), has_key) in keys_with_version.iter().zip(
303            self.multi_object_exists_by_key(
304                &keys_with_version
305                    .iter()
306                    .map(|(_, k)| ObjectKey(k.id(), k.version().unwrap()))
307                    .collect::<Vec<_>>(),
308            )?
309            .into_iter(),
310        ) {
311            assert!(
312                input_key.version().is_none() || input_key.version().unwrap().is_valid(),
313                "Shared objects in cancelled transaction should always be available immediately, 
314                 but it appears that transaction manager is waiting for {:?} to become available",
315                input_key
316            );
317            // If the key exists at the specified version, then the object is available.
318            if has_key {
319                versioned_results.push((*idx, true))
320            } else if receiving_objects.contains(input_key) {
321                // There could be a more recent version of this object, and the object at the
322                // specified version could have already been pruned. In such a case `has_key`
323                // will be false, but since this is a receiving object we should
324                // mark it as available if we can determine that an object with
325                // a version greater than or equal to the specified version
326                // exists or was deleted. We will then let mark it as available
327                // to let the transaction through so it can fail at execution.
328                let is_available = self
329                    .get_object(&input_key.id())?
330                    .map(|obj| obj.version() >= input_key.version().unwrap())
331                    .unwrap_or(false)
332                    || self.have_deleted_owned_object_at_version_or_after(
333                        &input_key.id(),
334                        input_key.version().unwrap(),
335                        epoch,
336                    )?;
337                versioned_results.push((*idx, is_available));
338            } else if self
339                .get_deleted_shared_object_previous_tx_digest(
340                    &input_key.id(),
341                    input_key.version().unwrap(),
342                    epoch,
343                )?
344                .is_some()
345            {
346                // If the object is an already deleted shared object, mark it as available if
347                // the version for that object is in the shared deleted marker
348                // table.
349                versioned_results.push((*idx, true));
350            } else {
351                versioned_results.push((*idx, false));
352            }
353        }
354
355        let unversioned_results = keys_without_version.into_iter().map(|(idx, key)| {
356            (
357                idx,
358                match self
359                    .get_latest_object_ref_or_tombstone(key.id())
360                    .expect("read cannot fail")
361                {
362                    None => false,
363                    Some(entry) => entry.2.is_alive(),
364                },
365            )
366        });
367
368        let mut results = versioned_results
369            .into_iter()
370            .chain(unversioned_results)
371            .collect::<Vec<_>>();
372        results.sort_by_key(|(idx, _)| *idx);
373        Ok(results.into_iter().map(|(_, result)| result).collect())
374    }
375
376    /// Return the object with version less then or eq to the provided seq
377    /// number. This is used by indexer to find the correct version of
378    /// dynamic field child object. We do not store the version of the child
379    /// object, but because of lamport timestamp, we know the child must
380    /// have version number less then or eq to the parent.
381    fn find_object_lt_or_eq_version(
382        &self,
383        object_id: ObjectID,
384        version: SequenceNumber,
385    ) -> IotaResult<Option<Object>>;
386
387    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult;
388
389    // This method is considered "private" - only used by
390    // multi_get_objects_with_more_accurate_error_return
391    fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef>;
392
393    // Check that the given set of objects are live at the given version. This is
394    // used as a safety check before execution, and could potentially be deleted
395    // or changed to a debug_assert
396    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult;
397
398    fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState>;
399
400    fn get_bridge_object_unsafe(&self) -> IotaResult<Bridge>;
401
402    // Marker methods
403
404    /// Get the marker at a specific version
405    fn get_marker_value(
406        &self,
407        object_id: &ObjectID,
408        version: SequenceNumber,
409        epoch_id: EpochId,
410    ) -> IotaResult<Option<MarkerValue>>;
411
412    /// Get the latest marker for a given object.
413    fn get_latest_marker(
414        &self,
415        object_id: &ObjectID,
416        epoch_id: EpochId,
417    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>>;
418
419    /// If the shared object was deleted, return deletion info for the current
420    /// live version
421    fn get_last_shared_object_deletion_info(
422        &self,
423        object_id: &ObjectID,
424        epoch_id: EpochId,
425    ) -> IotaResult<Option<(SequenceNumber, TransactionDigest)>> {
426        match self.get_latest_marker(object_id, epoch_id)? {
427            Some((version, MarkerValue::SharedDeleted(digest))) => Ok(Some((version, digest))),
428            _ => Ok(None),
429        }
430    }
431
432    /// If the shared object was deleted, return deletion info for the specified
433    /// version.
434    fn get_deleted_shared_object_previous_tx_digest(
435        &self,
436        object_id: &ObjectID,
437        version: SequenceNumber,
438        epoch_id: EpochId,
439    ) -> IotaResult<Option<TransactionDigest>> {
440        match self.get_marker_value(object_id, version, epoch_id)? {
441            Some(MarkerValue::SharedDeleted(digest)) => Ok(Some(digest)),
442            _ => Ok(None),
443        }
444    }
445
446    fn have_received_object_at_version(
447        &self,
448        object_id: &ObjectID,
449        version: SequenceNumber,
450        epoch_id: EpochId,
451    ) -> IotaResult<bool> {
452        match self.get_marker_value(object_id, version, epoch_id)? {
453            Some(MarkerValue::Received) => Ok(true),
454            _ => Ok(false),
455        }
456    }
457
458    fn have_deleted_owned_object_at_version_or_after(
459        &self,
460        object_id: &ObjectID,
461        version: SequenceNumber,
462        epoch_id: EpochId,
463    ) -> IotaResult<bool> {
464        match self.get_latest_marker(object_id, epoch_id)? {
465            Some((marker_version, MarkerValue::OwnedDeleted)) if marker_version >= version => {
466                Ok(true)
467            }
468            _ => Ok(false),
469        }
470    }
471
472    /// Return the watermark for the highest checkpoint for which we've pruned
473    /// objects.
474    fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber>;
475}
476
477pub trait TransactionCacheRead: Send + Sync {
478    fn multi_get_transaction_blocks(
479        &self,
480        digests: &[TransactionDigest],
481    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>>;
482
483    fn get_transaction_block(
484        &self,
485        digest: &TransactionDigest,
486    ) -> IotaResult<Option<Arc<VerifiedTransaction>>> {
487        self.multi_get_transaction_blocks(&[*digest])
488            .map(|mut blocks| {
489                blocks
490                    .pop()
491                    .expect("multi-get must return correct number of items")
492            })
493    }
494
495    #[instrument(level = "trace", skip_all)]
496    fn get_transactions_and_serialized_sizes(
497        &self,
498        digests: &[TransactionDigest],
499    ) -> IotaResult<Vec<Option<(VerifiedTransaction, usize)>>> {
500        let txns = self.multi_get_transaction_blocks(digests)?;
501        txns.into_iter()
502            .map(|txn| {
503                txn.map(|txn| {
504                    // Note: if the transaction is read from the db, we are wasting some
505                    // effort relative to reading the raw bytes from the db instead of
506                    // calling serialized_size. However, transactions should usually be
507                    // fetched from cache.
508                    match txn.serialized_size() {
509                        Ok(size) => Ok(((*txn).clone(), size)),
510                        Err(e) => Err(e),
511                    }
512                })
513                .transpose()
514            })
515            .collect::<Result<Vec<_>, _>>()
516    }
517
518    fn multi_get_executed_effects_digests(
519        &self,
520        digests: &[TransactionDigest],
521    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>>;
522
523    fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
524        self.multi_get_executed_effects_digests(&[*digest])
525            .map(|mut digests| {
526                digests
527                    .pop()
528                    .expect("multi-get must return correct number of items")
529                    .is_some()
530            })
531    }
532
533    fn multi_get_executed_effects(
534        &self,
535        digests: &[TransactionDigest],
536    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
537        let effects_digests = self.multi_get_executed_effects_digests(digests)?;
538        assert_eq!(effects_digests.len(), digests.len());
539
540        let mut results = vec![None; digests.len()];
541        let mut fetch_digests = Vec::with_capacity(digests.len());
542        let mut fetch_indices = Vec::with_capacity(digests.len());
543
544        for (i, digest) in effects_digests.into_iter().enumerate() {
545            if let Some(digest) = digest {
546                fetch_digests.push(digest);
547                fetch_indices.push(i);
548            }
549        }
550
551        let effects = self.multi_get_effects(&fetch_digests)?;
552        for (i, effects) in fetch_indices.into_iter().zip(effects.into_iter()) {
553            results[i] = effects;
554        }
555
556        Ok(results)
557    }
558
559    fn get_executed_effects(
560        &self,
561        digest: &TransactionDigest,
562    ) -> IotaResult<Option<TransactionEffects>> {
563        self.multi_get_executed_effects(&[*digest])
564            .map(|mut effects| {
565                effects
566                    .pop()
567                    .expect("multi-get must return correct number of items")
568            })
569    }
570
571    fn multi_get_effects(
572        &self,
573        digests: &[TransactionEffectsDigest],
574    ) -> IotaResult<Vec<Option<TransactionEffects>>>;
575
576    fn get_effects(
577        &self,
578        digest: &TransactionEffectsDigest,
579    ) -> IotaResult<Option<TransactionEffects>> {
580        self.multi_get_effects(&[*digest]).map(|mut effects| {
581            effects
582                .pop()
583                .expect("multi-get must return correct number of items")
584        })
585    }
586
587    fn multi_get_events(
588        &self,
589        event_digests: &[TransactionEventsDigest],
590    ) -> IotaResult<Vec<Option<TransactionEvents>>>;
591
592    fn get_events(
593        &self,
594        digest: &TransactionEventsDigest,
595    ) -> IotaResult<Option<TransactionEvents>> {
596        self.multi_get_events(&[*digest]).map(|mut events| {
597            events
598                .pop()
599                .expect("multi-get must return correct number of items")
600        })
601    }
602
603    fn notify_read_executed_effects_digests<'a>(
604        &'a self,
605        digests: &'a [TransactionDigest],
606    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>>;
607
608    /// Wait until the effects of the given transactions are available and
609    /// return them. WARNING: If calling this on a transaction that could be
610    /// reverted, you must be sure that this function cannot be called
611    /// during reconfiguration. The best way to do this is to wrap your
612    /// future in EpochStore::within_alive_epoch. Holding an
613    /// ExecutionLockReadGuard would also prevent reconfig from happening while
614    /// waiting, but this is very dangerous, as it could prevent
615    /// reconfiguration from ever occurring!
616    fn notify_read_executed_effects<'a>(
617        &'a self,
618        digests: &'a [TransactionDigest],
619    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffects>>> {
620        async move {
621            let digests = self.notify_read_executed_effects_digests(digests).await?;
622            // once digests are available, effects must be present as well
623            self.multi_get_effects(&digests).map(|effects| {
624                effects
625                    .into_iter()
626                    .map(|e| e.unwrap_or_else(|| fatal!("digests must exist")))
627                    .collect()
628            })
629        }
630        .boxed()
631    }
632}
633
634pub trait ExecutionCacheWrite: Send + Sync {
635    /// Write the output of a transaction.
636    ///
637    /// Because of the child object consistency rule (readers that observe
638    /// parents must observe all children of that parent, up to the parent's
639    /// version bound), implementations of this method must not write any
640    /// top-level (address-owned or shared) objects before they have written all
641    /// of the object-owned objects (i.e. child objects) in the `objects` list.
642    ///
643    /// In the future, we may modify this method to expose finer-grained
644    /// information about parent/child relationships. (This may be
645    /// especially necessary for distributed object storage, but is unlikely
646    /// to be an issue before we tackle that problem).
647    ///
648    /// This function may evict the mutable input objects (and successfully
649    /// received objects) of transaction from the cache, since they cannot
650    /// be read by any other transaction.
651    ///
652    /// Any write performed by this method immediately notifies any waiter that
653    /// has previously called notify_read_objects_for_execution or
654    /// notify_read_objects_for_signing for the object in question.
655    fn write_transaction_outputs(
656        &self,
657        epoch_id: EpochId,
658        tx_outputs: Arc<TransactionOutputs>,
659    ) -> BoxFuture<'_, IotaResult>;
660
661    /// Attempt to acquire object locks for all of the owned input locks.
662    fn acquire_transaction_locks<'a>(
663        &'a self,
664        epoch_store: &'a AuthorityPerEpochStore,
665        owned_input_objects: &'a [ObjectRef],
666        transaction: VerifiedSignedTransaction,
667    ) -> BoxFuture<'a, IotaResult>;
668}
669
670pub trait CheckpointCache: Send + Sync {
671    // TODO: In addition to the methods below, this will eventually
672    // include access to the CheckpointStore.
673
674    // Note, the methods below were deemed deprecated before.
675    // Currently, they are only used to implement `get_transaction_block`
676    // for JSON RPC `ReadApi`.
677
678    fn get_transaction_perpetual_checkpoint(
679        &self,
680        digest: &TransactionDigest,
681    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>>;
682
683    fn multi_get_transactions_perpetual_checkpoints(
684        &self,
685        digests: &[TransactionDigest],
686    ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>>;
687
688    fn insert_finalized_transactions_perpetual_checkpoints(
689        &self,
690        digests: &[TransactionDigest],
691        epoch: EpochId,
692        sequence: CheckpointSequenceNumber,
693    ) -> IotaResult;
694}
695
696pub trait ExecutionCacheReconfigAPI: Send + Sync {
697    fn insert_genesis_object(&self, object: Object) -> IotaResult;
698    fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult;
699
700    fn revert_state_update(&self, digest: &TransactionDigest) -> IotaResult;
701    fn set_epoch_start_configuration(
702        &self,
703        epoch_start_config: &EpochStartConfiguration,
704    ) -> IotaResult;
705
706    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]);
707
708    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>);
709
710    fn expensive_check_iota_conservation(
711        &self,
712        old_epoch_store: &AuthorityPerEpochStore,
713        epoch_supply_change: Option<i64>,
714    ) -> IotaResult;
715
716    fn checkpoint_db(&self, path: &Path) -> IotaResult;
717
718    /// Reconfigure the cache itself.
719    /// TODO: this is only needed for ProxyCache to switch between cache impls.
720    /// It can be removed once WritebackCache is the sole cache impl.
721    fn reconfigure_cache<'a>(
722        &'a self,
723        epoch_start_config: &'a EpochStartConfiguration,
724    ) -> BoxFuture<'a, ()>;
725}
726
727// StateSyncAPI is for writing any data that was not the result of transaction
728// execution, but that arrived via state sync. The fact that it came via state
729// sync implies that it is certified output, and can be immediately persisted to
730// the store.
731pub trait StateSyncAPI: Send + Sync {
732    fn insert_transaction_and_effects(
733        &self,
734        transaction: &VerifiedTransaction,
735        transaction_effects: &TransactionEffects,
736    ) -> IotaResult;
737
738    fn multi_insert_transaction_and_effects(
739        &self,
740        transactions_and_effects: &[VerifiedExecutionData],
741    ) -> IotaResult;
742}
743
744pub trait TestingAPI: Send + Sync {
745    fn database_for_testing(&self) -> Arc<AuthorityStore>;
746}
747
748macro_rules! implement_storage_traits {
749    ($implementor: ident) => {
750        impl ObjectStore for $implementor {
751            fn get_object(&self, object_id: &ObjectID) -> StorageResult<Option<Object>> {
752                ObjectCacheRead::get_object(self, object_id).map_err(StorageError::custom)
753            }
754
755            fn get_object_by_key(
756                &self,
757                object_id: &ObjectID,
758                version: iota_types::base_types::VersionNumber,
759            ) -> StorageResult<Option<Object>> {
760                ObjectCacheRead::get_object_by_key(self, object_id, version)
761                    .map_err(StorageError::custom)
762            }
763        }
764
765        impl ChildObjectResolver for $implementor {
766            fn read_child_object(
767                &self,
768                parent: &ObjectID,
769                child: &ObjectID,
770                child_version_upper_bound: SequenceNumber,
771            ) -> IotaResult<Option<Object>> {
772                let Some(child_object) =
773                    self.find_object_lt_or_eq_version(*child, child_version_upper_bound)?
774                else {
775                    return Ok(None);
776                };
777
778                let parent = *parent;
779                if child_object.owner != Owner::ObjectOwner(parent.into()) {
780                    return Err(IotaError::InvalidChildObjectAccess {
781                        object: *child,
782                        given_parent: parent,
783                        actual_owner: child_object.owner,
784                    });
785                }
786                Ok(Some(child_object))
787            }
788
789            fn get_object_received_at_version(
790                &self,
791                owner: &ObjectID,
792                receiving_object_id: &ObjectID,
793                receive_object_at_version: SequenceNumber,
794                epoch_id: EpochId,
795            ) -> IotaResult<Option<Object>> {
796                let Some(recv_object) = ObjectCacheRead::get_object_by_key(
797                    self,
798                    receiving_object_id,
799                    receive_object_at_version,
800                )?
801                else {
802                    return Ok(None);
803                };
804
805                // Check for:
806                // * Invalid access -- treat as the object does not exist. Or;
807                // * If we've already received the object at the version -- then treat it as
808                //   though it doesn't exist.
809                // These two cases must remain indisguishable to the caller otherwise we risk
810                // forks in transaction replay due to possible reordering of
811                // transactions during replay.
812                if recv_object.owner != Owner::AddressOwner((*owner).into())
813                    || self.have_received_object_at_version(
814                        receiving_object_id,
815                        receive_object_at_version,
816                        epoch_id,
817                    )?
818                {
819                    return Ok(None);
820                }
821
822                Ok(Some(recv_object))
823            }
824        }
825
826        impl BackingPackageStore for $implementor {
827            fn get_package_object(
828                &self,
829                package_id: &ObjectID,
830            ) -> IotaResult<Option<PackageObject>> {
831                ObjectCacheRead::get_package_object(self, package_id)
832            }
833        }
834    };
835}
836
837// Implement traits for a cache implementation that always go directly to the
838// store.
839macro_rules! implement_passthrough_traits {
840    ($implementor: ident) => {
841        impl CheckpointCache for $implementor {
842            fn get_transaction_perpetual_checkpoint(
843                &self,
844                digest: &TransactionDigest,
845            ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
846                self.store.get_transaction_perpetual_checkpoint(digest)
847            }
848
849            fn multi_get_transactions_perpetual_checkpoints(
850                &self,
851                digests: &[TransactionDigest],
852            ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
853                self.store
854                    .multi_get_transactions_perpetual_checkpoints(digests)
855            }
856
857            fn insert_finalized_transactions_perpetual_checkpoints(
858                &self,
859                digests: &[TransactionDigest],
860                epoch: EpochId,
861                sequence: CheckpointSequenceNumber,
862            ) -> IotaResult {
863                self.store
864                    .insert_finalized_transactions_perpetual_checkpoints(digests, epoch, sequence)
865            }
866        }
867
868        impl ExecutionCacheReconfigAPI for $implementor {
869            fn insert_genesis_object(&self, object: Object) -> IotaResult {
870                self.store.insert_genesis_object(object)
871            }
872
873            fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult {
874                self.store.bulk_insert_genesis_objects(objects)
875            }
876
877            fn revert_state_update(&self, digest: &TransactionDigest) -> IotaResult {
878                self.revert_state_update_impl(digest)
879            }
880
881            fn set_epoch_start_configuration(
882                &self,
883                epoch_start_config: &EpochStartConfiguration,
884            ) -> IotaResult {
885                self.store.set_epoch_start_configuration(epoch_start_config)
886            }
887
888            fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
889                self.store.update_epoch_flags_metrics(old, new)
890            }
891
892            fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
893                self.clear_state_end_of_epoch_impl(execution_guard)
894            }
895
896            fn expensive_check_iota_conservation(
897                &self,
898                old_epoch_store: &AuthorityPerEpochStore,
899                epoch_supply_change: Option<i64>,
900            ) -> IotaResult {
901                self.store.expensive_check_iota_conservation(
902                    self,
903                    old_epoch_store,
904                    epoch_supply_change,
905                )
906            }
907
908            fn checkpoint_db(&self, path: &std::path::Path) -> IotaResult {
909                self.store.perpetual_tables.checkpoint_db(path)
910            }
911
912            fn reconfigure_cache<'a>(
913                &'a self,
914                _: &'a EpochStartConfiguration,
915            ) -> BoxFuture<'a, ()> {
916                // If we call this method instead of ProxyCache::reconfigure_cache, it's a bug.
917                // Such a bug would almost certainly cause other test failures before reaching
918                // this point, but if it somehow slipped through it is better to crash
919                // than risk forking because ProxyCache::reconfigure_cache was not
920                // called.
921                panic!(
922                    "reconfigure_cache should not be called on a {}",
923                    stringify!($implementor)
924                );
925            }
926        }
927
928        impl StateSyncAPI for $implementor {
929            fn insert_transaction_and_effects(
930                &self,
931                transaction: &VerifiedTransaction,
932                transaction_effects: &TransactionEffects,
933            ) -> IotaResult {
934                Ok(self
935                    .store
936                    .insert_transaction_and_effects(transaction, transaction_effects)?)
937            }
938
939            fn multi_insert_transaction_and_effects(
940                &self,
941                transactions_and_effects: &[VerifiedExecutionData],
942            ) -> IotaResult {
943                Ok(self
944                    .store
945                    .multi_insert_transaction_and_effects(transactions_and_effects.iter())?)
946            }
947        }
948
949        impl TestingAPI for $implementor {
950            fn database_for_testing(&self) -> Arc<AuthorityStore> {
951                self.store.clone()
952            }
953        }
954    };
955}
956
957use implement_passthrough_traits;
958
959implement_storage_traits!(PassthroughCache);
960implement_storage_traits!(WritebackCache);
961implement_storage_traits!(ProxyCache);
962
963pub trait ExecutionCacheAPI:
964    ObjectCacheRead
965    + ExecutionCacheWrite
966    + ExecutionCacheCommit
967    + ExecutionCacheReconfigAPI
968    + CheckpointCache
969    + StateSyncAPI
970{
971}