iota_core/execution_cache/writeback_cache.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! WritebackCache is a cache for transaction execution which delays writes to
//! the database until transaction results are certified (i.e. they appear in a
//! certified checkpoint, or an effects cert is observed by a fullnode). The
//! cache also stores committed data in memory in order to serve future reads
//! without hitting the database.
//!
//! For storing uncommitted transaction outputs, we cannot evict the data at all
//! until it is written to disk. Committed data not only can be evicted, but it
//! is also unbounded (imagine a stream of transactions that keep splitting a
//! coin into smaller coins).
//!
//! We also want to be able to support negative cache hits (i.e. the case where
//! we can determine an object does not exist without hitting the database).
//!
//! To achieve both of these goals, we split the cache data into two pieces, a
//! dirty set and a cached set. The dirty set has no automatic evictions; data
//! is only removed after being committed. The cached set is in a bounded-size
//! cache with automatic evictions. In order to support negative cache hits, we
//! treat the two halves of the cache as a single FIFO queue. Newly written
//! (dirty) versions are inserted at one end of the dirty queue. As versions are
//! committed to disk, they are removed from the other end of the dirty queue
//! and inserted into the cache queue. The cache queue is truncated if it
//! exceeds its maximum size, by removing all but the N newest versions.
//!
//! This gives us the property that the sequence of versions in the dirty and
//! cached queues are the most recent versions of the object, i.e. there can be
//! no "gaps". This allows for the following:
//!
//!   - Negative cache hits: If the queried version is not in memory, but is
//!     higher than the smallest version in the cached queue, it does not exist
//!     in the db either.
//!   - Bounded reads: When reading the most recent version that is <= some
//!     version bound, we can correctly satisfy this query from the cache, or
//!     determine that we must go to the db.
//!
//! Note that at any time, either or both of the dirty and cached queues may be
//! non-existent. There may be no dirty versions of the objects, in which case
//! there will be no dirty queue. And, the cached queue may be evicted from the
//! cache, in which case there will be no cached queue. Because only the cached
//! queue can be evicted (the dirty queue can only become empty by moving
//! versions from it to the cached queue), the "highest versions" property still
//! holds in all cases.
//!
//! The above design is used for both objects and markers.
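//!
//! As a concrete illustration (hypothetical version numbers), suppose object O
//! has versions 4, 8 and 13, where 13 is still dirty and 4 and 8 have been
//! committed:
//!
//! ```text
//! dirty queue:  [13]      // cannot be evicted until flushed to the db
//! cached queue: [4, 8]    // bounded, evicted only as a whole
//! ```
//!
//! - A read of (O, 8) or (O, 13) is a cache hit.
//! - A read of (O, 9) is a negative hit: 9 is greater than the smallest cached
//!   version (4) but is in neither queue, so it cannot exist in the db either.
//! - A bounded read of "latest version <= 12" can be answered from the cache
//!   (version 8), because the queues contain every existing version >= 4.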

use std::{
    collections::{BTreeMap, BTreeSet},
    hash::Hash,
    sync::Arc,
};

use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
use futures::{FutureExt, future::BoxFuture};
use iota_common::sync::notify_read::NotifyRead;
use iota_macros::fail_point_async;
use iota_types::{
    accumulator::Accumulator,
    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
    digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
    effects::{TransactionEffects, TransactionEvents},
    error::{IotaError, IotaResult, UserInputError},
    iota_system_state::{IotaSystemState, get_iota_system_state},
    message_envelope::Message,
    messages_checkpoint::CheckpointSequenceNumber,
    object::Object,
    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
};
use moka::sync::Cache as MokaCache;
use parking_lot::Mutex;
use prometheus::Registry;
use tap::TapOptional;
use tracing::{debug, info, instrument, trace, warn};

use super::{
    CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
    ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
    TransactionCacheRead,
    cache_types::{CachedVersionMap, IsNewer, MonotonicCache},
    implement_passthrough_traits,
    object_locks::ObjectLocks,
};
use crate::{
    authority::{
        AuthorityStore,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
        authority_store_tables::LiveObject,
        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
    },
    state_accumulator::AccumulatorStore,
    transaction_outputs::TransactionOutputs,
};

#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    Object(Object),
    Deleted,
    Wrapped,
}

impl ObjectEntry {
    #[cfg(test)]
    fn unwrap_object(&self) -> &Object {
        match self {
            ObjectEntry::Object(o) => o,
            _ => panic!("unwrap_object called on non-Object"),
        }
    }

    fn is_tombstone(&self) -> bool {
        match self {
            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
            ObjectEntry::Object(_) => false,
        }
    }
}

impl std::fmt::Debug for ObjectEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ObjectEntry::Object(o) => {
                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
            }
            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
        }
    }
}

impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}

impl From<ObjectOrTombstone> for ObjectEntry {
    fn from(object: ObjectOrTombstone) -> Self {
        match object {
            ObjectOrTombstone::Object(o) => o.into(),
            ObjectOrTombstone::Tombstone(obj_ref) => {
                if obj_ref.2.is_deleted() {
                    ObjectEntry::Deleted
                } else if obj_ref.2.is_wrapped() {
                    ObjectEntry::Wrapped
                } else {
                    panic!("tombstone digest must either be deleted or wrapped");
                }
            }
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}

impl LatestObjectCacheEntry {
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        match self {
            LatestObjectCacheEntry::Object(version, _) => Some(*version),
            LatestObjectCacheEntry::NonExistent => None,
        }
    }
}

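// Ordering relation used by MonotonicCache when deciding whether a candidate
// entry may replace the current one: a versioned object is newer than any
// lower version and newer than NonExistent, while NonExistent never replaces
// an existing object entry.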
impl IsNewer for LatestObjectCacheEntry {
    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
        match (self, other) {
            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
                v1 > v2
            }
            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
            _ => false,
        }
    }
}

type MarkerKey = (EpochId, ObjectID);

enum CacheResult<T> {
    /// Entry is in the cache
    Hit(T),
    /// Entry is not in the cache and is known to not exist
    NegativeHit,
    /// Entry is not in the cache and may or may not exist in the store
    Miss,
}

/// UncommittedData stores execution outputs that are not yet written to the db.
/// Entries in this struct can only be purged after they are committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first. After we
    /// flush the data to the db, the data is removed from this table and
    /// inserted into the object_cache.
    ///
    /// This table may contain both live and dead objects, since we flush both
    /// live and dead objects to the db in order to support past object
    /// queries on fullnodes.
    ///
    /// Further, we only remove objects in FIFO order, which ensures that the
    /// cached sequence of objects has no gaps. In other words, if we have
    /// versions 4, 8, 13 of an object, we can deduce that version 9 does
    /// not exist. This also makes child object reads efficient.
    /// `object_cache` cannot contain a more recent version of an object than
    /// `objects`, and neither can have any gaps. Therefore if there is any
    /// object <= the version bound for a child read in objects, it is the
    /// correct object to return.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Markers for received objects and deleted shared objects. This contains all of the dirty
    // marker state, which is committed to the db at the same time as other transaction data.
    // After markers are committed to the db we remove them from this table and insert them into
    // marker_cache.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Because TransactionEvents are not unique to the transaction that created them, we must
    // reference count them in order to know when we can remove them from the cache. For now
    // we track all referrers explicitly, but we can use a ref count when we are confident in
    // the correctness of the code.
    transaction_events:
        DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,

    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Transaction outputs that have not yet been written to the DB. Items are removed from this
    // table as they are flushed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
}

impl UncommittedData {
    fn new() -> Self {
        Self {
            objects: DashMap::new(),
            markers: DashMap::new(),
            transaction_effects: DashMap::new(),
            executed_effects_digests: DashMap::new(),
            pending_transaction_writes: DashMap::new(),
            transaction_events: DashMap::new(),
        }
    }

    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.transaction_events.clear();
    }

    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
            );
        }
        empty
    }
}

// TODO: set this via the config
static MAX_CACHE_SIZE: u64 = 10000;

/// CachedCommittedData stores data that has been committed to the db, but is
/// likely to be read soon.
struct CachedCommittedData {
    // See module level comment for an explanation of caching strategy.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // See module level comment for an explanation of caching strategy.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    transactions: MokaCache<TransactionDigest, Arc<VerifiedTransaction>>,

    transaction_effects: MokaCache<TransactionEffectsDigest, Arc<TransactionEffects>>,

    transaction_events: MokaCache<TransactionEventsDigest, Arc<TransactionEvents>>,

    executed_effects_digests: MokaCache<TransactionDigest, TransactionEffectsDigest>,

    // Objects that were read at transaction signing time - allows us to access them again at
    // execution time with a single lock / hash lookup
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}

impl CachedCommittedData {
    fn new() -> Self {
        let object_cache = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let marker_cache = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transactions = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transaction_effects = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transaction_events = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let executed_effects_digests = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transaction_objects = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();

        Self {
            object_cache,
            object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE),
            marker_cache,
            transactions,
            transaction_effects,
            transaction_events,
            executed_effects_digests,
            _transaction_objects: transaction_objects,
        }
    }

    fn clear_and_assert_empty(&self) {
        self.object_cache.invalidate_all();
        self.object_by_id_cache.invalidate_all();
        self.marker_cache.invalidate_all();
        self.transactions.invalidate_all();
        self.transaction_effects.invalidate_all();
        self.transaction_events.invalidate_all();
        self.executed_effects_digests.invalidate_all();
        self._transaction_objects.invalidate_all();

        assert_empty(&self.object_cache);
        assert!(self.object_by_id_cache.is_empty());
        assert_empty(&self.marker_cache);
        assert_empty(&self.transactions);
        assert_empty(&self.transaction_effects);
        assert_empty(&self.transaction_events);
        assert_empty(&self.executed_effects_digests);
        assert_empty(&self._transaction_objects);
    }
}

fn assert_empty<K, V>(cache: &MokaCache<K, V>)
where
    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
{
    if cache.iter().next().is_some() {
        panic!("cache should be empty");
    }
}

pub struct WritebackCache {
    dirty: UncommittedData,
    cached: CachedCommittedData,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    // - note that we remove any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    store: Arc<AuthorityStore>,
    metrics: Arc<ExecutionCacheMetrics>,
}

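// Checks a single CachedVersionMap (either the dirty or the committed half of
// the cache) for an exact version, recording metrics and early-returning on a
// hit, or on a provable negative hit via the no-gaps property. Typical usage
// (as in get_object_entry_by_key_cache_only below):
//
//     check_cache_entry_by_version!(self, "object_by_version", "uncommitted", dirty_entry, version);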
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // If the requested version is greater than the least version in the
                    // cache and was not found, then by the no-gaps property it does not
                    // exist in the db either.
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

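// Companion to check_cache_entry_by_version!: early-returns the highest (most
// recent) version in the given CachedVersionMap, e.g.
//
//     check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);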
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

impl WritebackCache {
    pub fn new(store: Arc<AuthorityStore>, metrics: Arc<ExecutionCacheMetrics>) -> Self {
        let packages = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        Self {
            dirty: UncommittedData::new(),
            cached: CachedCommittedData::new(),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            store,
            metrics,
        }
    }

    pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
        Self::new(store, ExecutionCacheMetrics::new(registry).into())
    }

    #[cfg(test)]
    pub fn reset_for_test(&mut self) {
        let mut new = Self::new(self.store.clone(), self.metrics.clone());
        std::mem::swap(self, &mut new);
    }

    async fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        fail_point_async!("write_object_entry");
        self.metrics.record_cache_write("object");

        // We must hold the lock for the object entry while inserting to the
        // object_by_id_cache. Otherwise, a surprising bug can occur:
        //
        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then
        //    pause.
        // 2. TX2, which reads (O,1) can begin executing, because TransactionManager
        //    immediately schedules transactions if their inputs are available. It does
        //    not matter that TX1 hasn't finished executing yet.
        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
        // 4. The thread executing TX1 can resume and write (O,1) to the
        //    object_by_id_cache.
        //
        // Now, any subsequent attempt to get the latest version of O will return (O,1)
        // instead of (O,2).
        //
        // This seems very unlikely, but it may be possible under the following
        // circumstances:
        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
        //   lock-free algorithms that have retry loops. Possibly, under high
        //   contention, this code might spin for a surprisingly long time.
        // - Additionally, many concurrent re-executions of the same tx could happen due
        //   to the tx finalizer, plus checkpoint executor, consensus, and RPCs from
        //   fullnodes.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.cached.object_by_id_cache.insert(
            object_id,
            LatestObjectCacheEntry::Object(version, object.clone()),
        );

        entry.insert(version, object);
    }

    async fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: &ObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!(
            "inserting marker value {:?}: {:?}",
            object_key,
            marker_value
        );
        fail_point_async!("write_marker_entry");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.0))
            .or_default()
            .value_mut()
            .insert(object_key.1, marker_value);
    }

    // lock both the dirty and committed sides of the cache, and then pass the
    // entries to the callback. Written with the `with` pattern because any
    // other way of doing this creates lifetime hell.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }

    // Attempt to get an object from the cache. The DB is not consulted.
    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<Object> {
        match self.get_object_entry_by_key_cache_only(object_id, version) {
            CacheResult::Hit(entry) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::Miss => CacheResult::Miss,
            CacheResult::NegativeHit => CacheResult::NegativeHit,
        }
    }

    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.cached.object_by_id_cache.get(object_id);

        if cfg!(debug_assertions) {
            if let Some(entry) = &entry {
                // check that cache is coherent
                let highest: Option<ObjectEntry> = self
                    .dirty
                    .objects
                    .get(object_id)
                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                    .or_else(|| {
                        let obj: Option<ObjectEntry> = self
                            .store
                            .get_latest_object_or_tombstone(*object_id)
                            .unwrap()
                            .map(|(_, o)| o.into());
                        obj
                    });

                let cache_entry = match &*entry.lock() {
                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                    LatestObjectCacheEntry::NonExistent => None,
                };

                // If the cache entry is a tombstone, the db entry may be missing if it was
                // pruned.
                let tombstone_possibly_pruned = highest.is_none()
                    && cache_entry
                        .as_ref()
                        .map(|e| e.is_tombstone())
                        .unwrap_or(false);

                if highest != cache_entry && !tombstone_possibly_pruned {
                    tracing::error!(
                        ?highest,
                        ?cache_entry,
                        ?tombstone_possibly_pruned,
                        "object_by_id cache is incoherent for {:?}",
                        object_id
                    );
                    panic!("object_by_id cache is incoherent for {object_id:?}");
                }
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, Object)> {
        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
            CacheResult::Hit((version, entry)) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::NegativeHit => CacheResult::NegativeHit,
            CacheResult::Miss => CacheResult::Miss,
        }
    }

    fn get_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_latest_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_impl(
        &self,
        request_type: &'static str,
        id: &ObjectID,
    ) -> IotaResult<Option<Object>> {
        match self.get_object_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, object)) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => {
                let obj = self.store.get_object(id)?;
                if let Some(obj) = &obj {
                    self.cache_latest_object_by_id(
                        id,
                        LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
                    );
                } else {
                    self.cache_object_not_found(id);
                }
                Ok(obj)
            }
        }
    }

    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }

    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }

    #[instrument(level = "debug", skip_all)]
    async fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = &*tx_outputs;

        // Deletions and wraps must be written first. The reason is that one of the
        // deletes may be a child object, and if we write the parent object
        // first, a reader may see the previous version of the child object
        // instead of the deleted/wrapped tombstone, which would cause an
        // execution fork.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted)
                .await;
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped)
                .await;
        }

        // Update all markers
        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, object_key, *marker_value)
                .await;
        }

        // Write children before parents to ensure that readers do not observe a parent
        // object before its most recent children are visible.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into())
                    .await;
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into())
                    .await;
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        // insert transaction effects before executed_effects_digests so that there
        // are never dangling entries in executed_effects_digests
        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // note: if events.data.is_empty(), then there are no events for this
        // transaction. We store it anyway to avoid special cases in
        // commit_transaction_outputs, and translate an empty events structure
        // to None when reading.
        self.metrics.record_cache_write("transaction_events");
        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                occupied.get_mut().0.insert(tx_digest);
            }
            DashMapEntry::Vacant(entry) => {
                let mut txns = BTreeSet::new();
                txns.insert(tx_digest);
                entry.insert((txns, events.clone()));
            }
        }

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        Ok(())
    }

    // Commits dirty data for the given TransactionDigest to the db.
    #[instrument(level = "debug", skip_all)]
    async fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        fail_point_async!("writeback-cache-commit");
        trace!(?digests);

        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                // This can happen in the following rare case: all transactions in the
                // checkpoint are committed to the db (by commit_transaction_outputs, called
                // in CheckpointExecutor::process_executed_transactions), but the process
                // crashes before the checkpoint watermark is bumped. We will then
                // re-commit the checkpoint at startup, even though all of its
                // transactions have already been executed.
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        // Flush writes to disk before removing anything from the dirty set. Otherwise,
        // a cache eviction could cause a value to disappear briefly, even if we insert
        // to the cache before removing from the dirty set.
        self.store
            .write_transaction_outputs(epoch, &all_outputs)
            .await?;

        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        Ok(())
    }

    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        // Now, remove each piece of committed data from the dirty state and insert it
        // into the cache.
        // TODO: outputs should have a strong count of 1 so we should be able to move
        // out of it.
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();
        let events_digest = events.digest();

        // Update cache before removing from self.dirty to avoid
        // unnecessary cache misses
        self.cached
            .transactions
            .insert(tx_digest, transaction.clone());
        self.cached
            .transaction_effects
            .insert(effects_digest, effects.clone().into());
        self.cached
            .executed_effects_digests
            .insert(tx_digest, effects_digest);
        self.cached
            .transaction_events
            .insert(events_digest, events.clone().into());

        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                let txns = &mut occupied.get_mut().0;
                assert!(txns.remove(&tx_digest), "transaction must exist");
                if txns.is_empty() {
                    occupied.remove();
                }
            }
            DashMapEntry::Vacant(_) => {
                panic!("events must exist");
            }
        }

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        // Move dirty markers to cache
        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.0),
                object_key.1,
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }

    async fn persist_transactions(&self, digests: &[TransactionDigest]) -> IotaResult {
        let mut txns = Vec::with_capacity(digests.len());
        for tx_digest in digests {
            let Some(tx) = self
                .dirty
                .pending_transaction_writes
                .get(tx_digest)
                .map(|o| o.transaction.clone())
            else {
                // tx should exist in the db if it is not in the dirty set.
                debug_assert!(
                    self.store
                        .get_transaction_block(tx_digest)
                        .unwrap()
                        .is_some()
                );
                // If the transaction is not in dirty, it does not need to be committed.
                // This situation can happen if we build a checkpoint locally which was just
                // executed via state sync.
                continue;
            };

            txns.push((*tx_digest, (*tx).clone()));
        }

        self.store.commit_transactions(&txns)
    }

    // Move the oldest/least entry from the dirty queue to the cache queue.
    // This is called after the entry is committed to the db.
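    //
    // For example (hypothetical versions), with MAX_VERSIONS == 3: if the dirty
    // queue for a key holds [5, 6, 7] and the cache holds [2, 3, 4], committing
    // version 5 moves it across, leaving the dirty queue as [6, 7] and the cache
    // as [3, 4, 5] after truncation. The combined sequence [3, 4, 5, 6, 7] stays
    // gap-free throughout.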
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        static MAX_VERSIONS: usize = 3;

        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying
        // either. This ensures that readers cannot see a value temporarily
        // disappear.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // insert into cache and drop old versions.
        cache_map.insert(version, value.clone());
        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // if there are no versions remaining, remove the map entry
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }
1114
1115    // Updates the latest object id cache with an entry that was read from the db.
1116    fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
1117        trace!("caching object by id: {:?} {:?}", object_id, object);
1118        self.metrics.record_cache_write("object_by_id");
1119        self.cached.object_by_id_cache.insert(object_id, object);
1120    }
1121
1122    fn cache_object_not_found(&self, object_id: &ObjectID) {
1123        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent);
1124    }
1125
1126    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
1127        info!("clearing state at end of epoch");
1128        assert!(
1129            self.dirty.pending_transaction_writes.is_empty(),
1130            "should be empty due to revert_state_update"
1131        );
1132        self.dirty.clear();
1133        info!("clearing old transaction locks");
1134        self.object_locks.clear();
1135    }
1136
1137    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
1138        // TODO: remove revert_state_update_impl entirely, and simply drop all dirty
1139        // state when clear_state_end_of_epoch_impl is called.
1140        // Further, once we do this, we can delay the insertion of the transaction into
1141        // pending_consensus_transactions until after the transaction has executed.
1142        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
1143            assert!(
1144                !self.is_tx_already_executed(tx).expect("read cannot fail"),
1145                "attempt to revert committed transaction"
1146            );
1147
1148            // A transaction can be inserted into pending_consensus_transactions, but then
1149            // reconfiguration can happen before the transaction executes.
1150            info!("Not reverting {:?} as it was not executed", tx);
1151            return Ok(());
1152        };
1153
1154        for (object_id, object) in outputs.written.iter() {
1155            if object.is_package() {
1156                info!("removing non-finalized package from cache: {:?}", object_id);
1157                self.packages.invalidate(object_id);
1158            }
1159            self.cached.object_by_id_cache.invalidate(object_id);
1160            self.cached.object_cache.invalidate(object_id);
1161        }
1162
1163        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1164            self.cached.object_by_id_cache.invalidate(object_id);
1165            self.cached.object_cache.invalidate(object_id);
1166        }
1167
1168        // Note: individual object entries are removed when
1169        // clear_state_end_of_epoch_impl is called
1170        Ok(())
1171    }
1172
1173    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
1174        self.store.bulk_insert_genesis_objects(objects)?;
1175        for obj in objects {
1176            self.cached.object_cache.invalidate(&obj.id());
1177            self.cached.object_by_id_cache.invalidate(&obj.id());
1178        }
1179        Ok(())
1180    }
1181
1182    fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
1183        self.cached.object_by_id_cache.invalidate(&object.id());
1184        self.cached.object_cache.invalidate(&object.id());
1185        self.store.insert_genesis_object(object)
1186    }
1187
1188    pub fn clear_caches_and_assert_empty(&self) {
1189        info!("clearing caches");
1190        self.cached.clear_and_assert_empty();
1191        self.packages.invalidate_all();
1192        assert_empty(&self.packages);
1193    }
1194}
1195
1196impl ExecutionCacheAPI for WritebackCache {}
1197
1198impl ExecutionCacheCommit for WritebackCache {
1199    fn commit_transaction_outputs<'a>(
1200        &'a self,
1201        epoch: EpochId,
1202        digests: &'a [TransactionDigest],
1203    ) -> BoxFuture<'a, IotaResult> {
1204        WritebackCache::commit_transaction_outputs(self, epoch, digests).boxed()
1205    }
1206
1207    fn persist_transactions<'a>(
1208        &'a self,
1209        digests: &'a [TransactionDigest],
1210    ) -> BoxFuture<'a, IotaResult> {
1211        WritebackCache::persist_transactions(self, digests).boxed()
1212    }
1213}
1214
1215impl ObjectCacheRead for WritebackCache {
1216    fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1217        self.metrics
1218            .record_cache_request("package", "package_cache");
1219        if let Some(p) = self.packages.get(package_id) {
1220            if cfg!(debug_assertions) {
1221                if let Some(store_package) = self.store.get_object(package_id).unwrap() {
1222                    assert_eq!(
1223                        store_package.digest(),
1224                        p.object().digest(),
1225                        "Package object cache is inconsistent for package {package_id:?}"
1226                    );
1227                }
1228            }
1229            self.metrics.record_cache_hit("package", "package_cache");
1230            return Ok(Some(p));
1231        } else {
1232            self.metrics.record_cache_miss("package", "package_cache");
1233        }
1234
1235        // We try the dirty objects cache as well before going to the database. This is
1236        // necessary because the package could be evicted from the package cache
1237        // before it is committed to the database.
1238        if let Some(p) = self.get_object_impl("package", package_id)? {
1239            if p.is_package() {
1240                let p = PackageObject::new(p);
1241                tracing::trace!(
1242                    "caching package: {:?}",
1243                    p.object().compute_object_reference()
1244                );
1245                self.metrics.record_cache_write("package");
1246                self.packages.insert(*package_id, p.clone());
1247                Ok(Some(p))
1248            } else {
1249                Err(IotaError::UserInput {
1250                    error: UserInputError::MoveObjectAsPackage {
1251                        object_id: *package_id,
1252                    },
1253                })
1254            }
1255        } else {
1256            Ok(None)
1257        }
1258    }
1259
1260    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1261        // This is a no-op because all writes go through the cache, therefore it
1262        // can never be incoherent
1263    }
1264
1265    // get_object and variants.
1266
1267    fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
1268        self.get_object_impl("object_latest", id)
1269    }
1270
1271    fn get_object_by_key(
1272        &self,
1273        object_id: &ObjectID,
1274        version: SequenceNumber,
1275    ) -> IotaResult<Option<Object>> {
1276        match self.get_object_by_key_cache_only(object_id, version) {
1277            CacheResult::Hit(object) => Ok(Some(object)),
1278            CacheResult::NegativeHit => Ok(None),
1279            CacheResult::Miss => Ok(self
1280                .record_db_get("object_by_version")
1281                .get_object_by_key(object_id, version)?),
1282        }
1283    }
1284
1285    fn multi_get_objects_by_key(
1286        &self,
1287        object_keys: &[ObjectKey],
1288    ) -> Result<Vec<Option<Object>>, IotaError> {
1289        do_fallback_lookup(
1290            object_keys,
1291            |key| {
1292                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1293                    CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1294                    CacheResult::NegativeHit => CacheResult::NegativeHit,
1295                    CacheResult::Miss => CacheResult::Miss,
1296                })
1297            },
1298            |remaining| {
1299                self.record_db_multi_get("object_by_version", remaining.len())
1300                    .multi_get_objects_by_key(remaining)
1301            },
1302        )
1303    }
1304
1305    fn object_exists_by_key(
1306        &self,
1307        object_id: &ObjectID,
1308        version: SequenceNumber,
1309    ) -> IotaResult<bool> {
1310        match self.get_object_by_key_cache_only(object_id, version) {
1311            CacheResult::Hit(_) => Ok(true),
1312            CacheResult::NegativeHit => Ok(false),
1313            CacheResult::Miss => self
1314                .record_db_get("object_by_version")
1315                .object_exists_by_key(object_id, version),
1316        }
1317    }
1318
1319    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
1320        do_fallback_lookup(
1321            object_keys,
1322            |key| {
1323                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1324                    CacheResult::Hit(_) => CacheResult::Hit(true),
                    CacheResult::NegativeHit => CacheResult::Hit(false),
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
            },
        )
    }

    fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => Ok(Some(match entry {
                ObjectEntry::Object(object) => object.compute_object_reference(),
                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
            })),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_objref_or_tombstone")
                .get_latest_object_ref_or_tombstone(object_id),
        }
    }

    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => {
                let key = ObjectKey(object_id, version);
                Ok(Some(match entry {
                    ObjectEntry::Object(object) => (key, object.into()),
                    ObjectEntry::Deleted => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_DELETED,
                        )),
                    ),
                    ObjectEntry::Wrapped => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
                        )),
                    ),
                }))
            }
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_object_or_tombstone")
                .get_latest_object_or_tombstone(object_id),
        }
    }

    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version_bound: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
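        // Checks one cache level ($objects) for the newest version at or below
        // version_bound, recording request/hit/negative-hit/miss metrics and
        // returning early from the enclosing function when a version within
        // the bound is found.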
        macro_rules! check_cache_entry {
            ($level: expr, $objects: expr) => {
                self.metrics
                    .record_cache_request("object_lt_or_eq_version", $level);
                if let Some(objects) = $objects {
                    if let Some((_, object)) = objects
                        .all_versions_lt_or_eq_descending(&version_bound)
                        .next()
                    {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", $level);
                            return Ok(Some(object.clone()));
                        } else {
                            // if we find a tombstone, the object does not exist
                            self.metrics
                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
                            return Ok(None);
                        }
                    } else {
                        self.metrics
                            .record_cache_miss("object_lt_or_eq_version", $level);
                    }
                }
            };
        }

        // if we have the latest version cached, and it is within the bound, we are done
        self.metrics
            .record_cache_request("object_lt_or_eq_version", "object_by_id");
        if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
            let latest = latest.lock();
            match &*latest {
                LatestObjectCacheEntry::Object(latest_version, object) => {
                    if *latest_version <= version_bound {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
                            return Ok(Some(object.clone()));
                        } else {
                            // object is a tombstone, but is still within the version bound
                            self.metrics.record_cache_negative_hit(
                                "object_lt_or_eq_version",
                                "object_by_id",
                            );
                            return Ok(None);
                        }
                    }
                    // latest object is not within the version bound; fall through
                }
                // No object by this ID exists at all
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
                    return Ok(None);
                }
            }
        }
        self.metrics
            .record_cache_miss("object_lt_or_eq_version", "object_by_id");

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            &object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry!("uncommitted", dirty_entry);
                check_cache_entry!("committed", cached_entry);

                // Much of the time, the query will be for the very latest object version, so
                // try that first. But we have to be careful:
                // 1. We must load the tombstone if it is present, because its version may
                //    exceed the version_bound, in which case we must do a scan.
                // 2. You might think we could just call
                //    `self.store.get_latest_object_or_tombstone` here. But we cannot, because
                //    there may be a more recent version in the dirty set, which we skipped over
                //    in check_cache_entry! because of the version bound. However, if we skipped
                //    it above, we will skip it here as well, again due to the version bound.
                // 3. Despite that, we really want to warm the cache here. Why? Because if the
                //    object is cold (not being written to), then we will very soon be able to
                //    start serving reads of it from the object_by_id cache, IF we can warm the
                //    cache. If we don't warm the cache here, and no writes to the object occur,
                //    then we will always have to go to the db for the object.
                //
                // Lastly, it is important to understand the rationale for all this: if the
                // object is write-hot, we will serve almost all reads to it from the dirty
                // set (or possibly the cached set, if it is only written to once every few
                // checkpoints). If the object is write-cold (or non-existent) and read-hot,
                // then we will serve almost all reads to it from the object_by_id cache
                // check above. Most of the apparently wasteful code here exists only to
                // ensure correctness in all the edge cases.
                let latest: Option<(SequenceNumber, ObjectEntry)> =
                    if let Some(dirty_set) = dirty_entry {
                        dirty_set
                            .get_highest()
                            .cloned()
                            .tap_none(|| panic!("dirty set cannot be empty"))
                    } else {
                        self.record_db_get("object_lt_or_eq_version_latest")
                            .get_latest_object_or_tombstone(object_id)?
                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
                                (version, ObjectEntry::from(obj_or_tombstone))
                            })
                    };

                if let Some((obj_version, obj_entry)) = latest {
                    // We can always cache the latest object (or tombstone), even if it is
                    // not within the version_bound. This is done in order to warm the cache
                    // in the case where a sequence of transactions all read the same child
                    // object without writing to it.
                    self.cache_latest_object_by_id(
                        &object_id,
                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
                    );

                    if obj_version <= version_bound {
                        match obj_entry {
                            ObjectEntry::Object(object) => Ok(Some(object)),
                            ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
                        }
                    } else {
                        // The latest object exceeded the bound, so now we have to do a scan.
                        // But we already know there is no dirty entry within the bound,
                        // so we go to the db.
                        self.record_db_get("object_lt_or_eq_version_scan")
                            .find_object_lt_or_eq_version(object_id, version_bound)
                    }
                } else {
                    // No object was found in the dirty set or the db, so the object does
                    // not exist. When this is called from a read API (i.e. not the
                    // execution path), it is possible that the object has been deleted and
                    // pruned. In that case, there would be no entry at all on disk, but we
                    // may still have a tombstone in the cache.
                    let highest = cached_entry.and_then(|c| c.get_highest());
                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
                    self.cache_object_not_found(&object_id);
                    Ok(None)
                }
            },
        )
    }

    fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        get_iota_system_state(self)
    }

    fn get_marker_value(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> IotaResult<Option<MarkerValue>> {
        match self.get_marker_value_cache_only(object_id, version, epoch_id) {
            CacheResult::Hit(marker) => Ok(Some(marker)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("marker_by_version")
                .get_marker_value(object_id, &version, epoch_id),
        }
    }

    fn get_latest_marker(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
            CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
            CacheResult::NegativeHit => {
                panic!("cannot have negative hit when getting latest marker")
            }
            CacheResult::Miss => self
                .record_db_get("marker_latest")
                .get_latest_marker(object_id, epoch_id),
        }
    }

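    // Lock resolution: a cache hit at a different version means the caller's
    // ObjectRef is stale; a hit at the requested version defers to the
    // per-object lock table; a negative hit means no version of the object
    // exists at all.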
    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    // requested object ref is live, check if there is a lock
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: tx_digest,
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(IotaError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    // even though we know the requested version, we leave it as None to indicate
                    // that the object does not exist at any version
                    version: None,
                }))
            }
            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
        }
    }

    fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
        let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
            UserInputError::ObjectNotFound {
                object_id,
                version: None,
            },
        )?;
        Ok(obj.compute_object_reference())
    }

    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        do_fallback_lookup(
            owned_object_refs,
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            |remaining| {
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
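                // the db-side check either errors or passes for every ref, so
                // the unit results carry no information beyond success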
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }

    fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        self.store.perpetual_tables.get_highest_pruned_checkpoint()
    }
}
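
// Illustrative sketch (not part of the original file): the bounded-read
// semantics of find_object_lt_or_eq_version above, expressed over a plain
// BTreeMap. The u64 versions and string payloads are hypothetical stand-ins
// for the per-object version queues; the real cache additionally
// distinguishes tombstones and falls back to the db on a miss.
#[cfg(test)]
mod version_bound_sketch {
    use std::collections::BTreeMap;

    #[test]
    fn highest_version_at_or_below_bound() {
        let mut versions: BTreeMap<u64, &str> = BTreeMap::new();
        versions.insert(3, "v3");
        versions.insert(7, "v7");

        // a bound of 5 selects version 3: the highest version <= 5
        assert_eq!(versions.range(..=5u64).next_back(), Some((&3, &"v3")));

        // a bound of 2 finds nothing in memory; because in-memory versions are
        // contiguous from the newest downward, the caller must consult the db
        // for anything older than the oldest cached version
        assert!(versions.range(..=2u64).next_back().is_none());
    }
}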

impl TransactionCacheRead for WritebackCache {
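    // Each read below follows the same two-tier pattern: consult the dirty
    // (uncommitted) set first, then the bounded committed cache, and fall
    // back to the database only if both miss.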
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
        do_fallback_lookup(
            digests,
            |digest| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");
                if let Some(tx) = self.cached.transactions.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "committed");
                    return Ok(CacheResult::Hit(Some(tx.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "committed");

                Ok(CacheResult::Miss)
            },
            |remaining| {
                self.record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(remaining)
                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())
            },
        )
    }

    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
        do_fallback_lookup(
            digests,
            |digest| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return Ok(CacheResult::Hit(Some(*digest)));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                if let Some(digest) = self.cached.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "committed");
                    return Ok(CacheResult::Hit(Some(digest)));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "committed");

                Ok(CacheResult::Miss)
            },
            |remaining| {
                self.record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(remaining)
            },
        )
    }

    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
        do_fallback_lookup(
            digests,
            |digest| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return Ok(CacheResult::Hit(Some(effects.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                if let Some(effects) = self.cached.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "committed");
                    return Ok(CacheResult::Hit(Some((*effects).clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "committed");

                Ok(CacheResult::Miss)
            },
            |remaining| {
                self.record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining.iter())
            },
        )
    }

    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
        self.executed_effects_digests_notify_read
            .read(digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }

    fn multi_get_events(
        &self,
        event_digests: &[TransactionEventsDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
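        // normalize: an empty event list is reported as absent (None)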
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        do_fallback_lookup(
            event_digests,
            |digest| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self
                    .dirty
                    .transaction_events
                    .get(digest)
                    .map(|e| e.1.clone())
                {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return Ok(CacheResult::Hit(map_events(events)));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                if let Some(events) = self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|e| (*e).clone())
                {
                    self.metrics
                        .record_cache_hit("transaction_events", "committed");
                    return Ok(CacheResult::Hit(map_events(events)));
                }

                self.metrics
                    .record_cache_miss("transaction_events", "committed");

                Ok(CacheResult::Miss)
            },
            |digests| self.store.multi_get_events(digests),
        )
    }
}

impl ExecutionCacheWrite for WritebackCache {
    fn acquire_transaction_locks<'a>(
        &'a self,
        epoch_store: &'a AuthorityPerEpochStore,
        owned_input_objects: &'a [ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> BoxFuture<'a, IotaResult> {
        self.object_locks
            .acquire_transaction_locks(self, epoch_store, owned_input_objects, transaction)
            .boxed()
    }

    fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> BoxFuture<'_, IotaResult> {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs).boxed()
    }
}

/// do_fallback_lookup is a helper function for multi-get operations.
/// It takes a list of keys and first attempts to look up each key in the cache.
/// The cache can return a hit, a miss, or a negative hit (if the object is
/// known to not exist). Any keys that result in a miss are then looked up in
/// the store.
///
/// The "get from cache" and "get from store" behaviors are implemented by the
/// caller and provided via the get_cached_key and multiget_fallback functions.
fn do_fallback_lookup<K: Copy, V: Default + Clone>(
    keys: &[K],
    get_cached_key: impl Fn(&K) -> IotaResult<CacheResult<V>>,
    multiget_fallback: impl Fn(&[K]) -> IotaResult<Vec<V>>,
) -> IotaResult<Vec<V>> {
    let mut results = vec![V::default(); keys.len()];
    let mut fallback_keys = Vec::with_capacity(keys.len());
    let mut fallback_indices = Vec::with_capacity(keys.len());

    for (i, key) in keys.iter().enumerate() {
        match get_cached_key(key)? {
            CacheResult::Miss => {
                fallback_keys.push(*key);
                fallback_indices.push(i);
            }
            CacheResult::NegativeHit => (),
            CacheResult::Hit(value) => {
                results[i] = value;
            }
        }
    }

    let fallback_results = multiget_fallback(&fallback_keys)?;
    assert_eq!(fallback_results.len(), fallback_indices.len());
    assert_eq!(fallback_results.len(), fallback_keys.len());

    for (i, result) in fallback_indices
        .into_iter()
        .zip(fallback_results.into_iter())
    {
        results[i] = result;
    }
    Ok(results)
}
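
// Illustrative sketch (not part of the original file): how a caller can drive
// do_fallback_lookup. The HashMap-backed "cache" and the u64/&str types are
// hypothetical, chosen only to show how hits, negative hits, and misses
// combine with the fallback lookup.
#[cfg(test)]
mod do_fallback_lookup_sketch {
    use std::collections::HashMap;

    use super::*;

    #[test]
    fn fallback_lookup_flow() {
        // key 1 is a hit, key 2 is known-absent (negative hit), key 3 is a miss
        let mut cache: HashMap<u64, Option<&'static str>> = HashMap::new();
        cache.insert(1, Some("one"));
        cache.insert(2, None);

        let results = do_fallback_lookup(
            &[1u64, 2, 3],
            |k| {
                Ok(match cache.get(k) {
                    Some(Some(v)) => CacheResult::Hit(Some(*v)),
                    Some(None) => CacheResult::NegativeHit,
                    None => CacheResult::Miss,
                })
            },
            // the fallback only ever sees the misses (here, key 3)
            |remaining| Ok(remaining.iter().map(|_| Some("from-db")).collect()),
        )
        .unwrap();

        // negative hits resolve to the default value (None) without touching
        // the fallback
        assert_eq!(results, vec![Some("one"), None, Some("from-db")]);
    }
}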

implement_passthrough_traits!(WritebackCache);

impl AccumulatorStore for WritebackCache {
    fn get_root_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        self.store.get_root_state_accumulator_for_epoch(epoch)
    }

    fn get_root_state_accumulator_for_highest_epoch(
        &self,
    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
        self.store.get_root_state_accumulator_for_highest_epoch()
    }

    fn insert_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &Accumulator,
    ) -> IotaResult {
        self.store
            .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // The only time it is safe to iterate the live object set is at an epoch
        // boundary, at which point the db is consistent and the dirty cache is
        // empty, so this does not need to read the cache.
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set()
    }

    // A version of iter_live_object_set that reads the cache. Only use for testing.
    // If used on a live validator, it can block the server for as long as it
    // takes to iterate the entire live object set.
    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // hold iter until we are finished to prevent any concurrent inserts/deletes
        let iter = self.dirty.objects.iter();
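        // despite its name, this map will hold the merged live set: the
        // store's objects overlaid with, and pruned by, the dirty entries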
        let mut dirty_objects = BTreeMap::new();

        // add everything from the store
        for obj in self.store.iter_live_object_set() {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // add everything from the cache, but also remove deletions
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (_, ObjectEntry::Wrapped) | (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}