iota_core/execution_cache/
writeback_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! MemoryCache is a cache for the transaction execution which delays writes to
6//! the database until transaction results are certified (i.e. they appear in a
7//! certified checkpoint, or an effects cert is observed by a fullnode). The
8//! cache also stores committed data in memory in order to serve future reads
9//! without hitting the database.
10//!
11//! For storing uncommitted transaction outputs, we cannot evict the data at all
12//! until it is written to disk. Committed data not only can be evicted, but it
13//! is also unbounded (imagine a stream of transactions that keep splitting a
14//! coin into smaller coins).
15//!
16//! We also want to be able to support negative cache hits (i.e. the case where
17//! we can determine an object does not exist without hitting the database).
18//!
19//! To achieve both of these goals, we split the cache data into two pieces, a
20//! dirty set and a cached set. The dirty set has no automatic evictions, data
21//! is only removed after being committed. The cached set is in a bounded-sized
22//! cache with automatic evictions. In order to support negative cache hits, we
23//! treat the two halves of the cache as FIFO queue. Newly written (dirty)
24//! versions are inserted to one end of the dirty queue. As versions are
25//! committed to disk, they are removed from the other end of the dirty queue
26//! and inserted into the cache queue. The cache queue is truncated if it
27//! exceeds its maximum size, by removing all but the N newest versions.
28//!
29//! This gives us the property that the sequence of versions in the dirty and
30//! cached queues are the most recent versions of the object, i.e. there can be
31//! no "gaps". This allows for the following:
32//!
33//!   - Negative cache hits: If the queried version is not in memory, but is
34//!     higher than the smallest version in the cached queue, it does not exist
35//!     in the db either.
36//!   - Bounded reads: When reading the most recent version that is <= some
37//!     version bound, we can correctly satisfy this query from the cache, or
38//!     determine that we must go to the db.
39//!
40//! Note that at any time, either or both the dirty or the cached queue may be
41//! non-existent. There may be no dirty versions of the objects, in which case
42//! there will be no dirty queue. And, the cached queue may be evicted from the
43//! cache, in which case there will be no cached queue. Because only the cached
44//! queue can be evicted (the dirty queue can only become empty by moving
45//! versions from it to the cached queue), the "highest versions" property still
46//! holds in all cases.
47//!
48//! The above design is used for both objects and markers.
49
50use std::{
51    collections::{BTreeMap, HashSet},
52    hash::Hash,
53    sync::{Arc, atomic::AtomicU64},
54};
55
56use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
57use futures::{FutureExt, future::BoxFuture};
58use iota_common::{random_util::randomize_cache_capacity_in_tests, sync::notify_read::NotifyRead};
59use iota_config::WritebackCacheConfig;
60use iota_macros::fail_point;
61use iota_types::{
62    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
63    digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest},
64    effects::{TransactionEffects, TransactionEvents},
65    error::{IotaError, IotaResult, UserInputError},
66    executable_transaction::VerifiedExecutableTransaction,
67    global_state_hash::GlobalStateHash,
68    iota_system_state::{IotaSystemState, get_iota_system_state},
69    message_envelope::Message,
70    messages_checkpoint::CheckpointSequenceNumber,
71    object::Object,
72    storage::{InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
73    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
74};
75use moka::sync::SegmentedCache as MokaCache;
76use parking_lot::Mutex;
77use tap::TapOptional;
78use tracing::{debug, info, instrument, trace, warn};
79
80use super::{
81    Batch, CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
82    ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
83    TransactionCacheRead,
84    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache, Ticket},
85    implement_passthrough_traits,
86    object_locks::ObjectLocks,
87};
88use crate::{
89    authority::{
90        AuthorityStore,
91        authority_per_epoch_store::AuthorityPerEpochStore,
92        authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
93        authority_store_tables::LiveObject,
94        backpressure::BackpressureManager,
95        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
96    },
97    fallback_fetch::try_do_fallback_lookup,
98    global_state_hasher::GlobalStateHashStore,
99    transaction_outputs::TransactionOutputs,
100};
101
102#[cfg(test)]
103#[path = "unit_tests/writeback_cache_tests.rs"]
104pub mod writeback_cache_tests;
105
106#[derive(Clone, PartialEq, Eq)]
107enum ObjectEntry {
108    Object(Object),
109    Deleted,
110    Wrapped,
111}
112
113impl ObjectEntry {
114    #[cfg(test)]
115    fn unwrap_object(&self) -> &Object {
116        match self {
117            ObjectEntry::Object(o) => o,
118            _ => panic!("unwrap_object called on non-Object"),
119        }
120    }
121
122    fn is_tombstone(&self) -> bool {
123        match self {
124            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
125            ObjectEntry::Object(_) => false,
126        }
127    }
128}
129
130impl std::fmt::Debug for ObjectEntry {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            ObjectEntry::Object(o) => {
134                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
135            }
136            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
137            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
138        }
139    }
140}
141
142impl From<Object> for ObjectEntry {
143    fn from(object: Object) -> Self {
144        ObjectEntry::Object(object)
145    }
146}
147
148impl From<ObjectOrTombstone> for ObjectEntry {
149    fn from(object: ObjectOrTombstone) -> Self {
150        match object {
151            ObjectOrTombstone::Object(o) => o.into(),
152            ObjectOrTombstone::Tombstone(obj_ref) => {
153                if obj_ref.2.is_deleted() {
154                    ObjectEntry::Deleted
155                } else if obj_ref.2.is_wrapped() {
156                    ObjectEntry::Wrapped
157                } else {
158                    panic!("tombstone digest must either be deleted or wrapped");
159                }
160            }
161        }
162    }
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
166enum LatestObjectCacheEntry {
167    Object(SequenceNumber, ObjectEntry),
168    NonExistent,
169}
170
171impl LatestObjectCacheEntry {
172    #[cfg(test)]
173    fn version(&self) -> Option<SequenceNumber> {
174        match self {
175            LatestObjectCacheEntry::Object(version, _) => Some(*version),
176            LatestObjectCacheEntry::NonExistent => None,
177        }
178    }
179
180    fn is_alive(&self) -> bool {
181        match self {
182            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
183            LatestObjectCacheEntry::NonExistent => false,
184        }
185    }
186}
187
188impl IsNewer for LatestObjectCacheEntry {
189    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
190        match (self, other) {
191            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
192                v1 > v2
193            }
194            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
195            _ => false,
196        }
197    }
198}
199
200type MarkerKey = (EpochId, ObjectID);
201
202/// UncommittedData stores execution outputs that are not yet written to the db.
203/// Entries in this struct can only be purged after they are committed.
204struct UncommittedData {
205    /// The object dirty set. All writes go into this table first. After we
206    /// flush the data to the db, the data is removed from this table and
207    /// inserted into the object_cache.
208    ///
209    /// This table may contain both live and dead objects, since we flush both
210    /// live and dead objects to the db in order to support past object
211    /// queries on fullnodes.
212    ///
213    /// Further, we only remove objects in FIFO order, which ensures that the
214    /// cached sequence of objects has no gaps. In other words, if we have
215    /// versions 4, 8, 13 of an object, we can deduce that version 9 does
216    /// not exist. This also makes child object reads efficient.
217    /// `object_cache` cannot contain a more recent version of an object than
218    /// `objects`, and neither can have any gaps. Therefore if there is any
219    /// object <= the version bound for a child read in objects, it is the
220    /// correct object to return.
221    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,
222
223    // Markers for received objects and deleted shared objects. This contains all of the dirty
224    // marker state, which is committed to the db at the same time as other transaction data.
225    // After markers are committed to the db we remove them from this table and insert them into
226    // marker_cache.
227    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,
228
229    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,
230
231    transaction_events: DashMap<TransactionDigest, TransactionEvents>,
232
233    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,
234
235    // Transaction outputs that have not yet been written to the DB. Items are removed from this
236    // table as they are flushed to the db.
237    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
238
239    total_transaction_inserts: AtomicU64,
240    total_transaction_commits: AtomicU64,
241}
242
243impl UncommittedData {
244    fn new() -> Self {
245        Self {
246            objects: DashMap::with_shard_amount(2048),
247            markers: DashMap::with_shard_amount(2048),
248            transaction_effects: DashMap::with_shard_amount(2048),
249            executed_effects_digests: DashMap::with_shard_amount(2048),
250            pending_transaction_writes: DashMap::with_shard_amount(2048),
251            transaction_events: DashMap::with_shard_amount(2048),
252            total_transaction_inserts: AtomicU64::new(0),
253            total_transaction_commits: AtomicU64::new(0),
254        }
255    }
256
257    fn clear(&self) {
258        self.objects.clear();
259        self.markers.clear();
260        self.transaction_effects.clear();
261        self.executed_effects_digests.clear();
262        self.pending_transaction_writes.clear();
263        self.transaction_events.clear();
264        self.total_transaction_inserts
265            .store(0, std::sync::atomic::Ordering::Relaxed);
266        self.total_transaction_commits
267            .store(0, std::sync::atomic::Ordering::Relaxed);
268    }
269
270    fn is_empty(&self) -> bool {
271        let empty = self.pending_transaction_writes.is_empty();
272        if empty && cfg!(debug_assertions) {
273            assert!(
274                self.objects.is_empty()
275                    && self.markers.is_empty()
276                    && self.transaction_effects.is_empty()
277                    && self.executed_effects_digests.is_empty()
278                    && self.transaction_events.is_empty()
279                    && self
280                        .total_transaction_inserts
281                        .load(std::sync::atomic::Ordering::Relaxed)
282                        == self
283                            .total_transaction_commits
284                            .load(std::sync::atomic::Ordering::Relaxed),
285            );
286        }
287        empty
288    }
289}
290
291// Point items (anything without a version number) can be negatively cached as
292// None
293type PointCacheItem<T> = Option<T>;
294
295// PointCacheItem can only be used for insert-only collections, so a Some entry
296// is always newer than a None entry.
297impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
298    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
299        match (self, other) {
300            (Some(_), None) => true,
301
302            (Some(a), Some(b)) => {
303                // conflicting inserts should never happen
304                debug_assert_eq!(a, b);
305                false
306            }
307
308            _ => false,
309        }
310    }
311}
312
313/// CachedData stores data that has been committed to the db, but is likely to
314/// be read soon.
315struct CachedCommittedData {
316    // See module level comment for an explanation of caching strategy.
317    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,
318
319    // We separately cache the latest version of each object. Although this seems
320    // redundant, it is the only way to support populating the cache after a read.
321    // We cannot simply insert objects that we read off the disk into `object_cache`,
322    // since that may violate the no-missing-versions property.
323    // `object_by_id_cache` is also written to on writes so that it is always coherent.
324    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
325
326    // See module level comment for an explanation of caching strategy.
327    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
328
329    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,
330
331    transaction_effects:
332        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,
333
334    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,
335
336    executed_effects_digests:
337        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,
338
339    // Objects that were read at transaction signing time - allows us to access them again at
340    // execution time with a single lock / hash lookup
341    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
342}
343
344impl CachedCommittedData {
345    fn new(config: &WritebackCacheConfig) -> Self {
346        let object_cache = MokaCache::builder(8)
347            .max_capacity(randomize_cache_capacity_in_tests(
348                config.object_cache_size(),
349            ))
350            .build();
351        let marker_cache = MokaCache::builder(8)
352            .max_capacity(randomize_cache_capacity_in_tests(
353                config.marker_cache_size(),
354            ))
355            .build();
356
357        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
358            config.transaction_cache_size(),
359        ));
360        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
361            config.effect_cache_size(),
362        ));
363        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
364            config.events_cache_size(),
365        ));
366        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
367            config.executed_effect_cache_size(),
368        ));
369
370        let transaction_objects = MokaCache::builder(8)
371            .max_capacity(randomize_cache_capacity_in_tests(
372                config.transaction_objects_cache_size(),
373            ))
374            .build();
375
376        Self {
377            object_cache,
378            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
379                config.object_by_id_cache_size(),
380            )),
381            marker_cache,
382            transactions,
383            transaction_effects,
384            transaction_events,
385            executed_effects_digests,
386            _transaction_objects: transaction_objects,
387        }
388    }
389
390    fn clear_and_assert_empty(&self) {
391        self.object_cache.invalidate_all();
392        self.object_by_id_cache.invalidate_all();
393        self.marker_cache.invalidate_all();
394        self.transactions.invalidate_all();
395        self.transaction_effects.invalidate_all();
396        self.transaction_events.invalidate_all();
397        self.executed_effects_digests.invalidate_all();
398        self._transaction_objects.invalidate_all();
399
400        assert_empty(&self.object_cache);
401        assert!(&self.object_by_id_cache.is_empty());
402        assert_empty(&self.marker_cache);
403        assert!(self.transactions.is_empty());
404        assert!(self.transaction_effects.is_empty());
405        assert!(self.transaction_events.is_empty());
406        assert!(self.executed_effects_digests.is_empty());
407        assert_empty(&self._transaction_objects);
408    }
409}
410
411fn assert_empty<K, V>(cache: &MokaCache<K, V>)
412where
413    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
414    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
415{
416    if cache.iter().next().is_some() {
417        panic!("cache should be empty");
418    }
419}
420
421pub struct WritebackCache {
422    dirty: UncommittedData,
423    cached: CachedCommittedData,
424
425    // The packages cache is treated separately from objects, because they are immutable and can be
426    // used by any number of transactions. Additionally, many operations require loading large
427    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
428    //
429    // Also, this cache can contain packages that are dirty or committed, so it does not live in
430    // UncachedData or CachedCommittedData. The cache is populated in two ways:
431    // - when packages are written (in which case they will also be present in the dirty set)
432    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
433    //   we do not need to worry about the contiguous version property.
434    // - note that we removed any unfinalized packages from the cache during revert_state_update().
435    packages: MokaCache<ObjectID, PackageObject>,
436
437    object_locks: ObjectLocks,
438
439    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
440    object_notify_read: NotifyRead<InputKey, ()>,
441    store: Arc<AuthorityStore>,
442    backpressure_threshold: u64,
443    backpressure_manager: Arc<BackpressureManager>,
444    metrics: Arc<ExecutionCacheMetrics>,
445}
446
447macro_rules! check_cache_entry_by_version {
448    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
449        $self.metrics.record_cache_request($table, $level);
450        if let Some(cache) = $cache {
451            if let Some(entry) = cache.get(&$version) {
452                $self.metrics.record_cache_hit($table, $level);
453                return CacheResult::Hit(entry.clone());
454            }
455
456            if let Some(least_version) = cache.get_least() {
457                if least_version.0 < $version {
458                    // If the version is greater than the least version in the cache, then we know
459                    // that the object does not exist anywhere
460                    $self.metrics.record_cache_negative_hit($table, $level);
461                    return CacheResult::NegativeHit;
462                }
463            }
464        }
465        $self.metrics.record_cache_miss($table, $level);
466    };
467}
468
469macro_rules! check_cache_entry_by_latest {
470    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
471        $self.metrics.record_cache_request($table, $level);
472        if let Some(cache) = $cache {
473            if let Some((version, entry)) = cache.get_highest() {
474                $self.metrics.record_cache_hit($table, $level);
475                return CacheResult::Hit((*version, entry.clone()));
476            } else {
477                panic!("empty CachedVersionMap should have been removed");
478            }
479        }
480        $self.metrics.record_cache_miss($table, $level);
481    };
482}
483
484impl WritebackCache {
485    pub fn new(
486        config: &WritebackCacheConfig,
487        store: Arc<AuthorityStore>,
488        metrics: Arc<ExecutionCacheMetrics>,
489        backpressure_manager: Arc<BackpressureManager>,
490    ) -> Self {
491        let packages = MokaCache::builder(8)
492            .max_capacity(randomize_cache_capacity_in_tests(
493                config.package_cache_size(),
494            ))
495            .build();
496        Self {
497            dirty: UncommittedData::new(),
498            cached: CachedCommittedData::new(config),
499            packages,
500            object_locks: ObjectLocks::new(),
501            executed_effects_digests_notify_read: NotifyRead::new(),
502            object_notify_read: NotifyRead::new(),
503            store,
504            backpressure_manager,
505            backpressure_threshold: config.backpressure_threshold(),
506            metrics,
507        }
508    }
509
510    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
511        Self::new(
512            &Default::default(),
513            store,
514            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
515            BackpressureManager::new_for_tests(),
516        )
517    }
518
519    #[cfg(test)]
520    pub fn reset_for_test(&mut self) {
521        let mut new = Self::new(
522            &Default::default(),
523            self.store.clone(),
524            self.metrics.clone(),
525            self.backpressure_manager.clone(),
526        );
527        std::mem::swap(self, &mut new);
528    }
529
530    fn write_object_entry(
531        &self,
532        object_id: &ObjectID,
533        version: SequenceNumber,
534        object: ObjectEntry,
535    ) {
536        trace!(?object_id, ?version, ?object, "inserting object entry");
537        self.metrics.record_cache_write("object");
538
539        // We must hold the lock for the object entry while inserting to the
540        // object_by_id_cache. Otherwise, a surprising bug can occur:
541        //
542        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then
543        //    pause.
544        // 2. TX2, which reads (O,1) can begin executing, because TransactionManager
545        //    immediately schedules transactions if their inputs are available. It does
546        //    not matter that TX1 hasn't finished executing yet.
547        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
548        // 4. The thread executing TX1 can resume and write (O,1) to the
549        //    object_by_id_cache.
550        //
551        // Now, any subsequent attempt to get the latest version of O will return (O,1)
552        // instead of (O,2).
553        //
554        // This seems very unlikely, but it may be possible under the following
555        // circumstances:
556        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
557        //   lock-free algorithms that have retry loops. Possibly, under high
558        //   contention, this code might spin for a surprisingly long time.
559        // - Additionally, many concurrent re-executions of the same tx could happen due
560        //   to the tx finalizer, plus checkpoint executor, consensus, and RPCs from
561        //   fullnodes.
562        let mut entry = self.dirty.objects.entry(*object_id).or_default();
563
564        self.cached
565            .object_by_id_cache
566            .insert(
567                object_id,
568                LatestObjectCacheEntry::Object(version, object.clone()),
569                Ticket::Write,
570            )
571            // While Ticket::Write cannot expire, this insert may still fail.
572            // See the comment in `MonotonicCache::insert`.
573            .ok();
574
575        entry.insert(version, object.clone());
576
577        if let ObjectEntry::Object(object) = &object {
578            super::notify_object_written(&self.object_notify_read, object);
579        }
580    }
581
582    fn write_marker_value(
583        &self,
584        epoch_id: EpochId,
585        object_key: &ObjectKey,
586        marker_value: MarkerValue,
587    ) {
588        tracing::trace!(
589            "inserting marker value {:?}: {:?}",
590            object_key,
591            marker_value
592        );
593        fail_point!("write_marker_entry");
594        self.metrics.record_cache_write("marker");
595        self.dirty
596            .markers
597            .entry((epoch_id, object_key.0))
598            .or_default()
599            .value_mut()
600            .insert(object_key.1, marker_value);
601
602        // It is possible for a transaction to use a shared
603        // object in the input, hence we must notify that it is now available
604        // at the assigned version, so that any transaction waiting for this
605        // object version can start execution.
606        super::notify_marker_written(&self.object_notify_read, object_key, &marker_value);
607    }
608
609    // lock both the dirty and committed sides of the cache, and then pass the
610    // entries to the callback. Written with the `with` pattern because any
611    // other way of doing this creates lifetime hell.
612    fn with_locked_cache_entries<K, V, R>(
613        dirty_map: &DashMap<K, CachedVersionMap<V>>,
614        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
615        key: &K,
616        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
617    ) -> R
618    where
619        K: Copy + Eq + Hash + Send + Sync + 'static,
620        V: Send + Sync + 'static,
621    {
622        let dirty_entry = dirty_map.entry(*key);
623        let dirty_entry = match &dirty_entry {
624            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
625            DashMapEntry::Vacant(_) => None,
626        };
627
628        let cached_entry = cached_map.get(key);
629        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
630        let cached_entry = cached_lock.as_deref();
631
632        cb(dirty_entry, cached_entry)
633    }
634
635    // Attempt to get an object from the cache. The DB is not consulted.
636    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
637    fn get_object_entry_by_key_cache_only(
638        &self,
639        object_id: &ObjectID,
640        version: SequenceNumber,
641    ) -> CacheResult<ObjectEntry> {
642        Self::with_locked_cache_entries(
643            &self.dirty.objects,
644            &self.cached.object_cache,
645            object_id,
646            |dirty_entry, cached_entry| {
647                check_cache_entry_by_version!(
648                    self,
649                    "object_by_version",
650                    "uncommitted",
651                    dirty_entry,
652                    version
653                );
654                check_cache_entry_by_version!(
655                    self,
656                    "object_by_version",
657                    "committed",
658                    cached_entry,
659                    version
660                );
661                CacheResult::Miss
662            },
663        )
664    }
665
666    fn get_object_by_key_cache_only(
667        &self,
668        object_id: &ObjectID,
669        version: SequenceNumber,
670    ) -> CacheResult<Object> {
671        match self.get_object_entry_by_key_cache_only(object_id, version) {
672            CacheResult::Hit(entry) => match entry {
673                ObjectEntry::Object(object) => CacheResult::Hit(object),
674                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
675            },
676            CacheResult::Miss => CacheResult::Miss,
677            CacheResult::NegativeHit => CacheResult::NegativeHit,
678        }
679    }
680
681    fn get_object_entry_by_id_cache_only(
682        &self,
683        request_type: &'static str,
684        object_id: &ObjectID,
685    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
686        self.metrics
687            .record_cache_request(request_type, "object_by_id");
688        let entry = self.cached.object_by_id_cache.get(object_id);
689
690        if cfg!(debug_assertions) {
691            if let Some(entry) = &entry {
692                // check that cache is coherent
693                let highest: Option<ObjectEntry> = self
694                    .dirty
695                    .objects
696                    .get(object_id)
697                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
698                    .or_else(|| {
699                        let obj: Option<ObjectEntry> = self
700                            .store
701                            .get_latest_object_or_tombstone(*object_id)
702                            .unwrap()
703                            .map(|(_, o)| o.into());
704                        obj
705                    });
706
707                let cache_entry = match &*entry.lock() {
708                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
709                    LatestObjectCacheEntry::NonExistent => None,
710                };
711
712                // If the cache entry is a tombstone, the db entry may be missing if it was
713                // pruned.
714                let tombstone_possibly_pruned = highest.is_none()
715                    && cache_entry
716                        .as_ref()
717                        .map(|e| e.is_tombstone())
718                        .unwrap_or(false);
719
720                if highest != cache_entry && !tombstone_possibly_pruned {
721                    tracing::error!(
722                        ?highest,
723                        ?cache_entry,
724                        ?tombstone_possibly_pruned,
725                        "object_by_id cache is incoherent for {:?}",
726                        object_id
727                    );
728                    panic!("object_by_id cache is incoherent for {object_id:?}");
729                }
730            }
731        }
732
733        if let Some(entry) = entry {
734            let entry = entry.lock();
735            match &*entry {
736                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
737                    self.metrics.record_cache_hit(request_type, "object_by_id");
738                    return CacheResult::Hit((*latest_version, latest_object.clone()));
739                }
740                LatestObjectCacheEntry::NonExistent => {
741                    self.metrics
742                        .record_cache_negative_hit(request_type, "object_by_id");
743                    return CacheResult::NegativeHit;
744                }
745            }
746        } else {
747            self.metrics.record_cache_miss(request_type, "object_by_id");
748        }
749
750        Self::with_locked_cache_entries(
751            &self.dirty.objects,
752            &self.cached.object_cache,
753            object_id,
754            |dirty_entry, cached_entry| {
755                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
756                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
757                CacheResult::Miss
758            },
759        )
760    }
761
762    fn get_object_by_id_cache_only(
763        &self,
764        request_type: &'static str,
765        object_id: &ObjectID,
766    ) -> CacheResult<(SequenceNumber, Object)> {
767        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
768            CacheResult::Hit((version, entry)) => match entry {
769                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
770                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
771            },
772            CacheResult::NegativeHit => CacheResult::NegativeHit,
773            CacheResult::Miss => CacheResult::Miss,
774        }
775    }
776
777    fn get_marker_value_cache_only(
778        &self,
779        object_id: &ObjectID,
780        version: SequenceNumber,
781        epoch_id: EpochId,
782    ) -> CacheResult<MarkerValue> {
783        Self::with_locked_cache_entries(
784            &self.dirty.markers,
785            &self.cached.marker_cache,
786            &(epoch_id, *object_id),
787            |dirty_entry, cached_entry| {
788                check_cache_entry_by_version!(
789                    self,
790                    "marker_by_version",
791                    "uncommitted",
792                    dirty_entry,
793                    version
794                );
795                check_cache_entry_by_version!(
796                    self,
797                    "marker_by_version",
798                    "committed",
799                    cached_entry,
800                    version
801                );
802                CacheResult::Miss
803            },
804        )
805    }
806
807    fn get_latest_marker_value_cache_only(
808        &self,
809        object_id: &ObjectID,
810        epoch_id: EpochId,
811    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
812        Self::with_locked_cache_entries(
813            &self.dirty.markers,
814            &self.cached.marker_cache,
815            &(epoch_id, *object_id),
816            |dirty_entry, cached_entry| {
817                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
818                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
819                CacheResult::Miss
820            },
821        )
822    }
823
824    fn get_object_impl(
825        &self,
826        request_type: &'static str,
827        id: &ObjectID,
828    ) -> IotaResult<Option<Object>> {
829        let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id);
830        match self.get_object_entry_by_id_cache_only(request_type, id) {
831            CacheResult::Hit((_, object)) => match object {
832                ObjectEntry::Object(object) => Ok(Some(object)),
833                ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
834            },
835            CacheResult::NegativeHit => Ok(None),
836            CacheResult::Miss => {
837                let obj = self.store.get_latest_object_or_tombstone(*id);
838                match obj {
839                    Ok(Some((key, obj))) => {
840                        self.cache_latest_object_by_id(
841                            id,
842                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
843                            ticket,
844                        );
845                        match obj {
846                            ObjectOrTombstone::Object(object) => Ok(Some(object)),
847                            ObjectOrTombstone::Tombstone(_) => Ok(None),
848                        }
849                    }
850                    Ok(None) => {
851                        self.cache_object_not_found(id, ticket);
852                        Ok(None)
853                    }
854                    Err(e) => Err(format!("failed to get latest object or tomebstone: {e}").into()),
855                }
856            }
857        }
858    }
859
860    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
861        self.metrics.record_cache_request(request_type, "db");
862        &self.store
863    }
864
865    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
866        self.metrics
867            .record_cache_multi_request(request_type, "db", count);
868        &self.store
869    }
870
871    #[instrument(level = "debug", skip_all)]
872    fn write_transaction_outputs(
873        &self,
874        epoch_id: EpochId,
875        tx_outputs: Arc<TransactionOutputs>,
876    ) -> IotaResult {
877        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");
878
879        let TransactionOutputs {
880            transaction,
881            effects,
882            markers,
883            written,
884            deleted,
885            wrapped,
886            events,
887            ..
888        } = &*tx_outputs;
889
890        // Deletions and wraps must be written first. The reason is that one of the
891        // deletes may be a child object, and if we write the parent object
892        // first, a reader may or may not see the previous version of the child
893        // object, instead of the deleted/wrapped tombstone, which would cause
894        // an execution fork
895        for ObjectKey(id, version) in deleted.iter() {
896            self.write_object_entry(id, *version, ObjectEntry::Deleted);
897        }
898
899        for ObjectKey(id, version) in wrapped.iter() {
900            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
901        }
902
903        // Update all markers
904        for (object_key, marker_value) in markers.iter() {
905            self.write_marker_value(epoch_id, object_key, *marker_value);
906        }
907
908        // Write children before parents to ensure that readers do not observe a parent
909        // object before its most recent children are visible.
910        for (object_id, object) in written.iter() {
911            if object.is_child_object() {
912                self.write_object_entry(object_id, object.version(), object.clone().into());
913            }
914        }
915        for (object_id, object) in written.iter() {
916            if !object.is_child_object() {
917                self.write_object_entry(object_id, object.version(), object.clone().into());
918                if object.is_package() {
919                    debug!("caching package: {:?}", object.compute_object_reference());
920                    self.packages
921                        .insert(*object_id, PackageObject::new(object.clone()));
922                }
923            }
924        }
925
926        let tx_digest = *transaction.digest();
927        let effects_digest = effects.digest();
928
929        self.metrics.record_cache_write("transaction_block");
930        self.dirty
931            .pending_transaction_writes
932            .insert(tx_digest, tx_outputs.clone());
933
934        // insert transaction effects before executed_effects_digests so that there
935        // are never dangling entries in executed_effects_digests
936        self.metrics.record_cache_write("transaction_effects");
937        self.dirty
938            .transaction_effects
939            .insert(effects_digest, effects.clone());
940
941        // note: if events.data.is_empty(), then there are no events for this
942        // transaction. We store it anyway to avoid special cases in
943        // commit_transaction_outputs, and translate an empty events structure
944        // to None when reading.
945        self.metrics.record_cache_write("transaction_events");
946        self.dirty
947            .transaction_events
948            .insert(tx_digest, events.clone());
949
950        self.metrics.record_cache_write("executed_effects_digests");
951        self.dirty
952            .executed_effects_digests
953            .insert(tx_digest, effects_digest);
954
955        self.executed_effects_digests_notify_read
956            .notify(&tx_digest, &effects_digest);
957
958        self.metrics
959            .pending_notify_read
960            .set(self.executed_effects_digests_notify_read.num_pending() as i64);
961
962        let prev = self
963            .dirty
964            .total_transaction_inserts
965            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
966
967        let pending_count = (prev + 1).saturating_sub(
968            self.dirty
969                .total_transaction_commits
970                .load(std::sync::atomic::Ordering::Relaxed),
971        );
972
973        self.set_backpressure(pending_count);
974
975        Ok(())
976    }
977
978    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
979        let _metrics_guard = iota_metrics::monitored_scope("WritebackCache::build_db_batch");
980        let mut all_outputs = Vec::with_capacity(digests.len());
981        for tx in digests {
982            let Some(outputs) = self
983                .dirty
984                .pending_transaction_writes
985                .get(tx)
986                .map(|o| o.clone())
987            else {
988                // This can happen in the following rare case:
989                // All transactions in the checkpoint are committed to the db (by
990                // commit_transaction_outputs,
991                // called in CheckpointExecutor::process_executed_transactions), but the process
992                // crashes before the checkpoint water mark is bumped. We will
993                // then re-commit the checkpoint at startup, despite that all
994                // transactions are already executed.
995                warn!("Attempt to commit unknown transaction {:?}", tx);
996                continue;
997            };
998            all_outputs.push(outputs);
999        }
1000
1001        let batch = self
1002            .store
1003            .build_db_batch(epoch, &all_outputs)
1004            .expect("db error");
1005        (all_outputs, batch)
1006    }
1007
1008    // Commits dirty data for the given TransactionDigest to the db.
1009    #[instrument(level = "debug", skip_all)]
1010    fn commit_transaction_outputs(
1011        &self,
1012        epoch: EpochId,
1013        (all_outputs, db_batch): Batch,
1014        digests: &[TransactionDigest],
1015    ) -> IotaResult {
1016        let _metrics_guard =
1017            iota_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
1018        fail_point!("writeback-cache-commit");
1019        trace!(?digests);
1020
1021        // Flush writes to disk before removing anything from dirty set. otherwise,
1022        // a cache eviction could cause a value to disappear briefly, even if we insert
1023        // to the cache before removing from the dirty set.
1024        db_batch.write().expect("db error");
1025
1026        let _metrics_guard =
1027            iota_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
1028        for outputs in all_outputs.iter() {
1029            let tx_digest = outputs.transaction.digest();
1030            assert!(
1031                self.dirty
1032                    .pending_transaction_writes
1033                    .remove(tx_digest)
1034                    .is_some()
1035            );
1036            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
1037        }
1038
1039        let num_outputs = all_outputs.len() as u64;
1040        let num_commits = self
1041            .dirty
1042            .total_transaction_commits
1043            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
1044            + num_outputs;
1045
1046        let pending_count = self
1047            .dirty
1048            .total_transaction_inserts
1049            .load(std::sync::atomic::Ordering::Relaxed)
1050            .saturating_sub(num_commits);
1051
1052        self.set_backpressure(pending_count);
1053
1054        Ok(())
1055    }
1056
1057    fn approximate_pending_transaction_count(&self) -> u64 {
1058        let num_commits = self
1059            .dirty
1060            .total_transaction_commits
1061            .load(std::sync::atomic::Ordering::Relaxed);
1062
1063        self.dirty
1064            .total_transaction_inserts
1065            .load(std::sync::atomic::Ordering::Relaxed)
1066            .saturating_sub(num_commits)
1067    }
1068
1069    fn set_backpressure(&self, pending_count: u64) {
1070        let backpressure = pending_count > self.backpressure_threshold;
1071        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1072        if backpressure_changed {
1073            self.metrics.backpressure_toggles.inc();
1074        }
1075        self.metrics
1076            .backpressure_status
1077            .set(if backpressure { 1 } else { 0 });
1078    }
1079
1080    fn flush_transactions_from_dirty_to_cached(
1081        &self,
1082        epoch: EpochId,
1083        tx_digest: TransactionDigest,
1084        outputs: &TransactionOutputs,
1085    ) {
1086        // Now, remove each piece of committed data from the dirty state and insert it
1087        // into the cache. TODO: outputs should have a strong count of 1 so we
1088        // should be able to move out of it
1089        let TransactionOutputs {
1090            transaction,
1091            effects,
1092            markers,
1093            written,
1094            deleted,
1095            wrapped,
1096            events,
1097            ..
1098        } = outputs;
1099
1100        let effects_digest = effects.digest();
1101
1102        // Update cache before removing from self.dirty to avoid
1103        // unnecessary cache misses
1104        self.cached
1105            .transactions
1106            .insert(
1107                &tx_digest,
1108                PointCacheItem::Some(transaction.clone()),
1109                Ticket::Write,
1110            )
1111            .ok();
1112        self.cached
1113            .transaction_effects
1114            .insert(
1115                &effects_digest,
1116                PointCacheItem::Some(effects.clone().into()),
1117                Ticket::Write,
1118            )
1119            .ok();
1120        self.cached
1121            .executed_effects_digests
1122            .insert(
1123                &tx_digest,
1124                PointCacheItem::Some(effects_digest),
1125                Ticket::Write,
1126            )
1127            .ok();
1128        self.cached
1129            .transaction_events
1130            .insert(
1131                &tx_digest,
1132                PointCacheItem::Some(events.clone().into()),
1133                Ticket::Write,
1134            )
1135            .ok();
1136
1137        self.dirty
1138            .transaction_effects
1139            .remove(&effects_digest)
1140            .expect("effects must exist");
1141
1142        self.dirty
1143            .transaction_events
1144            .remove(&tx_digest)
1145            .expect("events must exist");
1146
1147        self.dirty
1148            .executed_effects_digests
1149            .remove(&tx_digest)
1150            .expect("executed effects must exist");
1151
1152        // Move dirty markers to cache
1153        for (object_key, marker_value) in markers.iter() {
1154            Self::move_version_from_dirty_to_cache(
1155                &self.dirty.markers,
1156                &self.cached.marker_cache,
1157                (epoch, object_key.0),
1158                object_key.1,
1159                marker_value,
1160            );
1161        }
1162
1163        for (object_id, object) in written.iter() {
1164            Self::move_version_from_dirty_to_cache(
1165                &self.dirty.objects,
1166                &self.cached.object_cache,
1167                *object_id,
1168                object.version(),
1169                &ObjectEntry::Object(object.clone()),
1170            );
1171        }
1172
1173        for ObjectKey(object_id, version) in deleted.iter() {
1174            Self::move_version_from_dirty_to_cache(
1175                &self.dirty.objects,
1176                &self.cached.object_cache,
1177                *object_id,
1178                *version,
1179                &ObjectEntry::Deleted,
1180            );
1181        }
1182
1183        for ObjectKey(object_id, version) in wrapped.iter() {
1184            Self::move_version_from_dirty_to_cache(
1185                &self.dirty.objects,
1186                &self.cached.object_cache,
1187                *object_id,
1188                *version,
1189                &ObjectEntry::Wrapped,
1190            );
1191        }
1192    }
1193
1194    // Move the oldest/least entry from the dirty queue to the cache queue.
1195    // This is called after the entry is committed to the db.
1196    fn move_version_from_dirty_to_cache<K, V>(
1197        dirty: &DashMap<K, CachedVersionMap<V>>,
1198        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1199        key: K,
1200        version: SequenceNumber,
1201        value: &V,
1202    ) where
1203        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1204        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1205    {
1206        static MAX_VERSIONS: usize = 3;
1207
1208        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying
1209        // either. this ensures that readers cannot see a value temporarily
1210        // disappear.
1211        let dirty_entry = dirty.entry(key);
1212        let cache_entry = cache.entry(key).or_default();
1213        let mut cache_map = cache_entry.value().lock();
1214
1215        // insert into cache and drop old versions.
1216        cache_map.insert(version, value.clone());
1217        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
1218        cache_map.truncate_to(MAX_VERSIONS);
1219
1220        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1221            panic!("dirty map must exist");
1222        };
1223
1224        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1225
1226        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1227
1228        // if there are no versions remaining, remove the map entry
1229        if occupied_dirty_entry.get().is_empty() {
1230            occupied_dirty_entry.remove();
1231        }
1232    }
1233
1234    // Updates the latest object id cache with an entry that was read from the db.
1235    fn cache_latest_object_by_id(
1236        &self,
1237        object_id: &ObjectID,
1238        object: LatestObjectCacheEntry,
1239        ticket: Ticket,
1240    ) {
1241        trace!("caching object by id: {:?} {:?}", object_id, object);
1242        if self
1243            .cached
1244            .object_by_id_cache
1245            .insert(object_id, object, ticket)
1246            .is_ok()
1247        {
1248            self.metrics.record_cache_write("object_by_id");
1249        } else {
1250            trace!("discarded cache write due to expired ticket");
1251            self.metrics.record_ticket_expiry();
1252        }
1253    }
1254
1255    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
1256        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
1257    }
1258
1259    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
1260        info!("clearing state at end of epoch");
1261        assert!(
1262            self.dirty.pending_transaction_writes.is_empty(),
1263            "should be empty due to revert_state_update"
1264        );
1265        self.dirty.clear();
1266        info!("clearing old transaction locks");
1267        self.object_locks.clear();
1268    }
1269
1270    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
1271        // TODO: remove revert_state_update_impl entirely, and simply drop all dirty
1272        // state when clear_state_end_of_epoch_impl is called.
1273        // Further, once we do this, we can delay the insertion of the transaction into
1274        // pending_consensus_transactions until after the transaction has executed.
1275        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
1276            assert!(
1277                !self.try_is_tx_already_executed(tx)?,
1278                "attempt to revert committed transaction"
1279            );
1280
1281            // A transaction can be inserted into pending_consensus_transactions, but then
1282            // reconfiguration can happen before the transaction executes.
1283            info!("Not reverting {:?} as it was not executed", tx);
1284            return Ok(());
1285        };
1286
1287        for (object_id, object) in outputs.written.iter() {
1288            if object.is_package() {
1289                info!("removing non-finalized package from cache: {:?}", object_id);
1290                self.packages.invalidate(object_id);
1291            }
1292            self.cached.object_by_id_cache.invalidate(object_id);
1293            self.cached.object_cache.invalidate(object_id);
1294        }
1295
1296        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1297            self.cached.object_by_id_cache.invalidate(object_id);
1298            self.cached.object_cache.invalidate(object_id);
1299        }
1300
1301        // Note: individual object entries are removed when
1302        // clear_state_end_of_epoch_impl is called
1303        Ok(())
1304    }
1305
1306    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
1307        self.store.bulk_insert_genesis_objects(objects)?;
1308        for obj in objects {
1309            self.cached.object_cache.invalidate(&obj.id());
1310            self.cached.object_by_id_cache.invalidate(&obj.id());
1311        }
1312        Ok(())
1313    }
1314
1315    fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
1316        self.cached.object_by_id_cache.invalidate(&object.id());
1317        self.cached.object_cache.invalidate(&object.id());
1318        self.store.insert_genesis_object(object)
1319    }
1320
1321    pub fn clear_caches_and_assert_empty(&self) {
1322        info!("clearing caches");
1323        self.cached.clear_and_assert_empty();
1324        self.packages.invalidate_all();
1325        assert_empty(&self.packages);
1326    }
1327}
1328
1329impl ExecutionCacheAPI for WritebackCache {}
1330
1331impl ExecutionCacheCommit for WritebackCache {
1332    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1333        self.build_db_batch(epoch, digests)
1334    }
1335
1336    fn try_commit_transaction_outputs(
1337        &self,
1338        epoch: EpochId,
1339        batch: Batch,
1340        digests: &[TransactionDigest],
1341    ) -> IotaResult {
1342        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1343    }
1344
1345    fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
1346        self.store.persist_transaction(tx)
1347    }
1348
1349    fn approximate_pending_transaction_count(&self) -> u64 {
1350        WritebackCache::approximate_pending_transaction_count(self)
1351    }
1352}
1353
1354impl ObjectCacheRead for WritebackCache {
1355    #[instrument(level="trace", skip_all, fields(package_id=?package_id))]
1356    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1357        self.metrics
1358            .record_cache_request("package", "package_cache");
1359        if let Some(p) = self.packages.get(package_id) {
1360            if cfg!(debug_assertions) {
1361                let canonical_package = self
1362                    .dirty
1363                    .objects
1364                    .get(package_id)
1365                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1366                        Some(ObjectEntry::Object(object)) => Some(object),
1367                        _ => None,
1368                    })
1369                    .or_else(|| self.store.get_object(package_id));
1370
1371                if let Some(canonical_package) = canonical_package {
1372                    assert_eq!(
1373                        canonical_package.digest(),
1374                        p.object().digest(),
1375                        "Package object cache is inconsistent for package {package_id:?}"
1376                    );
1377                }
1378            }
1379            self.metrics.record_cache_hit("package", "package_cache");
1380            return Ok(Some(p));
1381        } else {
1382            self.metrics.record_cache_miss("package", "package_cache");
1383        }
1384
1385        // We try the dirty objects cache as well before going to the database. This is
1386        // necessary because the package could be evicted from the package cache
1387        // before it is committed to the database.
1388        if let Some(p) = self.get_object_impl("package", package_id)? {
1389            if p.is_package() {
1390                let p = PackageObject::new(p);
1391                tracing::trace!(
1392                    "caching package: {:?}",
1393                    p.object().compute_object_reference()
1394                );
1395                self.metrics.record_cache_write("package");
1396                self.packages.insert(*package_id, p.clone());
1397                Ok(Some(p))
1398            } else {
1399                Err(IotaError::UserInput {
1400                    error: UserInputError::MoveObjectAsPackage {
1401                        object_id: *package_id,
1402                    },
1403                })
1404            }
1405        } else {
1406            Ok(None)
1407        }
1408    }
1409
1410    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1411        // This is a no-op because all writes go through the cache, therefore it
1412        // can never be incoherent
1413    }
1414
1415    // get_object and variants.
1416
1417    #[instrument(level = "trace", skip_all, fields(object_id=?id))]
1418    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
1419        self.get_object_impl("object_latest", id)
1420    }
1421
1422    #[instrument(level = "trace", skip_all, fields(object_id, version))]
1423    fn try_get_object_by_key(
1424        &self,
1425        object_id: &ObjectID,
1426        version: SequenceNumber,
1427    ) -> IotaResult<Option<Object>> {
1428        match self.get_object_by_key_cache_only(object_id, version) {
1429            CacheResult::Hit(object) => Ok(Some(object)),
1430            CacheResult::NegativeHit => Ok(None),
1431            CacheResult::Miss => Ok(self
1432                .record_db_get("object_by_version")
1433                .try_get_object_by_key(object_id, version)?),
1434        }
1435    }
1436
1437    #[instrument(level = "trace", skip_all)]
1438    fn try_multi_get_objects_by_key(
1439        &self,
1440        object_keys: &[ObjectKey],
1441    ) -> Result<Vec<Option<Object>>, IotaError> {
1442        try_do_fallback_lookup(
1443            object_keys,
1444            |key| {
1445                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1446                    CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1447                    CacheResult::NegativeHit => CacheResult::NegativeHit,
1448                    CacheResult::Miss => CacheResult::Miss,
1449                })
1450            },
1451            |remaining| {
1452                self.record_db_multi_get("object_by_version", remaining.len())
1453                    .multi_get_objects_by_key(remaining)
1454            },
1455        )
1456    }
1457
1458    #[instrument(level = "trace", skip_all, fields(object_id, version))]
1459    fn try_object_exists_by_key(
1460        &self,
1461        object_id: &ObjectID,
1462        version: SequenceNumber,
1463    ) -> IotaResult<bool> {
1464        match self.get_object_by_key_cache_only(object_id, version) {
1465            CacheResult::Hit(_) => Ok(true),
1466            CacheResult::NegativeHit => Ok(false),
1467            CacheResult::Miss => self
1468                .record_db_get("object_by_version")
1469                .object_exists_by_key(object_id, version),
1470        }
1471    }
1472
1473    #[instrument(level = "trace", skip_all)]
1474    fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
1475        try_do_fallback_lookup(
1476            object_keys,
1477            |key| {
1478                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1479                    CacheResult::Hit(_) => CacheResult::Hit(true),
1480                    CacheResult::NegativeHit => CacheResult::Hit(false),
1481                    CacheResult::Miss => CacheResult::Miss,
1482                })
1483            },
1484            |remaining| {
1485                self.record_db_multi_get("object_by_version", remaining.len())
1486                    .multi_object_exists_by_key(remaining)
1487            },
1488        )
1489    }
1490
1491    #[instrument(level = "trace", skip_all, fields(object_id))]
1492    fn try_get_latest_object_ref_or_tombstone(
1493        &self,
1494        object_id: ObjectID,
1495    ) -> IotaResult<Option<ObjectRef>> {
1496        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1497            CacheResult::Hit((version, entry)) => Ok(Some(match entry {
1498                ObjectEntry::Object(object) => object.compute_object_reference(),
1499                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1500                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1501            })),
1502            CacheResult::NegativeHit => Ok(None),
1503            CacheResult::Miss => self
1504                .record_db_get("latest_objref_or_tombstone")
1505                .get_latest_object_ref_or_tombstone(object_id),
1506        }
1507    }
1508
1509    #[instrument(level = "trace", skip_all, fields(object_id))]
1510    fn try_get_latest_object_or_tombstone(
1511        &self,
1512        object_id: ObjectID,
1513    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1514        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1515            CacheResult::Hit((version, entry)) => {
1516                let key = ObjectKey(object_id, version);
1517                Ok(Some(match entry {
1518                    ObjectEntry::Object(object) => (key, object.into()),
1519                    ObjectEntry::Deleted => (
1520                        key,
1521                        ObjectOrTombstone::Tombstone((
1522                            object_id,
1523                            version,
1524                            ObjectDigest::OBJECT_DIGEST_DELETED,
1525                        )),
1526                    ),
1527                    ObjectEntry::Wrapped => (
1528                        key,
1529                        ObjectOrTombstone::Tombstone((
1530                            object_id,
1531                            version,
1532                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1533                        )),
1534                    ),
1535                }))
1536            }
1537            CacheResult::NegativeHit => Ok(None),
1538            CacheResult::Miss => self
1539                .record_db_get("latest_object_or_tombstone")
1540                .get_latest_object_or_tombstone(object_id),
1541        }
1542    }
1543
1544    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1545    fn try_find_object_lt_or_eq_version(
1546        &self,
1547        object_id: ObjectID,
1548        version_bound: SequenceNumber,
1549    ) -> IotaResult<Option<Object>> {
1550        macro_rules! check_cache_entry {
1551            ($level: expr, $objects: expr) => {
1552                self.metrics
1553                    .record_cache_request("object_lt_or_eq_version", $level);
1554                if let Some(objects) = $objects {
1555                    if let Some((_, object)) = objects
1556                        .all_versions_lt_or_eq_descending(&version_bound)
1557                        .next()
1558                    {
1559                        if let ObjectEntry::Object(object) = object {
1560                            self.metrics
1561                                .record_cache_hit("object_lt_or_eq_version", $level);
1562                            return Ok(Some(object.clone()));
1563                        } else {
1564                            // if we find a tombstone, the object does not exist
1565                            self.metrics
1566                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1567                            return Ok(None);
1568                        }
1569                    } else {
1570                        self.metrics
1571                            .record_cache_miss("object_lt_or_eq_version", $level);
1572                    }
1573                }
1574            };
1575        }
1576
1577        // if we have the latest version cached, and it is within the bound, we are done
1578        self.metrics
1579            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1580        let latest_cache_entry = self.cached.object_by_id_cache.get(&object_id);
1581        if let Some(latest) = &latest_cache_entry {
1582            let latest = latest.lock();
1583            match &*latest {
1584                LatestObjectCacheEntry::Object(latest_version, object) => {
1585                    if *latest_version <= version_bound {
1586                        if let ObjectEntry::Object(object) = object {
1587                            self.metrics
1588                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1589                            return Ok(Some(object.clone()));
1590                        } else {
1591                            // object is a tombstone, but is still within the version bound
1592                            self.metrics.record_cache_negative_hit(
1593                                "object_lt_or_eq_version",
1594                                "object_by_id",
1595                            );
1596                            return Ok(None);
1597                        }
1598                    }
1599                    // latest object is not within the version bound. fall
1600                    // through.
1601                }
1602                // No object by this ID exists at all
1603                LatestObjectCacheEntry::NonExistent => {
1604                    self.metrics
1605                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1606                    return Ok(None);
1607                }
1608            }
1609        }
1610        self.metrics
1611            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1612
1613        Self::with_locked_cache_entries(
1614            &self.dirty.objects,
1615            &self.cached.object_cache,
1616            &object_id,
1617            |dirty_entry, cached_entry| {
1618                check_cache_entry!("committed", dirty_entry);
1619                check_cache_entry!("uncommitted", cached_entry);
1620
1621                // Much of the time, the query will be for the very latest object version, so
1622                // try that first. But we have to be careful:
1623                // 1. We must load the tombstone if it is present, because its version may
1624                //    exceed the version_bound, in which case we must do a scan.
1625                // 2. You might think we could just call
1626                //    `self.store.get_latest_object_or_tombstone` here. But we cannot, because
1627                //    there may be a more recent version in the dirty set, which we skipped over
1628                //    in check_cache_entry! because of the version bound. However, if we skipped
1629                //    it above, we will skip it here as well, again due to the version bound.
1630                // 3. Despite that, we really want to warm the cache here. Why? Because if the
1631                //    object is cold (not being written to), then we will very soon be able to
1632                //    start serving reads of it from the object_by_id cache, IF we can warm the
1633                //    cache. If we don't warm the cache here, and no writes to the object occur,
1634                //    then we will always have to go to the db for the object.
1635                //
1636                // Lastly, it is important to understand the rationale for all this: If the
1637                // object is write-hot, we will serve almost all reads to it
1638                // from the dirty set (or possibly the cached set if it is only
1639                // written to once every few checkpoints). If the object is
1640                // write-cold (or non-existent) and read-hot, then we will serve almost all
1641                // reads to it from the object_by_id cache check above.  Most of
1642                // the apparently wasteful code here exists only to ensure
1643                // correctness in all the edge cases.
1644                let latest: Option<(SequenceNumber, ObjectEntry)> =
1645                    if let Some(dirty_set) = dirty_entry {
1646                        dirty_set
1647                            .get_highest()
1648                            .cloned()
1649                            .tap_none(|| panic!("dirty set cannot be empty"))
1650                    } else {
1651                        // TODO: we should try not to read from the db while holding the locks.
1652                        self.record_db_get("object_lt_or_eq_version_latest")
1653                            .get_latest_object_or_tombstone(object_id)?
1654                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1655                                (version, ObjectEntry::from(obj_or_tombstone))
1656                            })
1657                    };
1658
1659                if let Some((obj_version, obj_entry)) = latest {
1660                    // we can always cache the latest object (or tombstone), even if it is not
1661                    // within the version_bound. This is done in order to warm
1662                    // the cache in the case where a sequence of transactions
1663                    // all read the same child object without writing to it.
1664
1665                    // Note: no need to call with_object_by_id_cache_update here, because we are
1666                    // holding the lock on the dirty cache entry, and `latest`
1667                    // cannot become out-of-date while we hold that lock.
1668                    self.cache_latest_object_by_id(
1669                        &object_id,
1670                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1671                        // We can get a ticket at the last second, because we are holding the lock
1672                        // on dirty, so there cannot be any concurrent writes.
1673                        self.cached
1674                            .object_by_id_cache
1675                            .get_ticket_for_read(&object_id),
1676                    );
1677
1678                    if obj_version <= version_bound {
1679                        match obj_entry {
1680                            ObjectEntry::Object(object) => Ok(Some(object)),
1681                            ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
1682                        }
1683                    } else {
1684                        // The latest object exceeded the bound, so now we have to do a scan
1685                        // But we already know there is no dirty entry within the bound,
1686                        // so we go to the db.
1687                        self.record_db_get("object_lt_or_eq_version_scan")
1688                            .find_object_lt_or_eq_version(object_id, version_bound)
1689                    }
1690
1691                // no object found in dirty set or db, object does not exist
1692                // When this is called from a read api (i.e. not the execution
1693                // path) it is possible that the object has been
1694                // deleted and pruned. In this case, there would
1695                // be no entry at all on disk, but we may have a tombstone in
1696                // the cache
1697                } else if let Some(latest_cache_entry) = latest_cache_entry {
1698                    // If there is a latest cache entry, it had better not be a live object!
1699                    assert!(!latest_cache_entry.lock().is_alive());
1700                    Ok(None)
1701                } else {
1702                    // If there is no latest cache entry, we can insert one.
1703                    let highest = cached_entry.and_then(|c| c.get_highest());
1704                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1705                    self.cache_object_not_found(
1706                        &object_id,
1707                        // okay to get ticket at last second - see above
1708                        self.cached
1709                            .object_by_id_cache
1710                            .get_ticket_for_read(&object_id),
1711                    );
1712                    Ok(None)
1713                }
1714            },
1715        )
1716    }
1717
1718    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1719        get_iota_system_state(self)
1720    }
1721
1722    fn try_get_marker_value(
1723        &self,
1724        object_id: &ObjectID,
1725        version: SequenceNumber,
1726        epoch_id: EpochId,
1727    ) -> IotaResult<Option<MarkerValue>> {
1728        match self.get_marker_value_cache_only(object_id, version, epoch_id) {
1729            CacheResult::Hit(marker) => Ok(Some(marker)),
1730            CacheResult::NegativeHit => Ok(None),
1731            CacheResult::Miss => self
1732                .record_db_get("marker_by_version")
1733                .get_marker_value(object_id, &version, epoch_id),
1734        }
1735    }
1736
1737    fn try_get_latest_marker(
1738        &self,
1739        object_id: &ObjectID,
1740        epoch_id: EpochId,
1741    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
1742        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1743            CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
1744            CacheResult::NegativeHit => {
1745                panic!("cannot have negative hit when getting latest marker")
1746            }
1747            CacheResult::Miss => self
1748                .record_db_get("marker_latest")
1749                .get_latest_marker(object_id, epoch_id),
1750        }
1751    }
1752
1753    fn try_get_lock(
1754        &self,
1755        obj_ref: ObjectRef,
1756        epoch_store: &AuthorityPerEpochStore,
1757    ) -> IotaLockResult {
1758        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1759            CacheResult::Hit((_, obj)) => {
1760                let actual_objref = obj.compute_object_reference();
1761                if obj_ref != actual_objref {
1762                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1763                        locked_ref: actual_objref,
1764                    })
1765                } else {
1766                    // requested object ref is live, check if there is a lock
1767                    Ok(
1768                        match self
1769                            .object_locks
1770                            .get_transaction_lock(&obj_ref, epoch_store)?
1771                        {
1772                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1773                                locked_by_tx: tx_digest,
1774                            },
1775                            None => ObjectLockStatus::Initialized,
1776                        },
1777                    )
1778                }
1779            }
1780            CacheResult::NegativeHit => {
1781                Err(IotaError::from(UserInputError::ObjectNotFound {
1782                    object_id: obj_ref.0,
1783                    // even though we know the requested version, we leave it as None to indicate
1784                    // that the object does not exist at any version
1785                    version: None,
1786                }))
1787            }
1788            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1789        }
1790    }
1791
1792    fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
1793        let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
1794            UserInputError::ObjectNotFound {
1795                object_id,
1796                version: None,
1797            },
1798        )?;
1799        Ok(obj.compute_object_reference())
1800    }
1801
1802    fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1803        try_do_fallback_lookup(
1804            owned_object_refs,
1805            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1806                CacheResult::Hit((version, obj)) => {
1807                    if obj.compute_object_reference() != *obj_ref {
1808                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1809                            provided_obj_ref: *obj_ref,
1810                            current_version: version,
1811                        }
1812                        .into())
1813                    } else {
1814                        Ok(CacheResult::Hit(()))
1815                    }
1816                }
1817                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1818                    object_id: obj_ref.0,
1819                    version: None,
1820                }
1821                .into()),
1822                CacheResult::Miss => Ok(CacheResult::Miss),
1823            },
1824            |remaining| {
1825                self.record_db_multi_get("object_is_live", remaining.len())
1826                    .check_owned_objects_are_live(remaining)?;
1827                Ok(vec![(); remaining.len()])
1828            },
1829        )?;
1830        Ok(())
1831    }
1832
1833    fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<Option<CheckpointSequenceNumber>> {
1834        self.store
1835            .perpetual_tables
1836            .get_highest_pruned_checkpoint()
1837            .map_err(IotaError::from)
1838    }
1839
1840    fn notify_read_input_objects<'a>(
1841        &'a self,
1842        input_and_receiving_keys: &'a [InputKey],
1843        receiving_keys: &'a HashSet<InputKey>,
1844        epoch: &'a EpochId,
1845    ) -> BoxFuture<'a, Vec<()>> {
1846        super::notify_read_input_objects_impl(
1847            &self.object_notify_read,
1848            self,
1849            input_and_receiving_keys,
1850            receiving_keys,
1851            epoch,
1852        )
1853    }
1854}
1855
1856impl TransactionCacheRead for WritebackCache {
1857    #[instrument(level = "trace", skip_all)]
1858    fn try_multi_get_transaction_blocks(
1859        &self,
1860        digests: &[TransactionDigest],
1861    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
1862        let digests_and_tickets: Vec<_> = digests
1863            .iter()
1864            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
1865            .collect();
1866        try_do_fallback_lookup(
1867            &digests_and_tickets,
1868            |(digest, _)| {
1869                self.metrics
1870                    .record_cache_request("transaction_block", "uncommitted");
1871                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1872                    self.metrics
1873                        .record_cache_hit("transaction_block", "uncommitted");
1874                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
1875                }
1876                self.metrics
1877                    .record_cache_miss("transaction_block", "uncommitted");
1878
1879                self.metrics
1880                    .record_cache_request("transaction_block", "committed");
1881                match self
1882                    .cached
1883                    .transactions
1884                    .get(digest)
1885                    .map(|l| l.lock().clone())
1886                {
1887                    Some(PointCacheItem::Some(tx)) => {
1888                        self.metrics
1889                            .record_cache_hit("transaction_block", "committed");
1890                        Ok(CacheResult::Hit(Some(tx)))
1891                    }
1892                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
1893                    None => {
1894                        self.metrics
1895                            .record_cache_miss("transaction_block", "committed");
1896
1897                        Ok(CacheResult::Miss)
1898                    }
1899                }
1900            },
1901            |remaining| {
1902                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1903                let results: Vec<_> = self
1904                    .record_db_multi_get("transaction_block", remaining.len())
1905                    .multi_get_transaction_blocks(&remaining_digests)
1906                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())?;
1907                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1908                    if result.is_none() {
1909                        self.cached.transactions.insert(digest, None, *ticket).ok();
1910                    }
1911                }
1912                Ok(results)
1913            },
1914        )
1915    }
1916
1917    #[instrument(level = "trace", skip_all)]
1918    fn try_multi_get_executed_effects_digests(
1919        &self,
1920        digests: &[TransactionDigest],
1921    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
1922        let digests_and_tickets: Vec<_> = digests
1923            .iter()
1924            .map(|d| {
1925                (
1926                    *d,
1927                    self.cached.executed_effects_digests.get_ticket_for_read(d),
1928                )
1929            })
1930            .collect();
1931        try_do_fallback_lookup(
1932            &digests_and_tickets,
1933            |(digest, _)| {
1934                self.metrics
1935                    .record_cache_request("executed_effects_digests", "uncommitted");
1936                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
1937                    self.metrics
1938                        .record_cache_hit("executed_effects_digests", "uncommitted");
1939                    return Ok(CacheResult::Hit(Some(*digest)));
1940                }
1941                self.metrics
1942                    .record_cache_miss("executed_effects_digests", "uncommitted");
1943
1944                self.metrics
1945                    .record_cache_request("executed_effects_digests", "committed");
1946                match self
1947                    .cached
1948                    .executed_effects_digests
1949                    .get(digest)
1950                    .map(|l| *l.lock())
1951                {
1952                    Some(PointCacheItem::Some(digest)) => {
1953                        self.metrics
1954                            .record_cache_hit("executed_effects_digests", "committed");
1955                        Ok(CacheResult::Hit(Some(digest)))
1956                    }
1957                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
1958                    None => {
1959                        self.metrics
1960                            .record_cache_miss("executed_effects_digests", "committed");
1961                        Ok(CacheResult::Miss)
1962                    }
1963                }
1964            },
1965            |remaining| {
1966                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1967                let results = self
1968                    .record_db_multi_get("executed_effects_digests", remaining.len())
1969                    .multi_get_executed_effects_digests(&remaining_digests)?;
1970                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1971                    if result.is_none() {
1972                        self.cached
1973                            .executed_effects_digests
1974                            .insert(digest, None, *ticket)
1975                            .ok();
1976                    }
1977                }
1978                Ok(results)
1979            },
1980        )
1981    }
1982
1983    #[instrument(level = "trace", skip_all)]
1984    fn try_multi_get_effects(
1985        &self,
1986        digests: &[TransactionEffectsDigest],
1987    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
1988        let digests_and_tickets: Vec<_> = digests
1989            .iter()
1990            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
1991            .collect();
1992        try_do_fallback_lookup(
1993            &digests_and_tickets,
1994            |(digest, _)| {
1995                self.metrics
1996                    .record_cache_request("transaction_effects", "uncommitted");
1997                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
1998                    self.metrics
1999                        .record_cache_hit("transaction_effects", "uncommitted");
2000                    return Ok(CacheResult::Hit(Some(effects.clone())));
2001                }
2002                self.metrics
2003                    .record_cache_miss("transaction_effects", "uncommitted");
2004
2005                self.metrics
2006                    .record_cache_request("transaction_effects", "committed");
2007                match self
2008                    .cached
2009                    .transaction_effects
2010                    .get(digest)
2011                    .map(|l| l.lock().clone())
2012                {
2013                    Some(PointCacheItem::Some(effects)) => {
2014                        self.metrics
2015                            .record_cache_hit("transaction_effects", "committed");
2016                        Ok(CacheResult::Hit(Some((*effects).clone())))
2017                    }
2018                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
2019                    None => {
2020                        self.metrics
2021                            .record_cache_miss("transaction_effects", "committed");
2022                        Ok(CacheResult::Miss)
2023                    }
2024                }
2025            },
2026            |remaining| {
2027                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2028                let results = self
2029                    .record_db_multi_get("transaction_effects", remaining.len())
2030                    .multi_get_effects(remaining_digests.iter())
2031                    .expect("db error");
2032                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2033                    if result.is_none() {
2034                        self.cached
2035                            .transaction_effects
2036                            .insert(digest, None, *ticket)
2037                            .ok();
2038                    }
2039                }
2040                Ok(results)
2041            },
2042        )
2043    }
2044
2045    #[instrument(level = "trace", skip_all)]
2046    fn try_notify_read_executed_effects_digests<'a>(
2047        &'a self,
2048        digests: &'a [TransactionDigest],
2049    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
2050        self.executed_effects_digests_notify_read
2051            .read(digests, |digests| {
2052                self.try_multi_get_executed_effects_digests(digests)
2053            })
2054            .boxed()
2055    }
2056
2057    #[instrument(level = "trace", skip_all)]
2058    fn try_multi_get_events(
2059        &self,
2060        tx_digests: &[TransactionDigest],
2061    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
2062        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2063            if events.data.is_empty() {
2064                None
2065            } else {
2066                Some(events)
2067            }
2068        }
2069
2070        let digests_and_tickets: Vec<_> = tx_digests
2071            .iter()
2072            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2073            .collect();
2074        try_do_fallback_lookup(
2075            &digests_and_tickets,
2076            |(digest, _)| {
2077                self.metrics
2078                    .record_cache_request("transaction_events", "uncommitted");
2079                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2080                    self.metrics
2081                        .record_cache_hit("transaction_events", "uncommitted");
2082
2083                    return Ok(CacheResult::Hit(map_events(events)));
2084                }
2085                self.metrics
2086                    .record_cache_miss("transaction_events", "uncommitted");
2087
2088                self.metrics
2089                    .record_cache_request("transaction_events", "committed");
2090                match self
2091                    .cached
2092                    .transaction_events
2093                    .get(digest)
2094                    .map(|l| l.lock().clone())
2095                {
2096                    Some(PointCacheItem::Some(events)) => {
2097                        self.metrics
2098                            .record_cache_hit("transaction_events", "committed");
2099                        Ok(CacheResult::Hit(map_events((*events).clone())))
2100                    }
2101                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
2102                    None => {
2103                        self.metrics
2104                            .record_cache_miss("transaction_events", "committed");
2105
2106                        Ok(CacheResult::Miss)
2107                    }
2108                }
2109            },
2110            |remaining| {
2111                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2112                let results = self
2113                    .store
2114                    .multi_get_events(&remaining_digests)
2115                    .expect("db error");
2116                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2117                    if result.is_none() {
2118                        self.cached
2119                            .transaction_events
2120                            .insert(digest, None, *ticket)
2121                            .ok();
2122                    }
2123                }
2124                Ok(results)
2125            },
2126        )
2127    }
2128}
2129
2130impl ExecutionCacheWrite for WritebackCache {
2131    #[instrument(level = "debug", skip_all)]
2132    fn try_acquire_transaction_locks(
2133        &self,
2134        epoch_store: &AuthorityPerEpochStore,
2135        owned_input_objects: &[ObjectRef],
2136        transaction: VerifiedSignedTransaction,
2137    ) -> IotaResult {
2138        self.object_locks.acquire_transaction_locks(
2139            self,
2140            epoch_store,
2141            owned_input_objects,
2142            transaction,
2143        )
2144    }
2145
2146    fn try_write_transaction_outputs(
2147        &self,
2148        epoch_id: EpochId,
2149        tx_outputs: Arc<TransactionOutputs>,
2150    ) -> IotaResult {
2151        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs)
2152    }
2153}
2154
2155implement_passthrough_traits!(WritebackCache);
2156
2157impl GlobalStateHashStore for WritebackCache {
2158    fn get_root_state_hash_for_epoch(
2159        &self,
2160        epoch: EpochId,
2161    ) -> IotaResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
2162        self.store.get_root_state_hash_for_epoch(epoch)
2163    }
2164
2165    fn get_root_state_hash_for_highest_epoch(
2166        &self,
2167    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
2168        self.store.get_root_state_hash_for_highest_epoch()
2169    }
2170
2171    fn insert_state_hash_for_epoch(
2172        &self,
2173        epoch: EpochId,
2174        checkpoint_seq_num: &CheckpointSequenceNumber,
2175        acc: &GlobalStateHash,
2176    ) -> IotaResult {
2177        self.store
2178            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2179    }
2180
2181    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2182        // The only time it is safe to iterate the live object set is at an epoch
2183        // boundary, at which point the db is consistent and the dirty cache is
2184        // empty. So this does read the cache
2185        assert!(
2186            self.dirty.is_empty(),
2187            "cannot iterate live object set with dirty data"
2188        );
2189        self.store.iter_live_object_set()
2190    }
2191
2192    // A version of iter_live_object_set that reads the cache. Only use for testing.
2193    // If used on a live validator, can cause the server to block for as long as
2194    // it takes to iterate the entire live object set.
2195    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2196        // hold iter until we are finished to prevent any concurrent inserts/deletes
2197        let iter = self.dirty.objects.iter();
2198        let mut dirty_objects = BTreeMap::new();
2199
2200        // add everything from the store
2201        for obj in self.store.iter_live_object_set() {
2202            dirty_objects.insert(obj.object_id(), obj);
2203        }
2204
2205        // add everything from the cache, but also remove deletions
2206        for entry in iter {
2207            let id = *entry.key();
2208            let value = entry.value();
2209            match value.get_highest().unwrap() {
2210                (_, ObjectEntry::Object(object)) => {
2211                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2212                }
2213                (_version, ObjectEntry::Wrapped) => {
2214                    dirty_objects.remove(&id);
2215                }
2216                (_, ObjectEntry::Deleted) => {
2217                    dirty_objects.remove(&id);
2218                }
2219            }
2220        }
2221
2222        Box::new(dirty_objects.into_values())
2223    }
2224}
2225
2226// TODO: For correctness, we must at least invalidate the cache when items are
2227// written through this trait (since they could be negatively cached as absent).
2228// But it may or may not be optimal to actually insert them into the cache. For
2229// instance if state sync is running ahead of execution, they might evict other
2230// items that are about to be read. This could be an area for tuning in the
2231// future.
2232impl StateSyncAPI for WritebackCache {
2233    fn try_insert_transaction_and_effects(
2234        &self,
2235        transaction: &VerifiedTransaction,
2236        transaction_effects: &TransactionEffects,
2237    ) -> IotaResult {
2238        self.store
2239            .insert_transaction_and_effects(transaction, transaction_effects)?;
2240
2241        // Cache operations should not fail the entire operation after DB write succeeds
2242        // Use .ok() to ignore cache failures and avoid data inconsistency
2243        self.cached
2244            .transactions
2245            .insert(
2246                transaction.digest(),
2247                PointCacheItem::Some(Arc::new(transaction.clone())),
2248                Ticket::Write,
2249            )
2250            .ok();
2251        self.cached
2252            .transaction_effects
2253            .insert(
2254                &transaction_effects.digest(),
2255                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2256                Ticket::Write,
2257            )
2258            .ok();
2259
2260        Ok(())
2261    }
2262
2263    fn try_multi_insert_transaction_and_effects(
2264        &self,
2265        transactions_and_effects: &[VerifiedExecutionData],
2266    ) -> IotaResult {
2267        self.store
2268            .multi_insert_transaction_and_effects(transactions_and_effects.iter())?;
2269        for VerifiedExecutionData {
2270            transaction,
2271            effects,
2272        } in transactions_and_effects
2273        {
2274            self.cached
2275                .transactions
2276                .insert(
2277                    transaction.digest(),
2278                    PointCacheItem::Some(Arc::new(transaction.clone())),
2279                    Ticket::Write,
2280                )
2281                .ok();
2282            self.cached
2283                .transaction_effects
2284                .insert(
2285                    &effects.digest(),
2286                    PointCacheItem::Some(Arc::new(effects.clone())),
2287                    Ticket::Write,
2288                )
2289                .ok();
2290        }
2291
2292        Ok(())
2293    }
2294}
2295
2296#[cfg(test)]
2297impl WritebackCache {
2298    pub(super) fn write_object_for_testing(&self, object: Object) {
2299        let id = object.id();
2300        let version = object.version();
2301        self.write_object_entry(&id, version, ObjectEntry::Object(object));
2302    }
2303
2304    pub(super) fn write_marker_for_testing(
2305        &self,
2306        epoch_id: EpochId,
2307        object_key: &ObjectKey,
2308        marker_value: MarkerValue,
2309    ) {
2310        self.write_marker_value(epoch_id, object_key, marker_value);
2311    }
2312}