1use std::{
51    collections::{BTreeMap, BTreeSet},
52    hash::Hash,
53    sync::{Arc, atomic::AtomicU64},
54};
55
56use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
57use futures::{FutureExt, future::BoxFuture};
58use iota_common::sync::notify_read::NotifyRead;
59use iota_config::WritebackCacheConfig;
60use iota_macros::fail_point;
61use iota_types::{
62    accumulator::Accumulator,
63    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
64    digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
65    effects::{TransactionEffects, TransactionEvents},
66    error::{IotaError, IotaResult, UserInputError},
67    executable_transaction::VerifiedExecutableTransaction,
68    iota_system_state::{IotaSystemState, get_iota_system_state},
69    message_envelope::Message,
70    messages_checkpoint::CheckpointSequenceNumber,
71    object::Object,
72    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
73    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
74};
75use moka::sync::Cache as MokaCache;
76use parking_lot::Mutex;
77use prometheus::Registry;
78use tap::TapOptional;
79use tracing::{debug, info, instrument, trace, warn};
80
81use super::{
82    CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
83    ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
84    TransactionCacheRead,
85    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache, Ticket},
86    implement_passthrough_traits,
87    object_locks::ObjectLocks,
88};
89use crate::{
90    authority::{
91        AuthorityStore,
92        authority_per_epoch_store::AuthorityPerEpochStore,
93        authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
94        authority_store_tables::LiveObject,
95        backpressure::BackpressureManager,
96        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
97    },
98    fallback_fetch::try_do_fallback_lookup,
99    state_accumulator::AccumulatorStore,
100    transaction_outputs::TransactionOutputs,
101};
102
103#[cfg(test)]
104#[path = "unit_tests/writeback_cache_tests.rs"]
105pub mod writeback_cache_tests;
106
107#[derive(Clone, PartialEq, Eq)]
108enum ObjectEntry {
109    Object(Object),
110    Deleted,
111    Wrapped,
112}
113
114impl ObjectEntry {
115    #[cfg(test)]
116    fn unwrap_object(&self) -> &Object {
117        match self {
118            ObjectEntry::Object(o) => o,
119            _ => panic!("unwrap_object called on non-Object"),
120        }
121    }
122
123    fn is_tombstone(&self) -> bool {
124        match self {
125            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
126            ObjectEntry::Object(_) => false,
127        }
128    }
129}
130
131impl std::fmt::Debug for ObjectEntry {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        match self {
134            ObjectEntry::Object(o) => {
135                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
136            }
137            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
138            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
139        }
140    }
141}
142
143impl From<Object> for ObjectEntry {
144    fn from(object: Object) -> Self {
145        ObjectEntry::Object(object)
146    }
147}
148
149impl From<ObjectOrTombstone> for ObjectEntry {
150    fn from(object: ObjectOrTombstone) -> Self {
151        match object {
152            ObjectOrTombstone::Object(o) => o.into(),
153            ObjectOrTombstone::Tombstone(obj_ref) => {
154                if obj_ref.2.is_deleted() {
155                    ObjectEntry::Deleted
156                } else if obj_ref.2.is_wrapped() {
157                    ObjectEntry::Wrapped
158                } else {
159                    panic!("tombstone digest must either be deleted or wrapped");
160                }
161            }
162        }
163    }
164}
165
166#[derive(Debug, Clone, PartialEq, Eq)]
167enum LatestObjectCacheEntry {
168    Object(SequenceNumber, ObjectEntry),
169    NonExistent,
170}
171
172impl LatestObjectCacheEntry {
173    #[cfg(test)]
174    fn version(&self) -> Option<SequenceNumber> {
175        match self {
176            LatestObjectCacheEntry::Object(version, _) => Some(*version),
177            LatestObjectCacheEntry::NonExistent => None,
178        }
179    }
180}
181
182impl IsNewer for LatestObjectCacheEntry {
183    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
184        match (self, other) {
185            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
186                v1 > v2
187            }
188            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
189            _ => false,
190        }
191    }
192}
193
194type MarkerKey = (EpochId, ObjectID);
195
196struct UncommittedData {
199    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,
216
217    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,
222
223    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,
224
225    transaction_events:
230        DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,
231
232    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,
233
234    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
237
238    total_transaction_inserts: AtomicU64,
239    total_transaction_commits: AtomicU64,
240}
241
242impl UncommittedData {
243    fn new() -> Self {
244        Self {
245            objects: DashMap::new(),
246            markers: DashMap::new(),
247            transaction_effects: DashMap::new(),
248            executed_effects_digests: DashMap::new(),
249            pending_transaction_writes: DashMap::new(),
250            transaction_events: DashMap::new(),
251            total_transaction_inserts: AtomicU64::new(0),
252            total_transaction_commits: AtomicU64::new(0),
253        }
254    }
255
256    fn clear(&self) {
257        self.objects.clear();
258        self.markers.clear();
259        self.transaction_effects.clear();
260        self.executed_effects_digests.clear();
261        self.pending_transaction_writes.clear();
262        self.transaction_events.clear();
263        self.total_transaction_inserts
264            .store(0, std::sync::atomic::Ordering::Relaxed);
265        self.total_transaction_commits
266            .store(0, std::sync::atomic::Ordering::Relaxed);
267    }
268
269    fn is_empty(&self) -> bool {
270        let empty = self.pending_transaction_writes.is_empty();
271        if empty && cfg!(debug_assertions) {
272            assert!(
273                self.objects.is_empty()
274                    && self.markers.is_empty()
275                    && self.transaction_effects.is_empty()
276                    && self.executed_effects_digests.is_empty()
277                    && self.transaction_events.is_empty()
278                    && self
279                        .total_transaction_inserts
280                        .load(std::sync::atomic::Ordering::Relaxed)
281                        == self
282                            .total_transaction_commits
283                            .load(std::sync::atomic::Ordering::Relaxed),
284            );
285        }
286        empty
287    }
288}
289
290type PointCacheItem<T> = Option<T>;
293
294impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
297    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
298        match (self, other) {
299            (Some(_), None) => true,
300
301            (Some(a), Some(b)) => {
302                debug_assert_eq!(a, b);
304                false
305            }
306
307            _ => false,
308        }
309    }
310}
311
312struct CachedCommittedData {
315    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,
317
318    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
324
325    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
327
328    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,
329
330    transaction_effects:
331        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,
332
333    transaction_events:
334        MonotonicCache<TransactionEventsDigest, PointCacheItem<Arc<TransactionEvents>>>,
335
336    executed_effects_digests:
337        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,
338
339    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
342}
343
344impl CachedCommittedData {
345    fn new(config: &WritebackCacheConfig) -> Self {
346        let object_cache = MokaCache::builder()
347            .max_capacity(config.object_cache_size())
348            .build();
349        let marker_cache = MokaCache::builder()
350            .max_capacity(config.marker_cache_size())
351            .build();
352
353        let transactions = MonotonicCache::new(config.transaction_cache_size());
354        let transaction_effects = MonotonicCache::new(config.effect_cache_size());
355        let transaction_events = MonotonicCache::new(config.events_cache_size());
356        let executed_effects_digests = MonotonicCache::new(config.executed_effect_cache_size());
357
358        let transaction_objects = MokaCache::builder()
359            .max_capacity(config.transaction_objects_cache_size())
360            .build();
361
362        Self {
363            object_cache,
364            object_by_id_cache: MonotonicCache::new(config.object_by_id_cache_size()),
365            marker_cache,
366            transactions,
367            transaction_effects,
368            transaction_events,
369            executed_effects_digests,
370            _transaction_objects: transaction_objects,
371        }
372    }
373
374    fn clear_and_assert_empty(&self) {
375        self.object_cache.invalidate_all();
376        self.object_by_id_cache.invalidate_all();
377        self.marker_cache.invalidate_all();
378        self.transactions.invalidate_all();
379        self.transaction_effects.invalidate_all();
380        self.transaction_events.invalidate_all();
381        self.executed_effects_digests.invalidate_all();
382        self._transaction_objects.invalidate_all();
383
384        assert_empty(&self.object_cache);
385        assert!(&self.object_by_id_cache.is_empty());
386        assert_empty(&self.marker_cache);
387        assert!(self.transactions.is_empty());
388        assert!(self.transaction_effects.is_empty());
389        assert!(self.transaction_events.is_empty());
390        assert!(self.executed_effects_digests.is_empty());
391        assert_empty(&self._transaction_objects);
392    }
393}
394
395fn assert_empty<K, V>(cache: &MokaCache<K, V>)
396where
397    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
398    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
399{
400    if cache.iter().next().is_some() {
401        panic!("cache should be empty");
402    }
403}
404
/// Cache that sits in front of an `AuthorityStore`: transaction outputs are
/// first recorded in memory (`dirty`) and written to the store on commit.
pub struct WritebackCache {
    /// Outputs recorded by `write_transaction_outputs` and not yet committed.
    dirty: UncommittedData,
    /// Size-bounded caches consulted at the "committed" level by lookups.
    cached: CachedCommittedData,

    /// Package objects by id; filled when a written object is a package.
    packages: MokaCache<ObjectID, PackageObject>,

    /// Object lock table (not exercised by the code visible in this part of
    /// the file).
    object_locks: ObjectLocks,

    /// Notified with (transaction digest, effects digest) whenever outputs are
    /// written.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    /// Backing store, used for cache misses and for committing outputs.
    store: Arc<AuthorityStore>,
    /// Backpressure is requested while the pending transaction count is
    /// strictly greater than this value.
    backpressure_threshold: u64,
    /// Receives the backpressure on/off decision.
    backpressure_manager: Arc<BackpressureManager>,
    /// Cache request/hit/miss and backpressure metrics.
    metrics: Arc<ExecutionCacheMetrics>,
}
429
// Looks up `$version` in the optional version map `$cache`, recording metrics
// under (`$table`, `$level`).
//
// Expands to statements that `return` from the enclosing function or closure:
// - `CacheResult::Hit(entry)` when the exact version is present;
// - `CacheResult::NegativeHit` when the version is absent but the smallest
//   cached version is strictly below `$version`.
// Otherwise a miss is recorded and execution falls through.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(version_map) = $cache {
            if let Some(found) = version_map.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(found.clone());
            }

            match version_map.get_least() {
                Some(least) if least.0 < $version => {
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
                _ => {}
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
451
// Returns the highest-versioned entry of the optional version map `$cache`,
// recording metrics under (`$table`, `$level`).
//
// Expands to statements that `return CacheResult::Hit((version, entry))` from
// the enclosing function or closure when a map is present. When no map is
// present a miss is recorded and execution falls through.
//
// Panics if a map is present but empty.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(version_map) = $cache {
            let Some((latest_version, latest_entry)) = version_map.get_highest() else {
                panic!("empty CachedVersionMap should have been removed");
            };
            $self.metrics.record_cache_hit($table, $level);
            return CacheResult::Hit((*latest_version, latest_entry.clone()));
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
466
467impl WritebackCache {
468    pub fn new(
469        config: &WritebackCacheConfig,
470        store: Arc<AuthorityStore>,
471        metrics: Arc<ExecutionCacheMetrics>,
472        backpressure_manager: Arc<BackpressureManager>,
473    ) -> Self {
474        let packages = MokaCache::builder()
475            .max_capacity(config.package_cache_size())
476            .build();
477        Self {
478            dirty: UncommittedData::new(),
479            cached: CachedCommittedData::new(config),
480            packages,
481            object_locks: ObjectLocks::new(),
482            executed_effects_digests_notify_read: NotifyRead::new(),
483            store,
484            backpressure_manager,
485            backpressure_threshold: config.backpressure_threshold(),
486            metrics,
487        }
488    }
489
490    pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
491        Self::new(
492            &Default::default(),
493            store,
494            ExecutionCacheMetrics::new(registry).into(),
495            BackpressureManager::new_for_tests(),
496        )
497    }
498
499    #[cfg(test)]
500    pub fn reset_for_test(&mut self) {
501        let mut new = Self::new(
502            &Default::default(),
503            self.store.clone(),
504            self.metrics.clone(),
505            self.backpressure_manager.clone(),
506        );
507        std::mem::swap(self, &mut new);
508    }
509
510    fn write_object_entry(
511        &self,
512        object_id: &ObjectID,
513        version: SequenceNumber,
514        object: ObjectEntry,
515    ) {
516        trace!(?object_id, ?version, ?object, "inserting object entry");
517        self.metrics.record_cache_write("object");
518
519        let mut entry = self.dirty.objects.entry(*object_id).or_default();
543
544        self.cached
545            .object_by_id_cache
546            .insert(
547                object_id,
548                LatestObjectCacheEntry::Object(version, object.clone()),
549                Ticket::Write,
550            )
551            .ok();
554
555        entry.insert(version, object);
556    }
557
558    fn write_marker_value(
559        &self,
560        epoch_id: EpochId,
561        object_key: &ObjectKey,
562        marker_value: MarkerValue,
563    ) {
564        tracing::trace!(
565            "inserting marker value {:?}: {:?}",
566            object_key,
567            marker_value
568        );
569        fail_point!("write_marker_entry");
570        self.metrics.record_cache_write("marker");
571        self.dirty
572            .markers
573            .entry((epoch_id, object_key.0))
574            .or_default()
575            .value_mut()
576            .insert(object_key.1, marker_value);
577    }
578
579    fn with_locked_cache_entries<K, V, R>(
583        dirty_map: &DashMap<K, CachedVersionMap<V>>,
584        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
585        key: &K,
586        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
587    ) -> R
588    where
589        K: Copy + Eq + Hash + Send + Sync + 'static,
590        V: Send + Sync + 'static,
591    {
592        let dirty_entry = dirty_map.entry(*key);
593        let dirty_entry = match &dirty_entry {
594            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
595            DashMapEntry::Vacant(_) => None,
596        };
597
598        let cached_entry = cached_map.get(key);
599        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
600        let cached_entry = cached_lock.as_deref();
601
602        cb(dirty_entry, cached_entry)
603    }
604
605    fn get_object_entry_by_key_cache_only(
608        &self,
609        object_id: &ObjectID,
610        version: SequenceNumber,
611    ) -> CacheResult<ObjectEntry> {
612        Self::with_locked_cache_entries(
613            &self.dirty.objects,
614            &self.cached.object_cache,
615            object_id,
616            |dirty_entry, cached_entry| {
617                check_cache_entry_by_version!(
618                    self,
619                    "object_by_version",
620                    "uncommitted",
621                    dirty_entry,
622                    version
623                );
624                check_cache_entry_by_version!(
625                    self,
626                    "object_by_version",
627                    "committed",
628                    cached_entry,
629                    version
630                );
631                CacheResult::Miss
632            },
633        )
634    }
635
636    fn get_object_by_key_cache_only(
637        &self,
638        object_id: &ObjectID,
639        version: SequenceNumber,
640    ) -> CacheResult<Object> {
641        match self.get_object_entry_by_key_cache_only(object_id, version) {
642            CacheResult::Hit(entry) => match entry {
643                ObjectEntry::Object(object) => CacheResult::Hit(object),
644                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
645            },
646            CacheResult::Miss => CacheResult::Miss,
647            CacheResult::NegativeHit => CacheResult::NegativeHit,
648        }
649    }
650
651    fn get_object_entry_by_id_cache_only(
652        &self,
653        request_type: &'static str,
654        object_id: &ObjectID,
655    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
656        self.metrics
657            .record_cache_request(request_type, "object_by_id");
658        let entry = self.cached.object_by_id_cache.get(object_id);
659
660        if cfg!(debug_assertions) {
661            if let Some(entry) = &entry {
662                let highest: Option<ObjectEntry> = self
664                    .dirty
665                    .objects
666                    .get(object_id)
667                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
668                    .or_else(|| {
669                        let obj: Option<ObjectEntry> = self
670                            .store
671                            .get_latest_object_or_tombstone(*object_id)
672                            .unwrap()
673                            .map(|(_, o)| o.into());
674                        obj
675                    });
676
677                let cache_entry = match &*entry.lock() {
678                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
679                    LatestObjectCacheEntry::NonExistent => None,
680                };
681
682                let tombstone_possibly_pruned = highest.is_none()
685                    && cache_entry
686                        .as_ref()
687                        .map(|e| e.is_tombstone())
688                        .unwrap_or(false);
689
690                if highest != cache_entry && !tombstone_possibly_pruned {
691                    tracing::error!(
692                        ?highest,
693                        ?cache_entry,
694                        ?tombstone_possibly_pruned,
695                        "object_by_id cache is incoherent for {:?}",
696                        object_id
697                    );
698                    panic!("object_by_id cache is incoherent for {object_id:?}");
699                }
700            }
701        }
702
703        if let Some(entry) = entry {
704            let entry = entry.lock();
705            match &*entry {
706                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
707                    self.metrics.record_cache_hit(request_type, "object_by_id");
708                    return CacheResult::Hit((*latest_version, latest_object.clone()));
709                }
710                LatestObjectCacheEntry::NonExistent => {
711                    self.metrics
712                        .record_cache_negative_hit(request_type, "object_by_id");
713                    return CacheResult::NegativeHit;
714                }
715            }
716        } else {
717            self.metrics.record_cache_miss(request_type, "object_by_id");
718        }
719
720        Self::with_locked_cache_entries(
721            &self.dirty.objects,
722            &self.cached.object_cache,
723            object_id,
724            |dirty_entry, cached_entry| {
725                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
726                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
727                CacheResult::Miss
728            },
729        )
730    }
731
732    fn get_object_by_id_cache_only(
733        &self,
734        request_type: &'static str,
735        object_id: &ObjectID,
736    ) -> CacheResult<(SequenceNumber, Object)> {
737        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
738            CacheResult::Hit((version, entry)) => match entry {
739                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
740                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
741            },
742            CacheResult::NegativeHit => CacheResult::NegativeHit,
743            CacheResult::Miss => CacheResult::Miss,
744        }
745    }
746
747    fn get_marker_value_cache_only(
748        &self,
749        object_id: &ObjectID,
750        version: SequenceNumber,
751        epoch_id: EpochId,
752    ) -> CacheResult<MarkerValue> {
753        Self::with_locked_cache_entries(
754            &self.dirty.markers,
755            &self.cached.marker_cache,
756            &(epoch_id, *object_id),
757            |dirty_entry, cached_entry| {
758                check_cache_entry_by_version!(
759                    self,
760                    "marker_by_version",
761                    "uncommitted",
762                    dirty_entry,
763                    version
764                );
765                check_cache_entry_by_version!(
766                    self,
767                    "marker_by_version",
768                    "committed",
769                    cached_entry,
770                    version
771                );
772                CacheResult::Miss
773            },
774        )
775    }
776
777    fn get_latest_marker_value_cache_only(
778        &self,
779        object_id: &ObjectID,
780        epoch_id: EpochId,
781    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
782        Self::with_locked_cache_entries(
783            &self.dirty.markers,
784            &self.cached.marker_cache,
785            &(epoch_id, *object_id),
786            |dirty_entry, cached_entry| {
787                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
788                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
789                CacheResult::Miss
790            },
791        )
792    }
793
794    fn get_object_impl(
795        &self,
796        request_type: &'static str,
797        id: &ObjectID,
798    ) -> IotaResult<Option<Object>> {
799        let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id);
800        match self.get_object_by_id_cache_only(request_type, id) {
801            CacheResult::Hit((_, object)) => Ok(Some(object)),
802            CacheResult::NegativeHit => Ok(None),
803            CacheResult::Miss => {
804                let obj = self.store.try_get_object(id)?;
805                if let Some(obj) = &obj {
806                    self.cache_latest_object_by_id(
807                        id,
808                        LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
809                        ticket,
810                    );
811                } else {
812                    self.cache_object_not_found(id, ticket);
813                }
814                Ok(obj)
815            }
816        }
817    }
818
819    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
820        self.metrics.record_cache_request(request_type, "db");
821        &self.store
822    }
823
824    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
825        self.metrics
826            .record_cache_multi_request(request_type, "db", count);
827        &self.store
828    }
829
830    #[instrument(level = "debug", skip_all)]
831    fn write_transaction_outputs(
832        &self,
833        epoch_id: EpochId,
834        tx_outputs: Arc<TransactionOutputs>,
835    ) -> IotaResult {
836        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");
837
838        let TransactionOutputs {
839            transaction,
840            effects,
841            markers,
842            written,
843            deleted,
844            wrapped,
845            events,
846            ..
847        } = &*tx_outputs;
848
849        for ObjectKey(id, version) in deleted.iter() {
855            self.write_object_entry(id, *version, ObjectEntry::Deleted);
856        }
857
858        for ObjectKey(id, version) in wrapped.iter() {
859            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
860        }
861
862        for (object_key, marker_value) in markers.iter() {
864            self.write_marker_value(epoch_id, object_key, *marker_value);
865        }
866
867        for (object_id, object) in written.iter() {
870            if object.is_child_object() {
871                self.write_object_entry(object_id, object.version(), object.clone().into());
872            }
873        }
874        for (object_id, object) in written.iter() {
875            if !object.is_child_object() {
876                self.write_object_entry(object_id, object.version(), object.clone().into());
877                if object.is_package() {
878                    debug!("caching package: {:?}", object.compute_object_reference());
879                    self.packages
880                        .insert(*object_id, PackageObject::new(object.clone()));
881                }
882            }
883        }
884
885        let tx_digest = *transaction.digest();
886        let effects_digest = effects.digest();
887
888        self.metrics.record_cache_write("transaction_block");
889        self.dirty
890            .pending_transaction_writes
891            .insert(tx_digest, tx_outputs.clone());
892
893        self.metrics.record_cache_write("transaction_effects");
896        self.dirty
897            .transaction_effects
898            .insert(effects_digest, effects.clone());
899
900        self.metrics.record_cache_write("transaction_events");
905        match self.dirty.transaction_events.entry(events.digest()) {
906            DashMapEntry::Occupied(mut occupied) => {
907                occupied.get_mut().0.insert(tx_digest);
908            }
909            DashMapEntry::Vacant(entry) => {
910                let mut txns = BTreeSet::new();
911                txns.insert(tx_digest);
912                entry.insert((txns, events.clone()));
913            }
914        }
915
916        self.metrics.record_cache_write("executed_effects_digests");
917        self.dirty
918            .executed_effects_digests
919            .insert(tx_digest, effects_digest);
920
921        self.executed_effects_digests_notify_read
922            .notify(&tx_digest, &effects_digest);
923
924        self.metrics
925            .pending_notify_read
926            .set(self.executed_effects_digests_notify_read.num_pending() as i64);
927
928        let prev = self
929            .dirty
930            .total_transaction_inserts
931            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
932
933        let pending_count = (prev + 1).saturating_sub(
934            self.dirty
935                .total_transaction_commits
936                .load(std::sync::atomic::Ordering::Relaxed),
937        );
938
939        self.set_backpressure(pending_count);
940
941        Ok(())
942    }
943
944    #[instrument(level = "debug", skip_all)]
946    fn commit_transaction_outputs(
947        &self,
948        epoch: EpochId,
949        digests: &[TransactionDigest],
950    ) -> IotaResult {
951        fail_point!("writeback-cache-commit");
952        trace!(?digests);
953
954        let mut all_outputs = Vec::with_capacity(digests.len());
955        for tx in digests {
956            let Some(outputs) = self
957                .dirty
958                .pending_transaction_writes
959                .get(tx)
960                .map(|o| o.clone())
961            else {
962                warn!("Attempt to commit unknown transaction {:?}", tx);
970                continue;
971            };
972            all_outputs.push(outputs);
973        }
974
975        self.store.write_transaction_outputs(epoch, &all_outputs)?;
979
980        for outputs in all_outputs.iter() {
981            let tx_digest = outputs.transaction.digest();
982            assert!(
983                self.dirty
984                    .pending_transaction_writes
985                    .remove(tx_digest)
986                    .is_some()
987            );
988            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
989        }
990
991        let num_outputs = all_outputs.len() as u64;
992        let num_commits = self
993            .dirty
994            .total_transaction_commits
995            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
996            + num_outputs;
997
998        let pending_count = self
999            .dirty
1000            .total_transaction_inserts
1001            .load(std::sync::atomic::Ordering::Relaxed)
1002            .saturating_sub(num_commits);
1003
1004        self.set_backpressure(pending_count);
1005
1006        Ok(())
1007    }
1008
1009    fn approximate_pending_transaction_count(&self) -> u64 {
1010        let num_commits = self
1011            .dirty
1012            .total_transaction_commits
1013            .load(std::sync::atomic::Ordering::Relaxed);
1014
1015        self.dirty
1016            .total_transaction_inserts
1017            .load(std::sync::atomic::Ordering::Relaxed)
1018            .saturating_sub(num_commits)
1019    }
1020
1021    fn set_backpressure(&self, pending_count: u64) {
1022        let backpressure = pending_count > self.backpressure_threshold;
1023        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1024        if backpressure_changed {
1025            self.metrics.backpressure_toggles.inc();
1026        }
1027        self.metrics
1028            .backpressure_status
1029            .set(if backpressure { 1 } else { 0 });
1030    }
1031
1032    fn flush_transactions_from_dirty_to_cached(
1033        &self,
1034        epoch: EpochId,
1035        tx_digest: TransactionDigest,
1036        outputs: &TransactionOutputs,
1037    ) {
1038        let TransactionOutputs {
1042            transaction,
1043            effects,
1044            markers,
1045            written,
1046            deleted,
1047            wrapped,
1048            events,
1049            ..
1050        } = outputs;
1051
1052        let effects_digest = effects.digest();
1053        let events_digest = events.digest();
1054
1055        self.cached
1058            .transactions
1059            .insert(
1060                &tx_digest,
1061                PointCacheItem::Some(transaction.clone()),
1062                Ticket::Write,
1063            )
1064            .ok();
1065        self.cached
1066            .transaction_effects
1067            .insert(
1068                &effects_digest,
1069                PointCacheItem::Some(effects.clone().into()),
1070                Ticket::Write,
1071            )
1072            .ok();
1073        self.cached
1074            .executed_effects_digests
1075            .insert(
1076                &tx_digest,
1077                PointCacheItem::Some(effects_digest),
1078                Ticket::Write,
1079            )
1080            .ok();
1081        self.cached
1082            .transaction_events
1083            .insert(
1084                &events_digest,
1085                PointCacheItem::Some(events.clone().into()),
1086                Ticket::Write,
1087            )
1088            .ok();
1089
1090        self.dirty
1091            .transaction_effects
1092            .remove(&effects_digest)
1093            .expect("effects must exist");
1094
1095        match self.dirty.transaction_events.entry(events.digest()) {
1096            DashMapEntry::Occupied(mut occupied) => {
1097                let txns = &mut occupied.get_mut().0;
1098                assert!(txns.remove(&tx_digest), "transaction must exist");
1099                if txns.is_empty() {
1100                    occupied.remove();
1101                }
1102            }
1103            DashMapEntry::Vacant(_) => {
1104                panic!("events must exist");
1105            }
1106        }
1107
1108        self.dirty
1109            .executed_effects_digests
1110            .remove(&tx_digest)
1111            .expect("executed effects must exist");
1112
1113        for (object_key, marker_value) in markers.iter() {
1115            Self::move_version_from_dirty_to_cache(
1116                &self.dirty.markers,
1117                &self.cached.marker_cache,
1118                (epoch, object_key.0),
1119                object_key.1,
1120                marker_value,
1121            );
1122        }
1123
1124        for (object_id, object) in written.iter() {
1125            Self::move_version_from_dirty_to_cache(
1126                &self.dirty.objects,
1127                &self.cached.object_cache,
1128                *object_id,
1129                object.version(),
1130                &ObjectEntry::Object(object.clone()),
1131            );
1132        }
1133
1134        for ObjectKey(object_id, version) in deleted.iter() {
1135            Self::move_version_from_dirty_to_cache(
1136                &self.dirty.objects,
1137                &self.cached.object_cache,
1138                *object_id,
1139                *version,
1140                &ObjectEntry::Deleted,
1141            );
1142        }
1143
1144        for ObjectKey(object_id, version) in wrapped.iter() {
1145            Self::move_version_from_dirty_to_cache(
1146                &self.dirty.objects,
1147                &self.cached.object_cache,
1148                *object_id,
1149                *version,
1150                &ObjectEntry::Wrapped,
1151            );
1152        }
1153    }
1154
1155    fn move_version_from_dirty_to_cache<K, V>(
1158        dirty: &DashMap<K, CachedVersionMap<V>>,
1159        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1160        key: K,
1161        version: SequenceNumber,
1162        value: &V,
1163    ) where
1164        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1165        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1166    {
1167        static MAX_VERSIONS: usize = 3;
1168
1169        let dirty_entry = dirty.entry(key);
1173        let cache_entry = cache.entry(key).or_default();
1174        let mut cache_map = cache_entry.value().lock();
1175
1176        cache_map.insert(version, value.clone());
1178        cache_map.truncate_to(MAX_VERSIONS);
1180
1181        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1182            panic!("dirty map must exist");
1183        };
1184
1185        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1186
1187        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1188
1189        if occupied_dirty_entry.get().is_empty() {
1191            occupied_dirty_entry.remove();
1192        }
1193    }
1194
1195    fn cache_latest_object_by_id(
1197        &self,
1198        object_id: &ObjectID,
1199        object: LatestObjectCacheEntry,
1200        ticket: Ticket,
1201    ) {
1202        trace!("caching object by id: {:?} {:?}", object_id, object);
1203        if self
1204            .cached
1205            .object_by_id_cache
1206            .insert(object_id, object, ticket)
1207            .is_ok()
1208        {
1209            self.metrics.record_cache_write("object_by_id");
1210        } else {
1211            trace!("discarded cache write due to expired ticket");
1212            self.metrics.record_ticket_expiry();
1213        }
1214    }
1215
1216    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
1217        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
1218    }
1219
1220    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
1221        info!("clearing state at end of epoch");
1222        assert!(
1223            self.dirty.pending_transaction_writes.is_empty(),
1224            "should be empty due to revert_state_update"
1225        );
1226        self.dirty.clear();
1227        info!("clearing old transaction locks");
1228        self.object_locks.clear();
1229    }
1230
1231    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
1232        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
1237            assert!(
1238                !self.try_is_tx_already_executed(tx)?,
1239                "attempt to revert committed transaction"
1240            );
1241
1242            info!("Not reverting {:?} as it was not executed", tx);
1245            return Ok(());
1246        };
1247
1248        for (object_id, object) in outputs.written.iter() {
1249            if object.is_package() {
1250                info!("removing non-finalized package from cache: {:?}", object_id);
1251                self.packages.invalidate(object_id);
1252            }
1253            self.cached.object_by_id_cache.invalidate(object_id);
1254            self.cached.object_cache.invalidate(object_id);
1255        }
1256
1257        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1258            self.cached.object_by_id_cache.invalidate(object_id);
1259            self.cached.object_cache.invalidate(object_id);
1260        }
1261
1262        Ok(())
1265    }
1266
1267    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
1268        self.store.bulk_insert_genesis_objects(objects)?;
1269        for obj in objects {
1270            self.cached.object_cache.invalidate(&obj.id());
1271            self.cached.object_by_id_cache.invalidate(&obj.id());
1272        }
1273        Ok(())
1274    }
1275
1276    fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
1277        self.cached.object_by_id_cache.invalidate(&object.id());
1278        self.cached.object_cache.invalidate(&object.id());
1279        self.store.insert_genesis_object(object)
1280    }
1281
1282    pub fn clear_caches_and_assert_empty(&self) {
1283        info!("clearing caches");
1284        self.cached.clear_and_assert_empty();
1285        self.packages.invalidate_all();
1286        assert_empty(&self.packages);
1287    }
1288}
1289
// Empty impl: no trait items are supplied for `WritebackCache` here.
// NOTE(review): `ExecutionCacheAPI` is presumably an umbrella/marker trait over
// the other cache traits implemented in this file — confirm against its
// definition.
impl ExecutionCacheAPI for WritebackCache {}
1291
1292impl ExecutionCacheCommit for WritebackCache {
1293    fn try_commit_transaction_outputs(
1294        &self,
1295        epoch: EpochId,
1296        digests: &[TransactionDigest],
1297    ) -> IotaResult {
1298        WritebackCache::commit_transaction_outputs(self, epoch, digests)
1299    }
1300
1301    fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
1302        self.store.persist_transaction(tx)
1303    }
1304
1305    fn approximate_pending_transaction_count(&self) -> u64 {
1306        WritebackCache::approximate_pending_transaction_count(self)
1307    }
1308}
1309
1310impl ObjectCacheRead for WritebackCache {
1311    #[instrument(level="trace", skip_all, fields(package_id=?package_id))]
1312    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1313        self.metrics
1314            .record_cache_request("package", "package_cache");
1315        if let Some(p) = self.packages.get(package_id) {
1316            if cfg!(debug_assertions) {
1317                let canonical_package = self
1318                    .dirty
1319                    .objects
1320                    .get(package_id)
1321                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1322                        Some(ObjectEntry::Object(object)) => Some(object),
1323                        _ => None,
1324                    })
1325                    .or_else(|| self.store.get_object(package_id));
1326
1327                if let Some(canonical_package) = canonical_package {
1328                    assert_eq!(
1329                        canonical_package.digest(),
1330                        p.object().digest(),
1331                        "Package object cache is inconsistent for package {package_id:?}"
1332                    );
1333                }
1334            }
1335            self.metrics.record_cache_hit("package", "package_cache");
1336            return Ok(Some(p));
1337        } else {
1338            self.metrics.record_cache_miss("package", "package_cache");
1339        }
1340
1341        if let Some(p) = self.get_object_impl("package", package_id)? {
1345            if p.is_package() {
1346                let p = PackageObject::new(p);
1347                tracing::trace!(
1348                    "caching package: {:?}",
1349                    p.object().compute_object_reference()
1350                );
1351                self.metrics.record_cache_write("package");
1352                self.packages.insert(*package_id, p.clone());
1353                Ok(Some(p))
1354            } else {
1355                Err(IotaError::UserInput {
1356                    error: UserInputError::MoveObjectAsPackage {
1357                        object_id: *package_id,
1358                    },
1359                })
1360            }
1361        } else {
1362            Ok(None)
1363        }
1364    }
1365
1366    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1367        }
1370
1371    #[instrument(level = "trace", skip_all, fields(object_id=?id))]
1374    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
1375        self.get_object_impl("object_latest", id)
1376    }
1377
1378    #[instrument(level = "trace", skip_all, fields(object_id, version))]
1379    fn try_get_object_by_key(
1380        &self,
1381        object_id: &ObjectID,
1382        version: SequenceNumber,
1383    ) -> IotaResult<Option<Object>> {
1384        match self.get_object_by_key_cache_only(object_id, version) {
1385            CacheResult::Hit(object) => Ok(Some(object)),
1386            CacheResult::NegativeHit => Ok(None),
1387            CacheResult::Miss => Ok(self
1388                .record_db_get("object_by_version")
1389                .try_get_object_by_key(object_id, version)?),
1390        }
1391    }
1392
1393    #[instrument(level = "trace", skip_all)]
1394    fn try_multi_get_objects_by_key(
1395        &self,
1396        object_keys: &[ObjectKey],
1397    ) -> Result<Vec<Option<Object>>, IotaError> {
1398        try_do_fallback_lookup(
1399            object_keys,
1400            |key| {
1401                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1402                    CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1403                    CacheResult::NegativeHit => CacheResult::NegativeHit,
1404                    CacheResult::Miss => CacheResult::Miss,
1405                })
1406            },
1407            |remaining| {
1408                self.record_db_multi_get("object_by_version", remaining.len())
1409                    .multi_get_objects_by_key(remaining)
1410            },
1411        )
1412    }
1413
1414    #[instrument(level = "trace", skip_all, fields(object_id, version))]
1415    fn try_object_exists_by_key(
1416        &self,
1417        object_id: &ObjectID,
1418        version: SequenceNumber,
1419    ) -> IotaResult<bool> {
1420        match self.get_object_by_key_cache_only(object_id, version) {
1421            CacheResult::Hit(_) => Ok(true),
1422            CacheResult::NegativeHit => Ok(false),
1423            CacheResult::Miss => self
1424                .record_db_get("object_by_version")
1425                .object_exists_by_key(object_id, version),
1426        }
1427    }
1428
1429    #[instrument(level = "trace", skip_all)]
1430    fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
1431        try_do_fallback_lookup(
1432            object_keys,
1433            |key| {
1434                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1435                    CacheResult::Hit(_) => CacheResult::Hit(true),
1436                    CacheResult::NegativeHit => CacheResult::Hit(false),
1437                    CacheResult::Miss => CacheResult::Miss,
1438                })
1439            },
1440            |remaining| {
1441                self.record_db_multi_get("object_by_version", remaining.len())
1442                    .multi_object_exists_by_key(remaining)
1443            },
1444        )
1445    }
1446
1447    #[instrument(level = "trace", skip_all, fields(object_id))]
1448    fn try_get_latest_object_ref_or_tombstone(
1449        &self,
1450        object_id: ObjectID,
1451    ) -> IotaResult<Option<ObjectRef>> {
1452        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1453            CacheResult::Hit((version, entry)) => Ok(Some(match entry {
1454                ObjectEntry::Object(object) => object.compute_object_reference(),
1455                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1456                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1457            })),
1458            CacheResult::NegativeHit => Ok(None),
1459            CacheResult::Miss => self
1460                .record_db_get("latest_objref_or_tombstone")
1461                .get_latest_object_ref_or_tombstone(object_id),
1462        }
1463    }
1464
1465    #[instrument(level = "trace", skip_all, fields(object_id))]
1466    fn try_get_latest_object_or_tombstone(
1467        &self,
1468        object_id: ObjectID,
1469    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1470        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1471            CacheResult::Hit((version, entry)) => {
1472                let key = ObjectKey(object_id, version);
1473                Ok(Some(match entry {
1474                    ObjectEntry::Object(object) => (key, object.into()),
1475                    ObjectEntry::Deleted => (
1476                        key,
1477                        ObjectOrTombstone::Tombstone((
1478                            object_id,
1479                            version,
1480                            ObjectDigest::OBJECT_DIGEST_DELETED,
1481                        )),
1482                    ),
1483                    ObjectEntry::Wrapped => (
1484                        key,
1485                        ObjectOrTombstone::Tombstone((
1486                            object_id,
1487                            version,
1488                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1489                        )),
1490                    ),
1491                }))
1492            }
1493            CacheResult::NegativeHit => Ok(None),
1494            CacheResult::Miss => self
1495                .record_db_get("latest_object_or_tombstone")
1496                .get_latest_object_or_tombstone(object_id),
1497        }
1498    }
1499
1500    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1501    fn try_find_object_lt_or_eq_version(
1502        &self,
1503        object_id: ObjectID,
1504        version_bound: SequenceNumber,
1505    ) -> IotaResult<Option<Object>> {
1506        macro_rules! check_cache_entry {
1507            ($level: expr, $objects: expr) => {
1508                self.metrics
1509                    .record_cache_request("object_lt_or_eq_version", $level);
1510                if let Some(objects) = $objects {
1511                    if let Some((_, object)) = objects
1512                        .all_versions_lt_or_eq_descending(&version_bound)
1513                        .next()
1514                    {
1515                        if let ObjectEntry::Object(object) = object {
1516                            self.metrics
1517                                .record_cache_hit("object_lt_or_eq_version", $level);
1518                            return Ok(Some(object.clone()));
1519                        } else {
1520                            self.metrics
1522                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1523                            return Ok(None);
1524                        }
1525                    } else {
1526                        self.metrics
1527                            .record_cache_miss("object_lt_or_eq_version", $level);
1528                    }
1529                }
1530            };
1531        }
1532
1533        self.metrics
1535            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1536        if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
1537            let latest = latest.lock();
1538            match &*latest {
1539                LatestObjectCacheEntry::Object(latest_version, object) => {
1540                    if *latest_version <= version_bound {
1541                        if let ObjectEntry::Object(object) = object {
1542                            self.metrics
1543                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1544                            return Ok(Some(object.clone()));
1545                        } else {
1546                            self.metrics.record_cache_negative_hit(
1548                                "object_lt_or_eq_version",
1549                                "object_by_id",
1550                            );
1551                            return Ok(None);
1552                        }
1553                    }
1554                    }
1557                LatestObjectCacheEntry::NonExistent => {
1559                    self.metrics
1560                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1561                    return Ok(None);
1562                }
1563            }
1564        }
1565        self.metrics
1566            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1567
1568        Self::with_locked_cache_entries(
1569            &self.dirty.objects,
1570            &self.cached.object_cache,
1571            &object_id,
1572            |dirty_entry, cached_entry| {
1573                check_cache_entry!("committed", dirty_entry);
1574                check_cache_entry!("uncommitted", cached_entry);
1575
1576                let latest: Option<(SequenceNumber, ObjectEntry)> =
1600                    if let Some(dirty_set) = dirty_entry {
1601                        dirty_set
1602                            .get_highest()
1603                            .cloned()
1604                            .tap_none(|| panic!("dirty set cannot be empty"))
1605                    } else {
1606                        self.record_db_get("object_lt_or_eq_version_latest")
1608                            .get_latest_object_or_tombstone(object_id)?
1609                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1610                                (version, ObjectEntry::from(obj_or_tombstone))
1611                            })
1612                    };
1613
1614                if let Some((obj_version, obj_entry)) = latest {
1615                    self.cache_latest_object_by_id(
1624                        &object_id,
1625                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1626                        self.cached
1629                            .object_by_id_cache
1630                            .get_ticket_for_read(&object_id),
1631                    );
1632
1633                    if obj_version <= version_bound {
1634                        match obj_entry {
1635                            ObjectEntry::Object(object) => Ok(Some(object)),
1636                            ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
1637                        }
1638                    } else {
1639                        self.record_db_get("object_lt_or_eq_version_scan")
1643                            .find_object_lt_or_eq_version(object_id, version_bound)
1644                    }
1645                } else {
1646                    let highest = cached_entry.and_then(|c| c.get_highest());
1652                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1653                    self.cache_object_not_found(
1654                        &object_id,
1655                        self.cached
1657                            .object_by_id_cache
1658                            .get_ticket_for_read(&object_id),
1659                    );
1660                    Ok(None)
1661                }
1662            },
1663        )
1664    }
1665
1666    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1667        get_iota_system_state(self)
1668    }
1669
1670    fn try_get_marker_value(
1671        &self,
1672        object_id: &ObjectID,
1673        version: SequenceNumber,
1674        epoch_id: EpochId,
1675    ) -> IotaResult<Option<MarkerValue>> {
1676        match self.get_marker_value_cache_only(object_id, version, epoch_id) {
1677            CacheResult::Hit(marker) => Ok(Some(marker)),
1678            CacheResult::NegativeHit => Ok(None),
1679            CacheResult::Miss => self
1680                .record_db_get("marker_by_version")
1681                .get_marker_value(object_id, &version, epoch_id),
1682        }
1683    }
1684
1685    fn try_get_latest_marker(
1686        &self,
1687        object_id: &ObjectID,
1688        epoch_id: EpochId,
1689    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
1690        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1691            CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
1692            CacheResult::NegativeHit => {
1693                panic!("cannot have negative hit when getting latest marker")
1694            }
1695            CacheResult::Miss => self
1696                .record_db_get("marker_latest")
1697                .get_latest_marker(object_id, epoch_id),
1698        }
1699    }
1700
1701    fn try_get_lock(
1702        &self,
1703        obj_ref: ObjectRef,
1704        epoch_store: &AuthorityPerEpochStore,
1705    ) -> IotaLockResult {
1706        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1707            CacheResult::Hit((_, obj)) => {
1708                let actual_objref = obj.compute_object_reference();
1709                if obj_ref != actual_objref {
1710                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1711                        locked_ref: actual_objref,
1712                    })
1713                } else {
1714                    Ok(
1716                        match self
1717                            .object_locks
1718                            .get_transaction_lock(&obj_ref, epoch_store)?
1719                        {
1720                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1721                                locked_by_tx: tx_digest,
1722                            },
1723                            None => ObjectLockStatus::Initialized,
1724                        },
1725                    )
1726                }
1727            }
1728            CacheResult::NegativeHit => {
1729                Err(IotaError::from(UserInputError::ObjectNotFound {
1730                    object_id: obj_ref.0,
1731                    version: None,
1734                }))
1735            }
1736            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1737        }
1738    }
1739
1740    fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
1741        let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
1742            UserInputError::ObjectNotFound {
1743                object_id,
1744                version: None,
1745            },
1746        )?;
1747        Ok(obj.compute_object_reference())
1748    }
1749
1750    fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1751        try_do_fallback_lookup(
1752            owned_object_refs,
1753            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1754                CacheResult::Hit((version, obj)) => {
1755                    if obj.compute_object_reference() != *obj_ref {
1756                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1757                            provided_obj_ref: *obj_ref,
1758                            current_version: version,
1759                        }
1760                        .into())
1761                    } else {
1762                        Ok(CacheResult::Hit(()))
1763                    }
1764                }
1765                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1766                    object_id: obj_ref.0,
1767                    version: None,
1768                }
1769                .into()),
1770                CacheResult::Miss => Ok(CacheResult::Miss),
1771            },
1772            |remaining| {
1773                self.record_db_multi_get("object_is_live", remaining.len())
1774                    .check_owned_objects_are_live(remaining)?;
1775                Ok(vec![(); remaining.len()])
1776            },
1777        )?;
1778        Ok(())
1779    }
1780
1781    fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
1782        self.store.perpetual_tables.get_highest_pruned_checkpoint()
1783    }
1784}
1785
1786impl TransactionCacheRead for WritebackCache {
1787    #[instrument(level = "trace", skip_all)]
1788    fn try_multi_get_transaction_blocks(
1789        &self,
1790        digests: &[TransactionDigest],
1791    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
1792        let digests_and_tickets: Vec<_> = digests
1793            .iter()
1794            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
1795            .collect();
1796        try_do_fallback_lookup(
1797            &digests_and_tickets,
1798            |(digest, _)| {
1799                self.metrics
1800                    .record_cache_request("transaction_block", "uncommitted");
1801                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1802                    self.metrics
1803                        .record_cache_hit("transaction_block", "uncommitted");
1804                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
1805                }
1806                self.metrics
1807                    .record_cache_miss("transaction_block", "uncommitted");
1808
1809                self.metrics
1810                    .record_cache_request("transaction_block", "committed");
1811                match self
1812                    .cached
1813                    .transactions
1814                    .get(digest)
1815                    .map(|l| l.lock().clone())
1816                {
1817                    Some(PointCacheItem::Some(tx)) => {
1818                        self.metrics
1819                            .record_cache_hit("transaction_block", "committed");
1820                        Ok(CacheResult::Hit(Some(tx)))
1821                    }
1822                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
1823                    None => {
1824                        self.metrics
1825                            .record_cache_miss("transaction_block", "committed");
1826
1827                        Ok(CacheResult::Miss)
1828                    }
1829                }
1830            },
1831            |remaining| {
1832                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1833                let results: Vec<_> = self
1834                    .record_db_multi_get("transaction_block", remaining.len())
1835                    .multi_get_transaction_blocks(&remaining_digests)
1836                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())?;
1837                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1838                    if result.is_none() {
1839                        self.cached.transactions.insert(digest, None, *ticket).ok();
1840                    }
1841                }
1842                Ok(results)
1843            },
1844        )
1845    }
1846
1847    #[instrument(level = "trace", skip_all)]
1848    fn try_multi_get_executed_effects_digests(
1849        &self,
1850        digests: &[TransactionDigest],
1851    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
1852        let digests_and_tickets: Vec<_> = digests
1853            .iter()
1854            .map(|d| {
1855                (
1856                    *d,
1857                    self.cached.executed_effects_digests.get_ticket_for_read(d),
1858                )
1859            })
1860            .collect();
1861        try_do_fallback_lookup(
1862            &digests_and_tickets,
1863            |(digest, _)| {
1864                self.metrics
1865                    .record_cache_request("executed_effects_digests", "uncommitted");
1866                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
1867                    self.metrics
1868                        .record_cache_hit("executed_effects_digests", "uncommitted");
1869                    return Ok(CacheResult::Hit(Some(*digest)));
1870                }
1871                self.metrics
1872                    .record_cache_miss("executed_effects_digests", "uncommitted");
1873
1874                self.metrics
1875                    .record_cache_request("executed_effects_digests", "committed");
1876                match self
1877                    .cached
1878                    .executed_effects_digests
1879                    .get(digest)
1880                    .map(|l| *l.lock())
1881                {
1882                    Some(PointCacheItem::Some(digest)) => {
1883                        self.metrics
1884                            .record_cache_hit("executed_effects_digests", "committed");
1885                        Ok(CacheResult::Hit(Some(digest)))
1886                    }
1887                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
1888                    None => {
1889                        self.metrics
1890                            .record_cache_miss("executed_effects_digests", "committed");
1891                        Ok(CacheResult::Miss)
1892                    }
1893                }
1894            },
1895            |remaining| {
1896                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1897                let results = self
1898                    .record_db_multi_get("executed_effects_digests", remaining.len())
1899                    .multi_get_executed_effects_digests(&remaining_digests)?;
1900                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1901                    if result.is_none() {
1902                        self.cached
1903                            .executed_effects_digests
1904                            .insert(digest, None, *ticket)
1905                            .ok();
1906                    }
1907                }
1908                Ok(results)
1909            },
1910        )
1911    }
1912
1913    #[instrument(level = "trace", skip_all)]
1914    fn try_multi_get_effects(
1915        &self,
1916        digests: &[TransactionEffectsDigest],
1917    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
1918        let digests_and_tickets: Vec<_> = digests
1919            .iter()
1920            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
1921            .collect();
1922        try_do_fallback_lookup(
1923            &digests_and_tickets,
1924            |(digest, _)| {
1925                self.metrics
1926                    .record_cache_request("transaction_effects", "uncommitted");
1927                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
1928                    self.metrics
1929                        .record_cache_hit("transaction_effects", "uncommitted");
1930                    return Ok(CacheResult::Hit(Some(effects.clone())));
1931                }
1932                self.metrics
1933                    .record_cache_miss("transaction_effects", "uncommitted");
1934
1935                self.metrics
1936                    .record_cache_request("transaction_effects", "committed");
1937                match self
1938                    .cached
1939                    .transaction_effects
1940                    .get(digest)
1941                    .map(|l| l.lock().clone())
1942                {
1943                    Some(PointCacheItem::Some(effects)) => {
1944                        self.metrics
1945                            .record_cache_hit("transaction_effects", "committed");
1946                        Ok(CacheResult::Hit(Some((*effects).clone())))
1947                    }
1948                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
1949                    None => {
1950                        self.metrics
1951                            .record_cache_miss("transaction_effects", "committed");
1952                        Ok(CacheResult::Miss)
1953                    }
1954                }
1955            },
1956            |remaining| {
1957                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1958                let results = self
1959                    .record_db_multi_get("transaction_effects", remaining.len())
1960                    .multi_get_effects(remaining_digests.iter())
1961                    .expect("db error");
1962                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1963                    if result.is_none() {
1964                        self.cached
1965                            .transaction_effects
1966                            .insert(digest, None, *ticket)
1967                            .ok();
1968                    }
1969                }
1970                Ok(results)
1971            },
1972        )
1973    }
1974
1975    #[instrument(level = "trace", skip_all)]
1976    fn try_notify_read_executed_effects_digests<'a>(
1977        &'a self,
1978        digests: &'a [TransactionDigest],
1979    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
1980        self.executed_effects_digests_notify_read
1981            .read(digests, |digests| {
1982                self.try_multi_get_executed_effects_digests(digests)
1983            })
1984            .boxed()
1985    }
1986
1987    #[instrument(level = "trace", skip_all)]
1988    fn try_multi_get_events(
1989        &self,
1990        event_digests: &[TransactionEventsDigest],
1991    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
1992        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
1993            if events.data.is_empty() {
1994                None
1995            } else {
1996                Some(events)
1997            }
1998        }
1999
2000        let digests_and_tickets: Vec<_> = event_digests
2001            .iter()
2002            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2003            .collect();
2004        try_do_fallback_lookup(
2005            &digests_and_tickets,
2006            |(digest, _)| {
2007                self.metrics
2008                    .record_cache_request("transaction_events", "uncommitted");
2009                if let Some(events) = self
2010                    .dirty
2011                    .transaction_events
2012                    .get(digest)
2013                    .map(|e| e.1.clone())
2014                {
2015                    self.metrics
2016                        .record_cache_hit("transaction_events", "uncommitted");
2017
2018                    return Ok(CacheResult::Hit(map_events(events)));
2019                }
2020                self.metrics
2021                    .record_cache_miss("transaction_events", "uncommitted");
2022
2023                self.metrics
2024                    .record_cache_request("transaction_events", "committed");
2025                match self
2026                    .cached
2027                    .transaction_events
2028                    .get(digest)
2029                    .map(|l| l.lock().clone())
2030                {
2031                    Some(PointCacheItem::Some(events)) => {
2032                        self.metrics
2033                            .record_cache_hit("transaction_events", "committed");
2034                        Ok(CacheResult::Hit(map_events((*events).clone())))
2035                    }
2036                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
2037                    None => {
2038                        self.metrics
2039                            .record_cache_miss("transaction_events", "committed");
2040
2041                        Ok(CacheResult::Miss)
2042                    }
2043                }
2044            },
2045            |remaining| {
2046                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2047                let results = self
2048                    .store
2049                    .multi_get_events(&remaining_digests)
2050                    .expect("db error");
2051                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2052                    if result.is_none() {
2053                        self.cached
2054                            .transaction_events
2055                            .insert(digest, None, *ticket)
2056                            .ok();
2057                    }
2058                }
2059                Ok(results)
2060            },
2061        )
2062    }
2063}
2064
2065impl ExecutionCacheWrite for WritebackCache {
2066    #[instrument(level = "debug", skip_all)]
2067    fn try_acquire_transaction_locks(
2068        &self,
2069        epoch_store: &AuthorityPerEpochStore,
2070        owned_input_objects: &[ObjectRef],
2071        transaction: VerifiedSignedTransaction,
2072    ) -> IotaResult {
2073        self.object_locks.acquire_transaction_locks(
2074            self,
2075            epoch_store,
2076            owned_input_objects,
2077            transaction,
2078        )
2079    }
2080
2081    fn try_write_transaction_outputs(
2082        &self,
2083        epoch_id: EpochId,
2084        tx_outputs: Arc<TransactionOutputs>,
2085    ) -> IotaResult {
2086        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs)
2087    }
2088}
2089
// Expands the `implement_passthrough_traits!` macro (imported from the parent
// module) for `WritebackCache`.
// NOTE(review): judging by its name, this presumably generates trait impls
// that simply forward to the underlying store — confirm against the macro
// definition.
implement_passthrough_traits!(WritebackCache);
2091
2092impl AccumulatorStore for WritebackCache {
2093    fn get_root_state_accumulator_for_epoch(
2094        &self,
2095        epoch: EpochId,
2096    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
2097        self.store.get_root_state_accumulator_for_epoch(epoch)
2098    }
2099
2100    fn get_root_state_accumulator_for_highest_epoch(
2101        &self,
2102    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
2103        self.store.get_root_state_accumulator_for_highest_epoch()
2104    }
2105
2106    fn insert_state_accumulator_for_epoch(
2107        &self,
2108        epoch: EpochId,
2109        checkpoint_seq_num: &CheckpointSequenceNumber,
2110        acc: &Accumulator,
2111    ) -> IotaResult {
2112        self.store
2113            .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
2114    }
2115
2116    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2117        assert!(
2121            self.dirty.is_empty(),
2122            "cannot iterate live object set with dirty data"
2123        );
2124        self.store.iter_live_object_set()
2125    }
2126
2127    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2131        let iter = self.dirty.objects.iter();
2133        let mut dirty_objects = BTreeMap::new();
2134
2135        for obj in self.store.iter_live_object_set() {
2137            dirty_objects.insert(obj.object_id(), obj);
2138        }
2139
2140        for entry in iter {
2142            let id = *entry.key();
2143            let value = entry.value();
2144            match value.get_highest().unwrap() {
2145                (_, ObjectEntry::Object(object)) => {
2146                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2147                }
2148                (_version, ObjectEntry::Wrapped) => {
2149                    dirty_objects.remove(&id);
2150                }
2151                (_, ObjectEntry::Deleted) => {
2152                    dirty_objects.remove(&id);
2153                }
2154            }
2155        }
2156
2157        Box::new(dirty_objects.into_values())
2158    }
2159}
2160
2161impl StateSyncAPI for WritebackCache {
2168    fn try_insert_transaction_and_effects(
2169        &self,
2170        transaction: &VerifiedTransaction,
2171        transaction_effects: &TransactionEffects,
2172    ) -> IotaResult {
2173        self.store
2174            .insert_transaction_and_effects(transaction, transaction_effects)?;
2175
2176        self.cached
2179            .transactions
2180            .insert(
2181                transaction.digest(),
2182                PointCacheItem::Some(Arc::new(transaction.clone())),
2183                Ticket::Write,
2184            )
2185            .ok();
2186        self.cached
2187            .transaction_effects
2188            .insert(
2189                &transaction_effects.digest(),
2190                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2191                Ticket::Write,
2192            )
2193            .ok();
2194
2195        Ok(())
2196    }
2197
2198    fn try_multi_insert_transaction_and_effects(
2199        &self,
2200        transactions_and_effects: &[VerifiedExecutionData],
2201    ) -> IotaResult {
2202        self.store
2203            .multi_insert_transaction_and_effects(transactions_and_effects.iter())?;
2204        for VerifiedExecutionData {
2205            transaction,
2206            effects,
2207        } in transactions_and_effects
2208        {
2209            self.cached
2210                .transactions
2211                .insert(
2212                    transaction.digest(),
2213                    PointCacheItem::Some(Arc::new(transaction.clone())),
2214                    Ticket::Write,
2215                )
2216                .ok();
2217            self.cached
2218                .transaction_effects
2219                .insert(
2220                    &effects.digest(),
2221                    PointCacheItem::Some(Arc::new(effects.clone())),
2222                    Ticket::Write,
2223                )
2224                .ok();
2225        }
2226
2227        Ok(())
2228    }
2229}