iota_core/execution_cache/writeback_cache.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! WritebackCache is a cache for transaction execution which delays writes to
//! the database until transaction results are certified (i.e. they appear in a
//! certified checkpoint, or an effects cert is observed by a fullnode). The
//! cache also stores committed data in memory in order to serve future reads
//! without hitting the database.
//!
//! For storing uncommitted transaction outputs, we cannot evict the data at all
//! until it is written to disk. Committed data not only can be evicted, but it
//! is also unbounded (imagine a stream of transactions that keep splitting a
//! coin into smaller coins).
//!
//! We also want to be able to support negative cache hits (i.e. the case where
//! we can determine an object does not exist without hitting the database).
//!
//! To achieve both of these goals, we split the cache data into two pieces, a
//! dirty set and a cached set. The dirty set has no automatic evictions; data
//! is only removed after being committed. The cached set is a bounded-size
//! cache with automatic evictions. In order to support negative cache hits, we
//! treat the two halves of the cache as a single FIFO queue. Newly written
//! (dirty) versions are inserted at one end of the dirty queue. As versions are
//! committed to disk, they are removed from the other end of the dirty queue
//! and inserted into the cache queue. The cache queue is truncated if it
//! exceeds its maximum size, by removing all but the N newest versions.
//!
//! This gives us the property that the sequence of versions in the dirty and
//! cached queues is always the most recent versions of the object, i.e. there
//! can be no "gaps". This allows for the following:
//!
//!   - Negative cache hits: If the queried version is not in memory, but is
//!     higher than the smallest version in the cached queue, it does not exist
//!     in the db either (see the example below).
//!   - Bounded reads: When reading the most recent version that is <= some
//!     version bound, we can correctly satisfy this query from the cache, or
//!     determine that we must go to the db.
//!
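//! For example, suppose an object has versions 4, 8, and 13, where 13 is
//! still dirty and 4 and 8 have been committed (a worked illustration of the
//! rules above, not additional behavior):
//!
//! ```text
//! dirty queue:  [13]      newest version(s), not yet flushed to the db
//! cached queue: [4, 8]    committed versions, bounded, evictable
//!
//! read version 13   -> Hit          (present in the dirty queue)
//! read version 9    -> NegativeHit  (9 > 4, the least version in memory, yet
//!                                    absent; by the no-gaps property it cannot
//!                                    exist in the db either)
//! read version <= 9 -> Hit(8)       (8 is provably the newest version <= 9)
//! read version 3    -> Miss         (3 < 4, so the db must be consulted)
//! ```
//!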
//! Note that at any time, either or both of the dirty and cached queues may be
//! non-existent. There may be no dirty versions of an object, in which case
//! there will be no dirty queue. And, the cached queue may be evicted from the
//! cache, in which case there will be no cached queue. Because only the cached
//! queue can be evicted (the dirty queue can only become empty by moving
//! versions from it to the cached queue), the "highest versions" property still
//! holds in all cases.
//!
//! The above design is used for both objects and markers.

use std::{
    collections::{BTreeMap, BTreeSet},
    hash::Hash,
    sync::{Arc, atomic::AtomicU64},
};

use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
use futures::{FutureExt, future::BoxFuture};
use iota_common::sync::notify_read::NotifyRead;
use iota_config::WritebackCacheConfig;
use iota_macros::fail_point;
use iota_types::{
    accumulator::Accumulator,
    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
    digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
    effects::{TransactionEffects, TransactionEvents},
    error::{IotaError, IotaResult, UserInputError},
    executable_transaction::VerifiedExecutableTransaction,
    iota_system_state::{IotaSystemState, get_iota_system_state},
    message_envelope::Message,
    messages_checkpoint::CheckpointSequenceNumber,
    object::Object,
    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
};
use moka::sync::Cache as MokaCache;
use parking_lot::Mutex;
use prometheus::Registry;
use tap::TapOptional;
use tracing::{debug, info, instrument, trace, warn};

use super::{
    CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
    ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
    TransactionCacheRead,
    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache, Ticket},
    implement_passthrough_traits,
    object_locks::ObjectLocks,
};
use crate::{
    authority::{
        AuthorityStore,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
        authority_store_tables::LiveObject,
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
    },
    fallback_fetch::try_do_fallback_lookup,
    state_accumulator::AccumulatorStore,
    transaction_outputs::TransactionOutputs,
};

#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    Object(Object),
    Deleted,
    Wrapped,
}

impl ObjectEntry {
    #[cfg(test)]
    fn unwrap_object(&self) -> &Object {
        match self {
            ObjectEntry::Object(o) => o,
            _ => panic!("unwrap_object called on non-Object"),
        }
    }

    fn is_tombstone(&self) -> bool {
        match self {
            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
            ObjectEntry::Object(_) => false,
        }
    }
}

impl std::fmt::Debug for ObjectEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ObjectEntry::Object(o) => {
                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
            }
            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
        }
    }
}

impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}

impl From<ObjectOrTombstone> for ObjectEntry {
    fn from(object: ObjectOrTombstone) -> Self {
        match object {
            ObjectOrTombstone::Object(o) => o.into(),
            ObjectOrTombstone::Tombstone(obj_ref) => {
                if obj_ref.2.is_deleted() {
                    ObjectEntry::Deleted
                } else if obj_ref.2.is_wrapped() {
                    ObjectEntry::Wrapped
                } else {
                    panic!("tombstone digest must either be deleted or wrapped");
                }
            }
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}

impl LatestObjectCacheEntry {
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        match self {
            LatestObjectCacheEntry::Object(version, _) => Some(*version),
            LatestObjectCacheEntry::NonExistent => None,
        }
    }
}

impl IsNewer for LatestObjectCacheEntry {
    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
        match (self, other) {
            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
                v1 > v2
            }
            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
            _ => false,
        }
    }
}

type MarkerKey = (EpochId, ObjectID);

/// UncommittedData stores execution outputs that are not yet written to the db.
/// Entries in this struct can only be purged after they are committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first. After we
    /// flush the data to the db, the data is removed from this table and
    /// inserted into the object_cache.
    ///
    /// This table may contain both live and dead objects, since we flush both
    /// live and dead objects to the db in order to support past object
    /// queries on fullnodes.
    ///
    /// Further, we only remove objects in FIFO order, which ensures that the
    /// cached sequence of objects has no gaps. In other words, if we have
    /// versions 4, 8, 13 of an object, we can deduce that version 9 does
    /// not exist. This also makes child object reads efficient.
    /// `object_cache` cannot contain a more recent version of an object than
    /// `objects`, and neither can have any gaps. Therefore if there is any
    /// object <= the version bound for a child read in objects, it is the
    /// correct object to return.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Markers for received objects and deleted shared objects. This contains all of the dirty
    // marker state, which is committed to the db at the same time as other transaction data.
    // After markers are committed to the db we remove them from this table and insert them into
    // marker_cache.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Because TransactionEvents are not unique to the transaction that created them, we must
    // reference count them in order to know when we can remove them from the cache. For now
    // we track all referrers explicitly, but we can use a ref count when we are confident in
    // the correctness of the code.
    transaction_events:
        DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,

    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Transaction outputs that have not yet been written to the DB. Items are removed from this
    // table as they are flushed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}

impl UncommittedData {
    fn new() -> Self {
        Self {
            objects: DashMap::new(),
            markers: DashMap::new(),
            transaction_effects: DashMap::new(),
            executed_effects_digests: DashMap::new(),
            pending_transaction_writes: DashMap::new(),
            transaction_events: DashMap::new(),
            total_transaction_inserts: AtomicU64::new(0),
            total_transaction_commits: AtomicU64::new(0),
        }
    }

    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.transaction_events.clear();
        self.total_transaction_inserts
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_transaction_commits
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
                    && self
                        .total_transaction_inserts
                        .load(std::sync::atomic::Ordering::Relaxed)
                        == self
                            .total_transaction_commits
                            .load(std::sync::atomic::Ordering::Relaxed),
            );
        }
        empty
    }
}

// Point items (anything without a version number) can be negatively cached as
// None
type PointCacheItem<T> = Option<T>;

// PointCacheItem can only be used for insert-only collections, so a Some entry
// is always newer than a None entry.
impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
        match (self, other) {
            (Some(_), None) => true,

            (Some(a), Some(b)) => {
                // conflicting inserts should never happen
                debug_assert_eq!(a, b);
                false
            }

            _ => false,
        }
    }
}

/// CachedCommittedData stores data that has been committed to the db, but is
/// likely to be read soon.
struct CachedCommittedData {
    // See module level comment for an explanation of caching strategy.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // See module level comment for an explanation of caching strategy.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    transaction_events:
        MonotonicCache<TransactionEventsDigest, PointCacheItem<Arc<TransactionEvents>>>,

    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Objects that were read at transaction signing time - allows us to access them again at
    // execution time with a single lock / hash lookup
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}

impl CachedCommittedData {
    fn new(config: &WritebackCacheConfig) -> Self {
        let object_cache = MokaCache::builder()
            .max_capacity(config.object_cache_size())
            .build();
        let marker_cache = MokaCache::builder()
            .max_capacity(config.marker_cache_size())
            .build();

        let transactions = MonotonicCache::new(config.transaction_cache_size());
        let transaction_effects = MonotonicCache::new(config.effect_cache_size());
        let transaction_events = MonotonicCache::new(config.events_cache_size());
        let executed_effects_digests = MonotonicCache::new(config.executed_effect_cache_size());

        let transaction_objects = MokaCache::builder()
            .max_capacity(config.transaction_objects_cache_size())
            .build();

        Self {
            object_cache,
            object_by_id_cache: MonotonicCache::new(config.object_by_id_cache_size()),
            marker_cache,
            transactions,
            transaction_effects,
            transaction_events,
            executed_effects_digests,
            _transaction_objects: transaction_objects,
        }
    }

    fn clear_and_assert_empty(&self) {
        self.object_cache.invalidate_all();
        self.object_by_id_cache.invalidate_all();
        self.marker_cache.invalidate_all();
        self.transactions.invalidate_all();
        self.transaction_effects.invalidate_all();
        self.transaction_events.invalidate_all();
        self.executed_effects_digests.invalidate_all();
        self._transaction_objects.invalidate_all();

        assert_empty(&self.object_cache);
        assert!(self.object_by_id_cache.is_empty());
        assert_empty(&self.marker_cache);
        assert!(self.transactions.is_empty());
        assert!(self.transaction_effects.is_empty());
        assert!(self.transaction_events.is_empty());
        assert!(self.executed_effects_digests.is_empty());
        assert_empty(&self._transaction_objects);
    }
}

fn assert_empty<K, V>(cache: &MokaCache<K, V>)
where
    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
{
    if cache.iter().next().is_some() {
        panic!("cache should be empty");
    }
}

pub struct WritebackCache {
    dirty: UncommittedData,
    cached: CachedCommittedData,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    // - note that we remove any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    store: Arc<AuthorityStore>,
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}

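// Probes one level (dirty or cached) of a versioned cache for an exact
// version, recording request/hit/miss metrics. On a hit, or on a provable
// negative hit (the requested version is newer than the least version held at
// this level, yet absent), it returns early from the enclosing function;
// otherwise control falls through to the next lookup level.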
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // The requested version is greater than the least version in the
                    // cache, yet it was not found above. Because the cache holds a
                    // gapless run of the newest versions, the requested version cannot
                    // exist anywhere, including the db.
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

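// Probes one level (dirty or cached) of a versioned cache for the highest
// version, recording metrics. Like check_cache_entry_by_version!, it returns
// early from the enclosing function on a hit.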
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

impl WritebackCache {
    pub fn new(
        config: &WritebackCacheConfig,
        store: Arc<AuthorityStore>,
        metrics: Arc<ExecutionCacheMetrics>,
        backpressure_manager: Arc<BackpressureManager>,
    ) -> Self {
        let packages = MokaCache::builder()
            .max_capacity(config.package_cache_size())
            .build();
        Self {
            dirty: UncommittedData::new(),
            cached: CachedCommittedData::new(config),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            store,
            backpressure_manager,
            backpressure_threshold: config.backpressure_threshold(),
            metrics,
        }
    }

    pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
        Self::new(
            &Default::default(),
            store,
            ExecutionCacheMetrics::new(registry).into(),
            BackpressureManager::new_for_tests(),
        )
    }

    #[cfg(test)]
    pub fn reset_for_test(&mut self) {
        let mut new = Self::new(
            &Default::default(),
            self.store.clone(),
            self.metrics.clone(),
            self.backpressure_manager.clone(),
        );
        std::mem::swap(self, &mut new);
    }

    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

518
519        // We must hold the lock for the object entry while inserting to the
520        // object_by_id_cache. Otherwise, a surprising bug can occur:
521        //
522        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then
523        //    pause.
524        // 2. TX2, which reads (O,1) can begin executing, because TransactionManager
525        //    immediately schedules transactions if their inputs are available. It does
526        //    not matter that TX1 hasn't finished executing yet.
527        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
528        // 4. The thread executing TX1 can resume and write (O,1) to the
529        //    object_by_id_cache.
530        //
531        // Now, any subsequent attempt to get the latest version of O will return (O,1)
532        // instead of (O,2).
533        //
534        // This seems very unlikely, but it may be possible under the following
535        // circumstances:
536        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
537        //   lock-free algorithms that have retry loops. Possibly, under high
538        //   contention, this code might spin for a surprisingly long time.
539        // - Additionally, many concurrent re-executions of the same tx could happen due
540        //   to the tx finalizer, plus checkpoint executor, consensus, and RPCs from
541        //   fullnodes.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.cached
            .object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            // While Ticket::Write cannot expire, this insert may still fail.
            // See the comment in `MonotonicCache::insert`.
            .ok();

        entry.insert(version, object);
    }

    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: &ObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!(
            "inserting marker value {:?}: {:?}",
            object_key,
            marker_value
        );
        fail_point!("write_marker_entry");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.0))
            .or_default()
            .value_mut()
            .insert(object_key.1, marker_value);
    }

    // Lock both the dirty and committed sides of the cache, and then pass the
    // entries to the callback. Written with the `with` pattern because any
    // other way of doing this creates lifetime hell.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }

    // Attempt to get an object from the cache. The DB is not consulted.
    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<Object> {
        match self.get_object_entry_by_key_cache_only(object_id, version) {
            CacheResult::Hit(entry) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::Miss => CacheResult::Miss,
            CacheResult::NegativeHit => CacheResult::NegativeHit,
        }
    }

    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.cached.object_by_id_cache.get(object_id);

        if cfg!(debug_assertions) {
            if let Some(entry) = &entry {
                // check that cache is coherent
                let highest: Option<ObjectEntry> = self
                    .dirty
                    .objects
                    .get(object_id)
                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                    .or_else(|| {
                        let obj: Option<ObjectEntry> = self
                            .store
                            .get_latest_object_or_tombstone(*object_id)
                            .unwrap()
                            .map(|(_, o)| o.into());
                        obj
                    });

                let cache_entry = match &*entry.lock() {
                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                    LatestObjectCacheEntry::NonExistent => None,
                };

                // If the cache entry is a tombstone, the db entry may be missing if it was
                // pruned.
                let tombstone_possibly_pruned = highest.is_none()
                    && cache_entry
                        .as_ref()
                        .map(|e| e.is_tombstone())
                        .unwrap_or(false);

                if highest != cache_entry && !tombstone_possibly_pruned {
                    tracing::error!(
                        ?highest,
                        ?cache_entry,
                        ?tombstone_possibly_pruned,
                        "object_by_id cache is incoherent for {:?}",
                        object_id
                    );
                    panic!("object_by_id cache is incoherent for {object_id:?}");
                }
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, Object)> {
        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
            CacheResult::Hit((version, entry)) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::NegativeHit => CacheResult::NegativeHit,
            CacheResult::Miss => CacheResult::Miss,
        }
    }

    fn get_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_latest_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_impl(
        &self,
        request_type: &'static str,
        id: &ObjectID,
    ) -> IotaResult<Option<Object>> {
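        // Take a read ticket before probing the cache: if a concurrent write to
        // this object lands between the db read below and our cache insert, the
        // ticket expires and the stale insert is discarded (see
        // cache_latest_object_by_id).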
        let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, object)) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => {
                let obj = self.store.try_get_object(id)?;
                if let Some(obj) = &obj {
                    self.cache_latest_object_by_id(
                        id,
                        LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
                        ticket,
                    );
                } else {
                    self.cache_object_not_found(id, ticket);
                }
                Ok(obj)
            }
        }
    }

    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }

    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }

    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = &*tx_outputs;

        // Deletions and wraps must be written first. The reason is that one of the
        // deletes may be a child object, and if we wrote the parent object first, a
        // reader could observe the previous version of the child object instead of
        // the deleted/wrapped tombstone, which would cause an execution fork.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        // Update all markers
        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, object_key, *marker_value);
        }

        // Write children before parents to ensure that readers do not observe a parent
        // object before its most recent children are visible.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        // insert transaction effects before executed_effects_digests so that there
        // are never dangling entries in executed_effects_digests
        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // note: if events.data.is_empty(), then there are no events for this
        // transaction. We store it anyway to avoid special cases in
        // commit_transaction_outputs, and translate an empty events structure
        // to None when reading.
        self.metrics.record_cache_write("transaction_events");
        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                occupied.get_mut().0.insert(tx_digest);
            }
            DashMapEntry::Vacant(entry) => {
                let mut txns = BTreeSet::new();
                txns.insert(tx_digest);
                entry.insert((txns, events.clone()));
            }
        }

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);

        Ok(())
    }

    // Commits dirty data for the given TransactionDigest to the db.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                // This can happen in the following rare case:
                // All transactions in the checkpoint are committed to the db (by
                // commit_transaction_outputs, called in
                // CheckpointExecutor::process_executed_transactions), but the process
                // crashes before the checkpoint watermark is bumped. We will then
                // re-commit the checkpoint at startup, even though all of its
                // transactions have already been executed.
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        // Flush writes to disk before removing anything from the dirty set. Otherwise,
        // a cache eviction could cause a value to disappear briefly, even if we insert
        // to the cache before removing from the dirty set.
        self.store.write_transaction_outputs(epoch, &all_outputs)?;

        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);

        Ok(())
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        let num_commits = self
            .dirty
            .total_transaction_commits
            .load(std::sync::atomic::Ordering::Relaxed);

        self.dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits)
    }

    fn set_backpressure(&self, pending_count: u64) {
        let backpressure = pending_count > self.backpressure_threshold;
        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
        if backpressure_changed {
            self.metrics.backpressure_toggles.inc();
        }
        self.metrics
            .backpressure_status
            .set(if backpressure { 1 } else { 0 });
    }

    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        // Now, remove each piece of committed data from the dirty state and insert it
        // into the cache. TODO: outputs should have a strong count of 1 so we
        // should be able to move out of it
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();
        let events_digest = events.digest();

        // Update cache before removing from self.dirty to avoid
        // unnecessary cache misses
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &events_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                let txns = &mut occupied.get_mut().0;
                assert!(txns.remove(&tx_digest), "transaction must exist");
                if txns.is_empty() {
                    occupied.remove();
                }
            }
            DashMapEntry::Vacant(_) => {
                panic!("events must exist");
            }
        }

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        // Move dirty markers to cache
        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.0),
                object_key.1,
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }

    // Move the oldest/least entry from the dirty queue to the cache queue.
    // This is called after the entry is committed to the db.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        static MAX_VERSIONS: usize = 3;

        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying
        // either. This ensures that readers cannot see a value temporarily
        // disappear.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // insert into cache and drop old versions.
        cache_map.insert(version, value.clone());
        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // if there are no versions remaining, remove the map entry
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }

    // Updates the latest object id cache with an entry that was read from the db.
    fn cache_latest_object_by_id(
        &self,
        object_id: &ObjectID,
        object: LatestObjectCacheEntry,
        ticket: Ticket,
    ) {
        trace!("caching object by id: {:?} {:?}", object_id, object);
        if self
            .cached
            .object_by_id_cache
            .insert(object_id, object, ticket)
            .is_ok()
        {
            self.metrics.record_cache_write("object_by_id");
        } else {
            trace!("discarded cache write due to expired ticket");
            self.metrics.record_ticket_expiry();
        }
    }

    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }

    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");
        assert!(
            self.dirty.pending_transaction_writes.is_empty(),
            "should be empty due to revert_state_update"
        );
        self.dirty.clear();
        info!("clearing old transaction locks");
        self.object_locks.clear();
    }

    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
        // TODO: remove revert_state_update_impl entirely, and simply drop all dirty
        // state when clear_state_end_of_epoch_impl is called.
        // Further, once we do this, we can delay the insertion of the transaction into
        // pending_consensus_transactions until after the transaction has executed.
        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
            assert!(
                !self.try_is_tx_already_executed(tx)?,
                "attempt to revert committed transaction"
            );

            // A transaction can be inserted into pending_consensus_transactions, but then
            // reconfiguration can happen before the transaction executes.
            info!("Not reverting {:?} as it was not executed", tx);
            return Ok(());
        };

        for (object_id, object) in outputs.written.iter() {
            if object.is_package() {
                info!("removing non-finalized package from cache: {:?}", object_id);
                self.packages.invalidate(object_id);
            }
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        // Note: individual object entries are removed when
        // clear_state_end_of_epoch_impl is called
        Ok(())
    }

    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
        self.store.bulk_insert_genesis_objects(objects)?;
        for obj in objects {
            self.cached.object_cache.invalidate(&obj.id());
            self.cached.object_by_id_cache.invalidate(&obj.id());
        }
        Ok(())
    }

    fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
        self.cached.object_by_id_cache.invalidate(&object.id());
        self.cached.object_cache.invalidate(&object.id());
        self.store.insert_genesis_object(object)
    }

    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.packages.invalidate_all();
        assert_empty(&self.packages);
    }
}

impl ExecutionCacheAPI for WritebackCache {}

impl ExecutionCacheCommit for WritebackCache {
    fn try_commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        WritebackCache::commit_transaction_outputs(self, epoch, digests)
    }

    fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
        self.store.persist_transaction(tx)
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        WritebackCache::approximate_pending_transaction_count(self)
    }
}

impl ObjectCacheRead for WritebackCache {
    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            if cfg!(debug_assertions) {
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {package_id:?}"
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // We try the dirty objects cache as well before going to the database. This is
        // necessary because the package could be evicted from the package cache
        // before it is committed to the database.
        if let Some(p) = self.get_object_impl("package", package_id)? {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                Err(IotaError::UserInput {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                })
            }
        } else {
            Ok(None)
        }
    }

    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // This is a no-op because all writes go through the cache, therefore it
        // can never be incoherent
    }

1370    // get_object and variants.
1371
1372    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
1373        self.get_object_impl("object_latest", id)
1374    }
1375
    fn try_get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(object) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => Ok(self
                .record_db_get("object_by_version")
                .try_get_object_by_key(object_id, version)?),
        }
    }

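    // `try_do_fallback_lookup` resolves each key against the cache closure
    // first and only sends the residual misses to the db closure, splicing the
    // two result sets back together in key order. Illustrative sketch with
    // hypothetical keys: for `[k1, k2, k3]` where `k1` hits and `k2` is
    // negatively cached, only `k3` is fetched from the db, and the result is
    // `[Some(v1), None, <db result for k3>]`.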
    fn try_multi_get_objects_by_key(
        &self,
        object_keys: &[ObjectKey],
    ) -> Result<Vec<Option<Object>>, IotaError> {
        try_do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
                    CacheResult::NegativeHit => CacheResult::NegativeHit,
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_get_objects_by_key(remaining)
            },
        )
    }

    fn try_object_exists_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<bool> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(_) => Ok(true),
            CacheResult::NegativeHit => Ok(false),
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .object_exists_by_key(object_id, version),
        }
    }

    fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
        try_do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(_) => CacheResult::Hit(true),
                    CacheResult::NegativeHit => CacheResult::Hit(false),
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
            },
        )
    }

    fn try_get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => Ok(Some(match entry {
                ObjectEntry::Object(object) => object.compute_object_reference(),
                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
            })),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_objref_or_tombstone")
                .get_latest_object_ref_or_tombstone(object_id),
        }
    }

    fn try_get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => {
                let key = ObjectKey(object_id, version);
                Ok(Some(match entry {
                    ObjectEntry::Object(object) => (key, object.into()),
                    ObjectEntry::Deleted => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_DELETED,
                        )),
                    ),
                    ObjectEntry::Wrapped => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
                        )),
                    ),
                }))
            }
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_object_or_tombstone")
                .get_latest_object_or_tombstone(object_id),
        }
    }

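    // Illustrative behavior sketch (hypothetical versions): if the in-memory
    // queues for an object hold versions 5 and 7, a call with version_bound = 6
    // is served from the cache with version 5, while version_bound = 4 cannot
    // be answered from memory (an older version may exist on disk) and falls
    // through to the db scan inside.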
    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
    fn try_find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version_bound: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        macro_rules! check_cache_entry {
            ($level: expr, $objects: expr) => {
                self.metrics
                    .record_cache_request("object_lt_or_eq_version", $level);
                if let Some(objects) = $objects {
                    if let Some((_, object)) = objects
                        .all_versions_lt_or_eq_descending(&version_bound)
                        .next()
                    {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", $level);
                            return Ok(Some(object.clone()));
                        } else {
                            // if we find a tombstone, the object does not exist
                            self.metrics
                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
                            return Ok(None);
                        }
                    } else {
                        self.metrics
                            .record_cache_miss("object_lt_or_eq_version", $level);
                    }
                }
            };
        }
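
        // Illustrative note: `check_cache_entry!("uncommitted", dirty_entry)`
        // expands to the block above with $level = "uncommitted": it walks the
        // given queue downward from `version_bound`, returning the newest
        // version <= the bound on a hit, returning `None` if that version is a
        // tombstone, and falling through on a miss.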

        // if we have the latest version cached, and it is within the bound, we are done
        self.metrics
            .record_cache_request("object_lt_or_eq_version", "object_by_id");
        if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
            let latest = latest.lock();
            match &*latest {
                LatestObjectCacheEntry::Object(latest_version, object) => {
                    if *latest_version <= version_bound {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
                            return Ok(Some(object.clone()));
                        } else {
                            // object is a tombstone, but is still within the version bound
                            self.metrics.record_cache_negative_hit(
                                "object_lt_or_eq_version",
                                "object_by_id",
                            );
                            return Ok(None);
                        }
                    }
                    // latest object is not within the version bound. fall through.
                }
                // No object by this ID exists at all
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
                    return Ok(None);
                }
            }
        }
        self.metrics
            .record_cache_miss("object_lt_or_eq_version", "object_by_id");

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            &object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry!("uncommitted", dirty_entry);
                check_cache_entry!("committed", cached_entry);

                // Much of the time, the query will be for the very latest object version, so
                // try that first. But we have to be careful:
                // 1. We must load the tombstone if it is present, because its version may
                //    exceed the version_bound, in which case we must do a scan.
                // 2. You might think we could just call
                //    `self.store.get_latest_object_or_tombstone` here. But we cannot, because
                //    there may be a more recent version in the dirty set, which we skipped over
                //    in check_cache_entry! because of the version bound. However, if we skipped
                //    it above, we will skip it here as well, again due to the version bound.
                // 3. Despite that, we really want to warm the cache here. Why? Because if the
                //    object is cold (not being written to), then we will very soon be able to
                //    start serving reads of it from the object_by_id cache, IF we can warm the
                //    cache. If we don't warm the cache here, and no writes to the object occur,
                //    then we will always have to go to the db for the object.
                //
                // Lastly, it is important to understand the rationale for all this: If the
                // object is write-hot, we will serve almost all reads to it from the dirty
                // set (or possibly the cached set, if it is only written to once every few
                // checkpoints). If the object is write-cold (or non-existent) and read-hot,
                // then we will serve almost all reads to it from the object_by_id cache
                // check above. Most of the apparently wasteful code here exists only to
                // ensure correctness in all the edge cases.
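                //
                // As a concrete (hypothetical) example: suppose the dirty set
                // holds only version 9 of an object and the query bound is 6.
                // check_cache_entry! skips version 9, we load version 9 below
                // to warm the object_by_id cache, and because 9 > 6 we still
                // fall back to a db scan for the newest version <= 6.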
                let latest: Option<(SequenceNumber, ObjectEntry)> =
                    if let Some(dirty_set) = dirty_entry {
                        dirty_set
                            .get_highest()
                            .cloned()
                            .tap_none(|| panic!("dirty set cannot be empty"))
                    } else {
                        // TODO: we should try not to read from the db while holding the locks.
                        self.record_db_get("object_lt_or_eq_version_latest")
                            .get_latest_object_or_tombstone(object_id)?
                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
                                (version, ObjectEntry::from(obj_or_tombstone))
                            })
                    };

                if let Some((obj_version, obj_entry)) = latest {
                    // We can always cache the latest object (or tombstone), even if it is not
                    // within the version_bound. This is done in order to warm the cache in the
                    // case where a sequence of transactions all read the same child object
                    // without writing to it.

                    // Note: no need to call with_object_by_id_cache_update here, because we are
                    // holding the lock on the dirty cache entry, and `latest` cannot become
                    // out-of-date while we hold that lock.
                    self.cache_latest_object_by_id(
                        &object_id,
                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
                        // We can get a ticket at the last second, because we are holding the lock
                        // on dirty, so there cannot be any concurrent writes.
                        self.cached
                            .object_by_id_cache
                            .get_ticket_for_read(&object_id),
                    );

                    if obj_version <= version_bound {
                        match obj_entry {
                            ObjectEntry::Object(object) => Ok(Some(object)),
                            ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
                        }
                    } else {
                        // The latest object exceeded the bound, so now we have to do a scan.
                        // But we already know there is no dirty entry within the bound,
                        // so we go to the db.
                        self.record_db_get("object_lt_or_eq_version_scan")
                            .find_object_lt_or_eq_version(object_id, version_bound)
                    }
                } else {
                    // No object was found in the dirty set or the db, so the object does not
                    // exist. When this is called from a read api (i.e. not the execution path)
                    // it is possible that the object has been deleted and pruned. In this case,
                    // there would be no entry at all on disk, but we may have a tombstone in
                    // the cache.
                    let highest = cached_entry.and_then(|c| c.get_highest());
                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
                    self.cache_object_not_found(
                        &object_id,
                        // okay to get ticket at last second - see above
                        self.cached
                            .object_by_id_cache
                            .get_ticket_for_read(&object_id),
                    );
                    Ok(None)
                }
            },
        )
    }

    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        get_iota_system_state(self)
    }

    fn try_get_marker_value(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> IotaResult<Option<MarkerValue>> {
        match self.get_marker_value_cache_only(object_id, version, epoch_id) {
            CacheResult::Hit(marker) => Ok(Some(marker)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("marker_by_version")
                .get_marker_value(object_id, &version, epoch_id),
        }
    }

    fn try_get_latest_marker(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
            CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
            CacheResult::NegativeHit => {
                panic!("cannot have negative hit when getting latest marker")
            }
            CacheResult::Miss => self
                .record_db_get("marker_latest")
                .get_latest_marker(object_id, epoch_id),
        }
    }

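    // Lock status has three shapes, e.g. (hypothetical refs): if the live
    // version of the object differs from the requested `obj_ref`, the result
    // is LockedAtDifferentVersion carrying the live ref; if it matches, the
    // result is LockedToTx when a transaction holds the lock, or Initialized
    // when none does; a negatively cached object yields ObjectNotFound.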
    fn try_get_lock(
        &self,
        obj_ref: ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaLockResult {
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    // requested object ref is live, check if there is a lock
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: tx_digest,
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(IotaError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    // even though we know the requested version, we leave it as None to indicate
                    // that the object does not exist at any version
                    version: None,
                }))
            }
            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
        }
    }

    fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
        let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
            UserInputError::ObjectNotFound {
                object_id,
                version: None,
            },
        )?;
        Ok(obj.compute_object_reference())
    }

    fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        try_do_fallback_lookup(
            owned_object_refs,
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            |remaining| {
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }

    fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        self.store.perpetual_tables.get_highest_pruned_checkpoint()
    }
}

impl TransactionCacheRead for WritebackCache {
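    // Read tickets, as used below: a sketch of the intended protocol is that a
    // ticket is taken *before* probing the cache, and a later `insert` carrying
    // a stale ticket is rejected (hence the `.ok()`), so a slow reader cannot
    // overwrite a newer concurrent write with an outdated "not found" entry.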
    fn try_multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");
                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        Ok(CacheResult::Hit(Some(tx)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                Ok(results)
            },
        )
    }

    fn try_multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return Ok(CacheResult::Hit(Some(*digest)));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        Ok(CacheResult::Hit(Some(digest)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }

    fn try_multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return Ok(CacheResult::Hit(Some(effects.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        Ok(CacheResult::Hit(Some((*effects).clone())))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }

    fn try_notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
        self.executed_effects_digests_notify_read
            .read(digests, |digests| {
                self.try_multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }

    fn try_multi_get_events(
        &self,
        event_digests: &[TransactionEventsDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self
                    .dirty
                    .transaction_events
                    .get(digest)
                    .map(|e| e.1.clone())
                {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");
                    return Ok(CacheResult::Hit(map_events(events)));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        Ok(CacheResult::Hit(map_events((*events).clone())))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_events", remaining.len())
                    .multi_get_events(&remaining_digests)?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }
}

impl ExecutionCacheWrite for WritebackCache {
    fn try_acquire_transaction_locks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> IotaResult {
        self.object_locks.acquire_transaction_locks(
            self,
            epoch_store,
            owned_input_objects,
            transaction,
        )
    }

    fn try_write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs)
    }
}

implement_passthrough_traits!(WritebackCache);

impl AccumulatorStore for WritebackCache {
    fn get_root_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        self.store.get_root_state_accumulator_for_epoch(epoch)
    }

    fn get_root_state_accumulator_for_highest_epoch(
        &self,
    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
        self.store.get_root_state_accumulator_for_highest_epoch()
    }

    fn insert_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &Accumulator,
    ) -> IotaResult {
        self.store
            .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // The only time it is safe to iterate the live object set is at an epoch
        // boundary, at which point the db is consistent and the dirty cache is
        // empty, so this does not need to read the cache.
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set()
    }

    // A version of iter_live_object_set that reads the cache. Only use for testing.
    // If used on a live validator, it can cause the server to block for as long as
    // it takes to iterate the entire live object set.
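    //
    // Hypothetical test usage sketch:
    //
    //     let live: Vec<LiveObject> =
    //         cache.iter_cached_live_object_set_for_testing().collect();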
    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // hold iter until we are finished to prevent any concurrent inserts/deletes
        let iter = self.dirty.objects.iter();
        let mut dirty_objects = BTreeMap::new();

        // add everything from the store
        for obj in self.store.iter_live_object_set() {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // add everything from the cache, but also remove deletions
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (_, ObjectEntry::Wrapped) | (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}

// TODO: For correctness, we must at least invalidate the cache when items are
// written through this trait (since they could be negatively cached as absent).
// But it may or may not be optimal to actually insert them into the cache. For
// instance, if state sync is running ahead of execution, they might evict other
// items that are about to be read. This could be an area for tuning in the
// future.
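//
// Illustrative (hypothetical) scenario: a reader earlier looked up digest D,
// missed, and negatively cached "absent". If a state sync write of D did not
// overwrite that entry, readers would keep seeing "not found" for a
// transaction that is now durably in the db.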
impl StateSyncAPI for WritebackCache {
    fn try_insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) -> IotaResult {
        self.store
            .insert_transaction_and_effects(transaction, transaction_effects)?;

        // The db write above has already succeeded, so a failed cache insert must
        // not fail the whole operation; we use .ok() to ignore it. Readers that
        // miss the cache simply fall back to the database.
        self.cached
            .transactions
            .insert(
                transaction.digest(),
                PointCacheItem::Some(Arc::new(transaction.clone())),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &transaction_effects.digest(),
                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
                Ticket::Write,
            )
            .ok();

        Ok(())
    }

    fn try_multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) -> IotaResult {
        self.store
            .multi_insert_transaction_and_effects(transactions_and_effects.iter())?;
        for VerifiedExecutionData {
            transaction,
            effects,
        } in transactions_and_effects
        {
            self.cached
                .transactions
                .insert(
                    transaction.digest(),
                    PointCacheItem::Some(Arc::new(transaction.clone())),
                    Ticket::Write,
                )
                .ok();
            self.cached
                .transaction_effects
                .insert(
                    &effects.digest(),
                    PointCacheItem::Some(Arc::new(effects.clone())),
                    Ticket::Write,
                )
                .ok();
        }

        Ok(())
    }
}