iota_core/execution_cache/writeback_cache.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! WritebackCache is a cache for transaction execution which delays writes to
//! the database until transaction results are certified (i.e. they appear in a
//! certified checkpoint, or an effects cert is observed by a fullnode). The
//! cache also stores committed data in memory in order to serve future reads
//! without hitting the database.
//!
//! For storing uncommitted transaction outputs, we cannot evict the data at all
//! until it is written to disk. Committed data not only can be evicted, but it
//! is also unbounded (imagine a stream of transactions that keep splitting a
//! coin into smaller coins).
//!
//! We also want to be able to support negative cache hits (i.e. the case where
//! we can determine an object does not exist without hitting the database).
//!
//! To achieve both of these goals, we split the cache data into two pieces, a
//! dirty set and a cached set. The dirty set has no automatic evictions; data
//! is only removed after being committed. The cached set is a bounded-size
//! cache with automatic evictions. In order to support negative cache hits, we
//! treat the two halves of the cache as a single FIFO queue. Newly written
//! (dirty) versions are inserted at one end of the dirty queue. As versions are
//! committed to disk, they are removed from the other end of the dirty queue
//! and inserted into the cache queue. The cache queue is truncated if it
//! exceeds its maximum size, by removing all but the N newest versions.
//!
//! This gives us the property that the versions in the dirty and cached queues
//! are the most recent versions of the object, i.e. there can be no "gaps".
//! This allows for the following:
//!
//!   - Negative cache hits: If the queried version is not in memory, but is
//!     higher than the smallest version in the cached queue, it does not exist
//!     in the db either.
//!   - Bounded reads: When reading the most recent version that is <= some
//!     version bound, we can correctly satisfy this query from the cache, or
//!     determine that we must go to the db.
//!
//! Note that at any time, either or both of the dirty and cached queues may be
//! absent. There may be no dirty versions of the object, in which case there
//! will be no dirty queue. And the cached queue may be evicted from the cache,
//! in which case there will be no cached queue. Because only the cached queue
//! can be evicted (the dirty queue can only become empty by moving versions
//! from it to the cached queue), the "highest versions" property still holds in
//! all cases.
//!
//! The above design is used for both objects and markers.
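//!
//! For example (with illustrative version numbers): suppose the dirty queue
//! holds versions [7, 8] of an object and the cached queue holds [4, 5, 6]. A
//! read of version 6 is a cache hit. A read of version 9 is a negative hit: it
//! is not in memory but is higher than the smallest version in the cached
//! queue, so it cannot exist in the db either. A read of version 3 is a miss
//! and must go to the db, since versions below the cached range may still
//! exist on disk.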

use std::{
    collections::{BTreeMap, BTreeSet},
    hash::Hash,
    sync::Arc,
};

use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
use futures::{FutureExt, future::BoxFuture};
use iota_common::sync::notify_read::NotifyRead;
use iota_macros::fail_point_async;
use iota_types::{
    accumulator::Accumulator,
    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
    bridge::{Bridge, get_bridge},
    digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
    effects::{TransactionEffects, TransactionEvents},
    error::{IotaError, IotaResult, UserInputError},
    iota_system_state::{IotaSystemState, get_iota_system_state},
    message_envelope::Message,
    messages_checkpoint::CheckpointSequenceNumber,
    object::Object,
    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
};
use moka::sync::Cache as MokaCache;
use parking_lot::Mutex;
use prometheus::Registry;
use tap::TapOptional;
use tracing::{debug, info, instrument, trace, warn};

use super::{
    CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
    ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
    TransactionCacheRead,
    cache_types::{CachedVersionMap, IsNewer, MonotonicCache},
    implement_passthrough_traits,
    object_locks::ObjectLocks,
};
use crate::{
    authority::{
        AuthorityStore,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
        authority_store_tables::LiveObject,
        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
    },
    state_accumulator::AccumulatorStore,
    transaction_outputs::TransactionOutputs,
};

#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    Object(Object),
    Deleted,
    Wrapped,
}

impl ObjectEntry {
    #[cfg(test)]
    fn unwrap_object(&self) -> &Object {
        match self {
            ObjectEntry::Object(o) => o,
            _ => panic!("unwrap_object called on non-Object"),
        }
    }

    fn is_tombstone(&self) -> bool {
        match self {
            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
            ObjectEntry::Object(_) => false,
        }
    }
}

impl std::fmt::Debug for ObjectEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ObjectEntry::Object(o) => {
                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
            }
            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
        }
    }
}

impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}

impl From<ObjectOrTombstone> for ObjectEntry {
    fn from(object: ObjectOrTombstone) -> Self {
        match object {
            ObjectOrTombstone::Object(o) => o.into(),
            ObjectOrTombstone::Tombstone(obj_ref) => {
                if obj_ref.2.is_deleted() {
                    ObjectEntry::Deleted
                } else if obj_ref.2.is_wrapped() {
                    ObjectEntry::Wrapped
                } else {
                    panic!("tombstone digest must either be deleted or wrapped");
                }
            }
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}

impl LatestObjectCacheEntry {
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        match self {
            LatestObjectCacheEntry::Object(version, _) => Some(*version),
            LatestObjectCacheEntry::NonExistent => None,
        }
    }
}

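// Used by MonotonicCache to decide whether a candidate entry may replace the
// one currently cached: an object entry only supersedes another object entry
// with a lower version, and any object entry supersedes NonExistent. Stale
// writes therefore cannot clobber a newer cached value.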
impl IsNewer for LatestObjectCacheEntry {
    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
        match (self, other) {
            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
                v1 > v2
            }
            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
            _ => false,
        }
    }
}

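// Markers are tracked per epoch, so marker tables are keyed by
// (epoch, object id) rather than by object id alone.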
type MarkerKey = (EpochId, ObjectID);

enum CacheResult<T> {
    /// Entry is in the cache
    Hit(T),
    /// Entry is not in the cache and is known to not exist
    NegativeHit,
    /// Entry is not in the cache and may or may not exist in the store
    Miss,
}

/// UncommittedData stores execution outputs that are not yet written to the db.
/// Entries in this struct can only be purged after they are committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first. After we
    /// flush the data to the db, the data is removed from this table and
    /// inserted into the object_cache.
    ///
    /// This table may contain both live and dead objects, since we flush both
    /// live and dead objects to the db in order to support past object
    /// queries on fullnodes.
    ///
    /// Further, we only remove objects in FIFO order, which ensures that the
    /// cached sequence of objects has no gaps. In other words, if we have
    /// versions 4, 8, 13 of an object, we can deduce that version 9 does not
    /// exist (if it did, it would have to appear between 8 and 13 in this
    /// map). This also makes child object reads efficient.
    /// `object_cache` cannot contain a more recent version of an object than
    /// `objects`, and neither can have any gaps. Therefore if there is any
    /// object <= the version bound for a child read in objects, it is the
    /// correct object to return.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Markers for received objects and deleted shared objects. This contains all of the dirty
    // marker state, which is committed to the db at the same time as other transaction data.
    // After markers are committed to the db we remove them from this table and insert them into
    // marker_cache.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Because TransactionEvents are not unique to the transaction that created them, we must
    // reference count them in order to know when we can remove them from the cache. For now
    // we track all referrers explicitly, but we can use a ref count when we are confident in
    // the correctness of the code.
    transaction_events:
        DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,

    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Transaction outputs that have not yet been written to the DB. Items are removed from this
    // table as they are flushed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
}

impl UncommittedData {
    fn new() -> Self {
        Self {
            objects: DashMap::new(),
            markers: DashMap::new(),
            transaction_effects: DashMap::new(),
            executed_effects_digests: DashMap::new(),
            pending_transaction_writes: DashMap::new(),
            transaction_events: DashMap::new(),
        }
    }

    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.transaction_events.clear();
    }

    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
            );
        }
        empty
    }
}

// TODO: set this via the config
static MAX_CACHE_SIZE: u64 = 10000;

/// CachedCommittedData stores data that has been committed to the db, but is
/// likely to be read soon.
struct CachedCommittedData {
    // See module level comment for an explanation of caching strategy.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // See module level comment for an explanation of caching strategy.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    transactions: MokaCache<TransactionDigest, Arc<VerifiedTransaction>>,

    transaction_effects: MokaCache<TransactionEffectsDigest, Arc<TransactionEffects>>,

    transaction_events: MokaCache<TransactionEventsDigest, Arc<TransactionEvents>>,

    executed_effects_digests: MokaCache<TransactionDigest, TransactionEffectsDigest>,

    // Objects that were read at transaction signing time - allows us to access them again at
    // execution time with a single lock / hash lookup
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}

impl CachedCommittedData {
    fn new() -> Self {
        let object_cache = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let marker_cache = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transactions = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transaction_effects = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transaction_events = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let executed_effects_digests = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        let transaction_objects = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();

        Self {
            object_cache,
            object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE),
            marker_cache,
            transactions,
            transaction_effects,
            transaction_events,
            executed_effects_digests,
            _transaction_objects: transaction_objects,
        }
    }

    fn clear_and_assert_empty(&self) {
        self.object_cache.invalidate_all();
        self.object_by_id_cache.invalidate_all();
        self.marker_cache.invalidate_all();
        self.transactions.invalidate_all();
        self.transaction_effects.invalidate_all();
        self.transaction_events.invalidate_all();
        self.executed_effects_digests.invalidate_all();
        self._transaction_objects.invalidate_all();

        assert_empty(&self.object_cache);
        assert!(self.object_by_id_cache.is_empty());
        assert_empty(&self.marker_cache);
        assert_empty(&self.transactions);
        assert_empty(&self.transaction_effects);
        assert_empty(&self.transaction_events);
        assert_empty(&self.executed_effects_digests);
        assert_empty(&self._transaction_objects);
    }
}

fn assert_empty<K, V>(cache: &MokaCache<K, V>)
where
    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
{
    if cache.iter().next().is_some() {
        panic!("cache should be empty");
    }
}

pub struct WritebackCache {
    dirty: UncommittedData,
    cached: CachedCommittedData,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    // - note that we remove any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    store: Arc<AuthorityStore>,
    metrics: Arc<ExecutionCacheMetrics>,
}

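// Looks up `$version` in a dirty or cached `CachedVersionMap`. An exact match is
// a hit. If the version is absent but greater than the least version present in
// the map, the no-gaps property implies the version does not exist anywhere, so
// we report a negative hit. Otherwise we record a miss and fall through.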
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // If the version is greater than the least version in the cache, then we know
                    // that the object does not exist anywhere
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

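// Like `check_cache_entry_by_version`, but returns the highest (most recent)
// version held in the map, if the map is present at all.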
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

impl WritebackCache {
    pub fn new(store: Arc<AuthorityStore>, metrics: Arc<ExecutionCacheMetrics>) -> Self {
        let packages = MokaCache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            .build();
        Self {
            dirty: UncommittedData::new(),
            cached: CachedCommittedData::new(),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            store,
            metrics,
        }
    }

    pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
        Self::new(store, ExecutionCacheMetrics::new(registry).into())
    }

    #[cfg(test)]
    pub fn reset_for_test(&mut self) {
        let mut new = Self::new(self.store.clone(), self.metrics.clone());
        std::mem::swap(self, &mut new);
    }

    async fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        debug!(?object_id, ?version, ?object, "inserting object entry");
        fail_point_async!("write_object_entry");
        self.metrics.record_cache_write("object");

        // We must hold the lock for the object entry while inserting to the
        // object_by_id_cache. Otherwise, a surprising bug can occur:
        //
        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then
        //    pause.
        // 2. TX2, which reads (O,1) can begin executing, because TransactionManager
        //    immediately schedules transactions if their inputs are available. It does
        //    not matter that TX1 hasn't finished executing yet.
        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
        // 4. The thread executing TX1 can resume and write (O,1) to the
        //    object_by_id_cache.
        //
        // Now, any subsequent attempt to get the latest version of O will return (O,1)
        // instead of (O,2).
        //
        // This seems very unlikely, but it may be possible under the following
        // circumstances:
        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
        //   lock-free algorithms that have retry loops. Possibly, under high
        //   contention, this code might spin for a surprisingly long time.
        // - Additionally, many concurrent re-executions of the same tx could happen due
        //   to the tx finalizer, plus checkpoint executor, consensus, and RPCs from
        //   fullnodes.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.cached.object_by_id_cache.insert(
            object_id,
            LatestObjectCacheEntry::Object(version, object.clone()),
        );

        entry.insert(version, object);
    }

    async fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: &ObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!(
            "inserting marker value {:?}: {:?}",
            object_key,
            marker_value
        );
        fail_point_async!("write_marker_entry");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.0))
            .or_default()
            .value_mut()
            .insert(object_key.1, marker_value);
    }

    // lock both the dirty and committed sides of the cache, and then pass the
    // entries to the callback. Written with the `with` pattern because any
    // other way of doing this creates lifetime hell.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }

    // Attempt to get an object from the cache. The DB is not consulted.
    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<Object> {
        match self.get_object_entry_by_key_cache_only(object_id, version) {
            CacheResult::Hit(entry) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::Miss => CacheResult::Miss,
            CacheResult::NegativeHit => CacheResult::NegativeHit,
        }
    }

    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.cached.object_by_id_cache.get(object_id);

        if cfg!(debug_assertions) {
            if let Some(entry) = &entry {
                // check that cache is coherent
                let highest: Option<ObjectEntry> = self
                    .dirty
                    .objects
                    .get(object_id)
                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                    .or_else(|| {
                        let obj: Option<ObjectEntry> = self
                            .store
                            .get_latest_object_or_tombstone(*object_id)
                            .unwrap()
                            .map(|(_, o)| o.into());
                        obj
                    });

                let cache_entry = match &*entry.lock() {
                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                    LatestObjectCacheEntry::NonExistent => None,
                };

                // If the cache entry is a tombstone, the db entry may be missing if it was
                // pruned.
                let tombstone_possibly_pruned = highest.is_none()
                    && cache_entry
                        .as_ref()
                        .map(|e| e.is_tombstone())
                        .unwrap_or(false);

                if highest != cache_entry && !tombstone_possibly_pruned {
                    tracing::error!(
                        ?highest,
                        ?cache_entry,
                        ?tombstone_possibly_pruned,
                        "object_by_id cache is incoherent for {:?}",
                        object_id
                    );
                    panic!("object_by_id cache is incoherent for {:?}", object_id);
                }
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, Object)> {
        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
            CacheResult::Hit((version, entry)) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::NegativeHit => CacheResult::NegativeHit,
            CacheResult::Miss => CacheResult::Miss,
        }
    }

    fn get_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_latest_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_impl(
        &self,
        request_type: &'static str,
        id: &ObjectID,
    ) -> IotaResult<Option<Object>> {
        match self.get_object_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, object)) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => {
                let obj = self.store.get_object(id)?;
                if let Some(obj) = &obj {
                    self.cache_latest_object_by_id(
                        id,
                        LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
                    );
                } else {
                    self.cache_object_not_found(id);
                }
                Ok(obj)
            }
        }
    }

    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }

    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }

    #[instrument(level = "debug", skip_all)]
    async fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = &*tx_outputs;

        // Deletions and wraps must be written first. The reason is that one of the
        // deletes may be a child object, and if we wrote the parent object first,
        // a reader could observe the previous version of the child object instead
        // of the deleted/wrapped tombstone, which would cause an execution fork.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted)
                .await;
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped)
                .await;
        }

        // Update all markers
        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, object_key, *marker_value)
                .await;
        }

        // Write children before parents to ensure that readers do not observe a parent
        // object before its most recent children are visible.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into())
                    .await;
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into())
                    .await;
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        // insert transaction effects before executed_effects_digests so that there
        // are never dangling entries in executed_effects_digests
        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // note: if events.data.is_empty(), then there are no events for this
        // transaction. We store it anyway to avoid special cases in
        // commit_transaction_outputs, and translate an empty events structure
        // to None when reading.
        self.metrics.record_cache_write("transaction_events");
        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                occupied.get_mut().0.insert(tx_digest);
            }
            DashMapEntry::Vacant(entry) => {
                let mut txns = BTreeSet::new();
                txns.insert(tx_digest);
                entry.insert((txns, events.clone()));
            }
        }

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        Ok(())
    }

    // Commits dirty data for the given TransactionDigest to the db.
    #[instrument(level = "debug", skip_all)]
    async fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        fail_point_async!("writeback-cache-commit");
        trace!(?digests);

        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                // This can happen in the following rare case:
                // All transactions in the checkpoint are committed to the db (by
                // commit_transaction_outputs, called in
                // CheckpointExecutor::process_executed_transactions), but the process
                // crashes before the checkpoint watermark is bumped. We will then
                // re-commit the checkpoint at startup, even though all of its
                // transactions were already executed.
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        // Flush writes to disk before removing anything from the dirty set. Otherwise,
        // a cache eviction could cause a value to disappear briefly, even if we insert
        // to the cache before removing from the dirty set.
        self.store
            .write_transaction_outputs(epoch, &all_outputs)
            .await?;

        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        Ok(())
    }

    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        // Now, remove each piece of committed data from the dirty state and insert it
        // into the cache.
        // TODO: outputs should have a strong count of 1 so we should be able to
        // move out of it
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();
        let events_digest = events.digest();

        // Update cache before removing from self.dirty to avoid
        // unnecessary cache misses
        self.cached
            .transactions
            .insert(tx_digest, transaction.clone());
        self.cached
            .transaction_effects
            .insert(effects_digest, effects.clone().into());
        self.cached
            .executed_effects_digests
            .insert(tx_digest, effects_digest);
        self.cached
            .transaction_events
            .insert(events_digest, events.clone().into());

        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                let txns = &mut occupied.get_mut().0;
                assert!(txns.remove(&tx_digest), "transaction must exist");
                if txns.is_empty() {
                    occupied.remove();
                }
            }
            DashMapEntry::Vacant(_) => {
                panic!("events must exist");
            }
        }

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        // Move dirty markers to cache
        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.0),
                object_key.1,
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }

    async fn persist_transactions(&self, digests: &[TransactionDigest]) -> IotaResult {
        let mut txns = Vec::with_capacity(digests.len());
        for tx_digest in digests {
            let Some(tx) = self
                .dirty
                .pending_transaction_writes
                .get(tx_digest)
                .map(|o| o.transaction.clone())
            else {
                // The tx should exist in the db if it is not in the dirty set.
                debug_assert!(
                    self.store
                        .get_transaction_block(tx_digest)
                        .unwrap()
                        .is_some()
                );
                // If the transaction is not in dirty, it does not need to be committed.
                // This situation can happen if we build a checkpoint locally which was just
                // executed via state sync.
                continue;
            };

            txns.push((*tx_digest, (*tx).clone()));
        }

        self.store.commit_transactions(&txns)
    }

    // Move the oldest/least entry from the dirty queue to the cache queue.
    // This is called after the entry is committed to the db.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        static MAX_VERSIONS: usize = 3;

        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying
        // either. this ensures that readers cannot see a value temporarily
        // disappear.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // insert into cache and drop old versions.
        cache_map.insert(version, value.clone());
        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // if there are no versions remaining, remove the map entry
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }

    // Updates the latest object id cache with an entry that was read from the db.
    fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
        trace!("caching object by id: {:?} {:?}", object_id, object);
        self.metrics.record_cache_write("object_by_id");
        self.cached.object_by_id_cache.insert(object_id, object);
    }

    fn cache_object_not_found(&self, object_id: &ObjectID) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent);
    }

    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");
        assert!(
            self.dirty.pending_transaction_writes.is_empty(),
            "should be empty due to revert_state_update"
        );
        self.dirty.clear();
        info!("clearing old transaction locks");
        self.object_locks.clear();
    }

    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
        // TODO: remove revert_state_update_impl entirely, and simply drop all dirty
        // state when clear_state_end_of_epoch_impl is called.
        // Further, once we do this, we can delay the insertion of the transaction into
        // pending_consensus_transactions until after the transaction has executed.
        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
            assert!(
                !self.is_tx_already_executed(tx).expect("read cannot fail"),
                "attempt to revert committed transaction"
            );

            // A transaction can be inserted into pending_consensus_transactions, but then
            // reconfiguration can happen before the transaction executes.
            info!("Not reverting {:?} as it was not executed", tx);
            return Ok(());
        };

        for (object_id, object) in outputs.written.iter() {
            if object.is_package() {
                info!("removing non-finalized package from cache: {:?}", object_id);
                self.packages.invalidate(object_id);
            }
            self.cached.object_by_id_cache.invalidate(object_id);
        }

        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
            self.cached.object_by_id_cache.invalidate(object_id);
        }

        // Note: individual object entries are removed when
        // clear_state_end_of_epoch_impl is called
        Ok(())
    }

    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.packages.invalidate_all();
        assert_empty(&self.packages);
    }
}

impl ExecutionCacheAPI for WritebackCache {}

impl ExecutionCacheCommit for WritebackCache {
    fn commit_transaction_outputs<'a>(
        &'a self,
        epoch: EpochId,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult> {
        WritebackCache::commit_transaction_outputs(self, epoch, digests).boxed()
    }

    fn persist_transactions<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult> {
        WritebackCache::persist_transactions(self, digests).boxed()
    }
}

impl ObjectCacheRead for WritebackCache {
    fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            if cfg!(debug_assertions) {
                if let Some(store_package) = self.store.get_object(package_id).unwrap() {
                    assert_eq!(
                        store_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {:?}",
                        package_id
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // We try the dirty objects cache as well before going to the database. This is
        // necessary because the package could be evicted from the package cache
        // before it is committed to the database.
        if let Some(p) = self.get_object_impl("package", package_id)? {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                Err(IotaError::UserInput {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                })
            }
        } else {
            Ok(None)
        }
    }

    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // This is a no-op because all writes go through the cache, therefore it
        // can never be incoherent
    }

    // get_object and variants.

    fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
        self.get_object_impl("object_latest", id)
    }

    fn get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(object) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => Ok(self
                .record_db_get("object_by_version")
                .get_object_by_key(object_id, version)?),
        }
    }

    fn multi_get_objects_by_key(
        &self,
        object_keys: &[ObjectKey],
    ) -> Result<Vec<Option<Object>>, IotaError> {
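        // Serve each key from the cache when possible; only the keys that miss are
        // fetched from the db, in a single batched multi-get.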
        do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
                    CacheResult::NegativeHit => CacheResult::NegativeHit,
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_get_objects_by_key(remaining)
            },
        )
    }

    fn object_exists_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<bool> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(_) => Ok(true),
            CacheResult::NegativeHit => Ok(false),
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .object_exists_by_key(object_id, version),
        }
    }

    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
        do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(_) => CacheResult::Hit(true),
                    CacheResult::NegativeHit => CacheResult::Hit(false),
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
            },
        )
    }

    fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => Ok(Some(match entry {
                ObjectEntry::Object(object) => object.compute_object_reference(),
                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
            })),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_objref_or_tombstone")
                .get_latest_object_ref_or_tombstone(object_id),
        }
    }

    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => {
                let key = ObjectKey(object_id, version);
                Ok(Some(match entry {
                    ObjectEntry::Object(object) => (key, object.into()),
                    ObjectEntry::Deleted => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_DELETED,
                        )),
                    ),
                    ObjectEntry::Wrapped => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
                        )),
                    ),
                }))
            }
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_object_or_tombstone")
                .get_latest_object_or_tombstone(object_id),
        }
    }

    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version_bound: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
1378        macro_rules! check_cache_entry {
1379            ($level: expr, $objects: expr) => {
1380                self.metrics
1381                    .record_cache_request("object_lt_or_eq_version", $level);
1382                if let Some(objects) = $objects {
1383                    if let Some((_, object)) = objects
1384                        .all_versions_lt_or_eq_descending(&version_bound)
1385                        .next()
1386                    {
1387                        if let ObjectEntry::Object(object) = object {
1388                            self.metrics
1389                                .record_cache_hit("object_lt_or_eq_version", $level);
1390                            return Ok(Some(object.clone()));
1391                        } else {
1392                            // if we find a tombstone, the object does not exist
1393                            self.metrics
1394                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1395                            return Ok(None);
1396                        }
1397                    } else {
1398                        self.metrics
1399                            .record_cache_miss("object_lt_or_eq_version", $level);
1400                    }
1401                }
1402            };
1403        }
1404
1405        // if we have the latest version cached, and it is within the bound, we are done
1406        self.metrics
1407            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1408        if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
1409            let latest = latest.lock();
1410            match &*latest {
1411                LatestObjectCacheEntry::Object(latest_version, object) => {
1412                    if *latest_version <= version_bound {
1413                        if let ObjectEntry::Object(object) = object {
1414                            self.metrics
1415                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1416                            return Ok(Some(object.clone()));
1417                        } else {
1418                            // object is a tombstone, but is still within the version bound
1419                            self.metrics.record_cache_negative_hit(
1420                                "object_lt_or_eq_version",
1421                                "object_by_id",
1422                            );
1423                            return Ok(None);
1424                        }
1425                    }
1426                    // latest object is not within the version bound. fall
1427                    // through.
1428                }
1429                // No object by this ID exists at all
1430                LatestObjectCacheEntry::NonExistent => {
1431                    self.metrics
1432                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1433                    return Ok(None);
1434                }
1435            }
1436        }
1437        self.metrics
1438            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1439
1440        Self::with_locked_cache_entries(
1441            &self.dirty.objects,
1442            &self.cached.object_cache,
1443            &object_id,
1444            |dirty_entry, cached_entry| {
1445                check_cache_entry!("uncommitted", dirty_entry);
1446                check_cache_entry!("committed", cached_entry);
1447
1448                // Much of the time, the query will be for the very latest object version, so
1449                // try that first. But we have to be careful:
1450                // 1. We must load the tombstone if it is present, because its version may
1451                //    exceed the version_bound, in which case we must do a scan.
1452                // 2. You might think we could just call
1453                //    `self.store.get_latest_object_or_tombstone` here. But we cannot, because
1454                //    there may be a more recent version in the dirty set, which we skipped over
1455                //    in check_cache_entry! because of the version bound. However, if we skipped
1456                //    it above, we will skip it here as well, again due to the version bound.
1457                // 3. Despite that, we really want to warm the cache here. Why? Because if the
1458                //    object is cold (not being written to), then we will very soon be able to
1459                //    start serving reads of it from the object_by_id cache, IF we can warm the
1460                //    cache. If we don't warm the cache here, and no writes to the object occur,
1461                //    then we will always have to go to the db for the object.
1462                //
1463                // Lastly, it is important to understand the rationale for all this: If the
1464                // object is write-hot, we will serve almost all reads to it
1465                // from the dirty set (or possibly the cached set if it is only
1466                // written to once every few checkpoints). If the object is
1467                // write-cold (or non-existent) and read-hot, then we will serve almost all
1468                // reads to it from the object_by_id cache check above.  Most of
1469                // the apparently wasteful code here exists only to ensure
1470                // correctness in all the edge cases.
1471                let latest: Option<(SequenceNumber, ObjectEntry)> =
1472                    if let Some(dirty_set) = dirty_entry {
1473                        dirty_set
1474                            .get_highest()
1475                            .cloned()
1476                            .tap_none(|| panic!("dirty set cannot be empty"))
1477                    } else {
1478                        self.record_db_get("object_lt_or_eq_version_latest")
1479                            .get_latest_object_or_tombstone(object_id)?
1480                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1481                                (version, ObjectEntry::from(obj_or_tombstone))
1482                            })
1483                    };
1484
1485                if let Some((obj_version, obj_entry)) = latest {
1486                    // we can always cache the latest object (or tombstone), even if it is not
1487                    // within the version_bound. This is done in order to warm
1488                    // the cache in the case where a sequence of transactions
1489                    // all read the same child object without writing to it.
1490                    self.cache_latest_object_by_id(
1491                        &object_id,
1492                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1493                    );
1494
1495                    if obj_version <= version_bound {
1496                        match obj_entry {
1497                            ObjectEntry::Object(object) => Ok(Some(object)),
1498                            ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
1499                        }
1500                    } else {
1501                        // The latest object exceeded the bound, so now we have to do a scan
1502                        // But we already know there is no dirty entry within the bound,
1503                        // so we go to the db.
1504                        self.record_db_get("object_lt_or_eq_version_scan")
1505                            .find_object_lt_or_eq_version(object_id, version_bound)
1506                    }
1507                } else {
1508                    // No object found in the dirty set or the db, so the object does not
1509                    // exist. When this is called from a read API (i.e. not the execution
1510                    // path), it is possible that the object has been deleted and pruned.
1511                    // In that case there would be no entry at all on disk, but we may
1512                    // still have a tombstone in the cache.
1513                    let highest = cached_entry.and_then(|c| c.get_highest());
1514                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1515                    self.cache_object_not_found(&object_id);
1516                    Ok(None)
1517                }
1518            },
1519        )
1520    }
1521
1522    fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1523        get_iota_system_state(self)
1524    }
1525
1526    fn get_bridge_object_unsafe(&self) -> IotaResult<Bridge> {
1527        get_bridge(self)
1528    }
1529
1530    fn get_marker_value(
1531        &self,
1532        object_id: &ObjectID,
1533        version: SequenceNumber,
1534        epoch_id: EpochId,
1535    ) -> IotaResult<Option<MarkerValue>> {
1536        match self.get_marker_value_cache_only(object_id, version, epoch_id) {
1537            CacheResult::Hit(marker) => Ok(Some(marker)),
1538            CacheResult::NegativeHit => Ok(None),
1539            CacheResult::Miss => self
1540                .record_db_get("marker_by_version")
1541                .get_marker_value(object_id, &version, epoch_id),
1542        }
1543    }
1544
1545    fn get_latest_marker(
1546        &self,
1547        object_id: &ObjectID,
1548        epoch_id: EpochId,
1549    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
1550        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1551            CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
1552            CacheResult::NegativeHit => {
1553                panic!("cannot have negative hit when getting latest marker")
1554            }
1555            CacheResult::Miss => self
1556                .record_db_get("marker_latest")
1557                .get_latest_marker(object_id, epoch_id),
1558        }
1559    }
1560
1561    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
1562        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1563            CacheResult::Hit((_, obj)) => {
1564                let actual_objref = obj.compute_object_reference();
1565                if obj_ref != actual_objref {
1566                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1567                        locked_ref: actual_objref,
1568                    })
1569                } else {
1570                    // requested object ref is live, check if there is a lock
1571                    Ok(
1572                        match self
1573                            .object_locks
1574                            .get_transaction_lock(&obj_ref, epoch_store)?
1575                        {
1576                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1577                                locked_by_tx: tx_digest,
1578                            },
1579                            None => ObjectLockStatus::Initialized,
1580                        },
1581                    )
1582                }
1583            }
1584            CacheResult::NegativeHit => {
1585                Err(IotaError::from(UserInputError::ObjectNotFound {
1586                    object_id: obj_ref.0,
1587                    // even though we know the requested version, we leave it as None to indicate
1588                    // that the object does not exist at any version
1589                    version: None,
1590                }))
1591            }
1592            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1593        }
1594    }
1595
1596    fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
1597        let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
1598            UserInputError::ObjectNotFound {
1599                object_id,
1600                version: None,
1601            },
1602        )?;
1603        Ok(obj.compute_object_reference())
1604    }
1605
1606    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1607        do_fallback_lookup(
1608            owned_object_refs,
1609            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1610                CacheResult::Hit((version, obj)) => {
1611                    if obj.compute_object_reference() != *obj_ref {
1612                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1613                            provided_obj_ref: *obj_ref,
1614                            current_version: version,
1615                        }
1616                        .into())
1617                    } else {
1618                        Ok(CacheResult::Hit(()))
1619                    }
1620                }
1621                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1622                    object_id: obj_ref.0,
1623                    version: None,
1624                }
1625                .into()),
1626                CacheResult::Miss => Ok(CacheResult::Miss),
1627            },
1628            |remaining| {
1629                self.record_db_multi_get("object_is_live", remaining.len())
1630                    .check_owned_objects_are_live(remaining)?;
1631                Ok(vec![(); remaining.len()])
1632            },
1633        )?;
1634        Ok(())
1635    }
1636
1637    fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
1638        self.store.perpetual_tables.get_highest_pruned_checkpoint()
1639    }
1640}
1641
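// Illustrative sketch (not part of the cache implementation): the bounded-read
// rule used by `find_object_lt_or_eq_version` above, reduced to plain `u64`
// versions and `BTreeMap`s standing in for the real dirty/cached version maps.
// Because the in-memory queues always hold the newest, gap-free versions of an
// object, a `<= bound` query can be answered from memory whenever some
// in-memory version satisfies the bound; otherwise the db must be consulted.
#[cfg(test)]
mod bounded_read_sketch {
    use std::collections::BTreeMap;

    /// Returns the newest in-memory version `<= bound`, or None if the caller
    /// would have to fall back to the db.
    fn find_lt_or_eq(
        dirty: &BTreeMap<u64, &'static str>,
        cached: &BTreeMap<u64, &'static str>,
        bound: u64,
    ) -> Option<u64> {
        // Check the dirty (uncommitted) versions first, then the cached
        // (committed) ones, mirroring the order of `check_cache_entry!`.
        if let Some((version, _)) = dirty.range(..=bound).next_back() {
            return Some(*version);
        }
        cached.range(..=bound).next_back().map(|(version, _)| *version)
    }

    #[test]
    fn bounded_reads() {
        let dirty: BTreeMap<u64, &'static str> = [(7, "v7"), (8, "v8")].into_iter().collect();
        let cached: BTreeMap<u64, &'static str> = [(5, "v5"), (6, "v6")].into_iter().collect();

        assert_eq!(find_lt_or_eq(&dirty, &cached, 8), Some(8)); // dirty hit
        assert_eq!(find_lt_or_eq(&dirty, &cached, 6), Some(6)); // cached hit
        // The bound is below the smallest in-memory version, so an older
        // version may exist only on disk; the real cache goes to the db here.
        assert_eq!(find_lt_or_eq(&dirty, &cached, 4), None);
    }
}
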
1642impl TransactionCacheRead for WritebackCache {
1643    fn multi_get_transaction_blocks(
1644        &self,
1645        digests: &[TransactionDigest],
1646    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
1647        do_fallback_lookup(
1648            digests,
1649            |digest| {
1650                self.metrics
1651                    .record_cache_request("transaction_block", "uncommitted");
1652                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1653                    self.metrics
1654                        .record_cache_hit("transaction_block", "uncommitted");
1655                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
1656                }
1657                self.metrics
1658                    .record_cache_miss("transaction_block", "uncommitted");
1659
1660                self.metrics
1661                    .record_cache_request("transaction_block", "committed");
1662                if let Some(tx) = self.cached.transactions.get(digest) {
1663                    self.metrics
1664                        .record_cache_hit("transaction_block", "committed");
1665                    return Ok(CacheResult::Hit(Some(tx.clone())));
1666                }
1667                self.metrics
1668                    .record_cache_miss("transaction_block", "committed");
1669
1670                Ok(CacheResult::Miss)
1671            },
1672            |remaining| {
1673                self.record_db_multi_get("transaction_block", remaining.len())
1674                    .multi_get_transaction_blocks(remaining)
1675                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())
1676            },
1677        )
1678    }
1679
1680    fn multi_get_executed_effects_digests(
1681        &self,
1682        digests: &[TransactionDigest],
1683    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
1684        do_fallback_lookup(
1685            digests,
1686            |digest| {
1687                self.metrics
1688                    .record_cache_request("executed_effects_digests", "uncommitted");
1689                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
1690                    self.metrics
1691                        .record_cache_hit("executed_effects_digests", "uncommitted");
1692                    return Ok(CacheResult::Hit(Some(*digest)));
1693                }
1694                self.metrics
1695                    .record_cache_miss("executed_effects_digests", "uncommitted");
1696
1697                self.metrics
1698                    .record_cache_request("executed_effects_digests", "committed");
1699                if let Some(digest) = self.cached.executed_effects_digests.get(digest) {
1700                    self.metrics
1701                        .record_cache_hit("executed_effects_digests", "committed");
1702                    return Ok(CacheResult::Hit(Some(digest)));
1703                }
1704                self.metrics
1705                    .record_cache_miss("executed_effects_digests", "committed");
1706
1707                Ok(CacheResult::Miss)
1708            },
1709            |remaining| {
1710                self.record_db_multi_get("executed_effects_digests", remaining.len())
1711                    .multi_get_executed_effects_digests(remaining)
1712            },
1713        )
1714    }
1715
1716    fn multi_get_effects(
1717        &self,
1718        digests: &[TransactionEffectsDigest],
1719    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
1720        do_fallback_lookup(
1721            digests,
1722            |digest| {
1723                self.metrics
1724                    .record_cache_request("transaction_effects", "uncommitted");
1725                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
1726                    self.metrics
1727                        .record_cache_hit("transaction_effects", "uncommitted");
1728                    return Ok(CacheResult::Hit(Some(effects.clone())));
1729                }
1730                self.metrics
1731                    .record_cache_miss("transaction_effects", "uncommitted");
1732
1733                self.metrics
1734                    .record_cache_request("transaction_effects", "committed");
1735                if let Some(effects) = self.cached.transaction_effects.get(digest) {
1736                    self.metrics
1737                        .record_cache_hit("transaction_effects", "committed");
1738                    return Ok(CacheResult::Hit(Some((*effects).clone())));
1739                }
1740                self.metrics
1741                    .record_cache_miss("transaction_effects", "committed");
1742
1743                Ok(CacheResult::Miss)
1744            },
1745            |remaining| {
1746                self.record_db_multi_get("transaction_effects", remaining.len())
1747                    .multi_get_effects(remaining.iter())
1748            },
1749        )
1750    }
1751
1752    fn notify_read_executed_effects_digests<'a>(
1753        &'a self,
1754        digests: &'a [TransactionDigest],
1755    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
1756        self.executed_effects_digests_notify_read
1757            .read(digests, |digests| {
1758                self.multi_get_executed_effects_digests(digests)
1759            })
1760            .boxed()
1761    }
1762
1763    fn multi_get_events(
1764        &self,
1765        event_digests: &[TransactionEventsDigest],
1766    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
1767        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
1768            if events.data.is_empty() {
1769                None
1770            } else {
1771                Some(events)
1772            }
1773        }
1774
1775        do_fallback_lookup(
1776            event_digests,
1777            |digest| {
1778                self.metrics
1779                    .record_cache_request("transaction_events", "uncommitted");
1780                if let Some(events) = self
1781                    .dirty
1782                    .transaction_events
1783                    .get(digest)
1784                    .map(|e| e.1.clone())
1785                {
1786                    self.metrics
1787                        .record_cache_hit("transaction_events", "uncommitted");
1788
1789                    return Ok(CacheResult::Hit(map_events(events)));
1790                }
1791                self.metrics
1792                    .record_cache_miss("transaction_events", "uncommitted");
1793
1794                self.metrics
1795                    .record_cache_request("transaction_events", "committed");
1796                if let Some(events) = self
1797                    .cached
1798                    .transaction_events
1799                    .get(digest)
1800                    .map(|e| (*e).clone())
1801                {
1802                    self.metrics
1803                        .record_cache_hit("transaction_events", "committed");
1804                    return Ok(CacheResult::Hit(map_events(events)));
1805                }
1806
1807                self.metrics
1808                    .record_cache_miss("transaction_events", "committed");
1809
1810                Ok(CacheResult::Miss)
1811            },
1812            |digests| self.store.multi_get_events(digests),
1813        )
1814    }
1815}
1816
1817impl ExecutionCacheWrite for WritebackCache {
1818    fn acquire_transaction_locks<'a>(
1819        &'a self,
1820        epoch_store: &'a AuthorityPerEpochStore,
1821        owned_input_objects: &'a [ObjectRef],
1822        transaction: VerifiedSignedTransaction,
1823    ) -> BoxFuture<'a, IotaResult> {
1824        self.object_locks
1825            .acquire_transaction_locks(self, epoch_store, owned_input_objects, transaction)
1826            .boxed()
1827    }
1828
1829    fn write_transaction_outputs(
1830        &self,
1831        epoch_id: EpochId,
1832        tx_outputs: Arc<TransactionOutputs>,
1833    ) -> BoxFuture<'_, IotaResult> {
1834        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs).boxed()
1835    }
1836}
1837
1838/// do_fallback_lookup is a helper function for multi-get operations.
1839/// It takes a list of keys and first attempts to look up each key in the cache.
1840/// The cache can return a hit, a miss, or a negative hit (if the object is
1841/// known to not exist). Any keys that result in a miss are then looked up in
1842/// the store.
1843///
1844/// The "get from cache" and "get from store" behavior are implemented by the
1845/// caller and provided via the get_cached_key and multiget_fallback functions.
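///
/// A minimal usage sketch with hypothetical integer keys and `String` values
/// (not taken from the cache itself): a negative hit yields `V::default()`,
/// while misses are resolved in a single batch by the fallback.
///
/// ```ignore
/// let results = do_fallback_lookup(
///     &[1u64, 2, 3],
///     |key| {
///         Ok(match *key {
///             1 => CacheResult::Hit("from cache".to_string()),
///             2 => CacheResult::NegativeHit, // known not to exist
///             _ => CacheResult::Miss,        // deferred to the fallback
///         })
///     },
///     |remaining| Ok(remaining.iter().map(|k| format!("from db: {k}")).collect()),
/// )
/// .unwrap();
/// assert_eq!(results, ["from cache".to_string(), String::new(), "from db: 3".to_string()]);
/// ```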
1846fn do_fallback_lookup<K: Copy, V: Default + Clone>(
1847    keys: &[K],
1848    get_cached_key: impl Fn(&K) -> IotaResult<CacheResult<V>>,
1849    multiget_fallback: impl Fn(&[K]) -> IotaResult<Vec<V>>,
1850) -> IotaResult<Vec<V>> {
1851    let mut results = vec![V::default(); keys.len()];
1852    let mut fallback_keys = Vec::with_capacity(keys.len());
1853    let mut fallback_indices = Vec::with_capacity(keys.len());
1854
1855    for (i, key) in keys.iter().enumerate() {
1856        match get_cached_key(key)? {
1857            CacheResult::Miss => {
1858                fallback_keys.push(*key);
1859                fallback_indices.push(i);
1860            }
1861            CacheResult::NegativeHit => (),
1862            CacheResult::Hit(value) => {
1863                results[i] = value;
1864            }
1865        }
1866    }
1867
1868    let fallback_results = multiget_fallback(&fallback_keys)?;
1869    assert_eq!(fallback_results.len(), fallback_indices.len());
1870    assert_eq!(fallback_results.len(), fallback_keys.len());
1871
1872    for (i, result) in fallback_indices
1873        .into_iter()
1874        .zip(fallback_results.into_iter())
1875    {
1876        results[i] = result;
1877    }
1878    Ok(results)
1879}
1880
1881implement_passthrough_traits!(WritebackCache);
1882
1883impl AccumulatorStore for WritebackCache {
1884    fn get_root_state_accumulator_for_epoch(
1885        &self,
1886        epoch: EpochId,
1887    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
1888        self.store.get_root_state_accumulator_for_epoch(epoch)
1889    }
1890
1891    fn get_root_state_accumulator_for_highest_epoch(
1892        &self,
1893    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
1894        self.store.get_root_state_accumulator_for_highest_epoch()
1895    }
1896
1897    fn insert_state_accumulator_for_epoch(
1898        &self,
1899        epoch: EpochId,
1900        checkpoint_seq_num: &CheckpointSequenceNumber,
1901        acc: &Accumulator,
1902    ) -> IotaResult {
1903        self.store
1904            .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
1905    }
1906
1907    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1908        // The only time it is safe to iterate the live object set is at an epoch
1909        // boundary, at which point the db is consistent and the dirty cache is
1910        // empty. So this does not read the cache.
1911        assert!(
1912            self.dirty.is_empty(),
1913            "cannot iterate live object set with dirty data"
1914        );
1915        self.store.iter_live_object_set()
1916    }
1917
1918    // A version of iter_live_object_set that reads the cache. Only use for testing.
1919    // If used on a live validator, it can cause the server to block for as long as
1920    // it takes to iterate the entire live object set.
1921    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1922        // hold iter until we are finished to prevent any concurrent inserts/deletes
1923        let iter = self.dirty.objects.iter();
1924        let mut dirty_objects = BTreeMap::new();
1925
1926        // add everything from the store
1927        for obj in self.store.iter_live_object_set() {
1928            dirty_objects.insert(obj.object_id(), obj);
1929        }
1930
1931        // add everything from the cache, but also remove deletions
1932        for entry in iter {
1933            let id = *entry.key();
1934            let value = entry.value();
1935            match value.get_highest().unwrap() {
1936                (_, ObjectEntry::Object(object)) => {
1937                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
1938                }
1939                (_version, ObjectEntry::Wrapped) => {
1940                    dirty_objects.remove(&id);
1941                }
1942                (_, ObjectEntry::Deleted) => {
1943                    dirty_objects.remove(&id);
1944                }
1945            }
1946        }
1947
1948        Box::new(dirty_objects.into_values())
1949    }
1950}