use std::{
    collections::{BTreeMap, BTreeSet},
    hash::Hash,
    sync::{Arc, atomic::AtomicU64},
};

use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
use futures::{FutureExt, future::BoxFuture};
use iota_common::sync::notify_read::NotifyRead;
use iota_config::WritebackCacheConfig;
use iota_macros::fail_point;
use iota_types::{
    accumulator::Accumulator,
    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
    digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
    effects::{TransactionEffects, TransactionEvents},
    error::{IotaError, IotaResult, UserInputError},
    executable_transaction::VerifiedExecutableTransaction,
    iota_system_state::{IotaSystemState, get_iota_system_state},
    message_envelope::Message,
    messages_checkpoint::CheckpointSequenceNumber,
    object::Object,
    storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
    transaction::{VerifiedSignedTransaction, VerifiedTransaction},
};
use moka::sync::Cache as MokaCache;
use parking_lot::Mutex;
use prometheus::Registry;
use tap::TapOptional;
use tracing::{debug, info, instrument, trace, warn};

use super::{
    CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
    ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
    TransactionCacheRead,
    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache, Ticket},
    implement_passthrough_traits,
    object_locks::ObjectLocks,
};
use crate::{
    authority::{
        AuthorityStore,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
        authority_store_tables::LiveObject,
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
    },
    fallback_fetch::try_do_fallback_lookup,
    state_accumulator::AccumulatorStore,
    transaction_outputs::TransactionOutputs,
};

#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

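/// An entry in the object cache. In addition to live objects, deleted and
/// wrapped tombstones are cached, so that reads of dead versions can be
/// answered without falling through to the store.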
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    Object(Object),
    Deleted,
    Wrapped,
}

impl ObjectEntry {
    #[cfg(test)]
    fn unwrap_object(&self) -> &Object {
        match self {
            ObjectEntry::Object(o) => o,
            _ => panic!("unwrap_object called on non-Object"),
        }
    }

    fn is_tombstone(&self) -> bool {
        match self {
            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
            ObjectEntry::Object(_) => false,
        }
    }
}

impl std::fmt::Debug for ObjectEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ObjectEntry::Object(o) => {
                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
            }
            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
        }
    }
}

impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}

impl From<ObjectOrTombstone> for ObjectEntry {
    fn from(object: ObjectOrTombstone) -> Self {
        match object {
            ObjectOrTombstone::Object(o) => o.into(),
            ObjectOrTombstone::Tombstone(obj_ref) => {
                if obj_ref.2.is_deleted() {
                    ObjectEntry::Deleted
                } else if obj_ref.2.is_wrapped() {
                    ObjectEntry::Wrapped
                } else {
                    panic!("tombstone digest must either be deleted or wrapped");
                }
            }
        }
    }
}

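/// The latest known state of an object: its newest version, or the fact that
/// it does not exist at all. Stored in the monotonic object-by-id cache.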
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}

impl LatestObjectCacheEntry {
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        match self {
            LatestObjectCacheEntry::Object(version, _) => Some(*version),
            LatestObjectCacheEntry::NonExistent => None,
        }
    }
}

impl IsNewer for LatestObjectCacheEntry {
    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
        match (self, other) {
            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
                v1 > v2
            }
            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
            _ => false,
        }
    }
}

type MarkerKey = (EpochId, ObjectID);

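/// UncommittedData holds the outputs of transactions that have been executed
/// but not yet committed to the db. All writes land here first, and are only
/// moved into CachedCommittedData after they have been durably written.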
struct UncommittedData {
    /// Every object written by an uncommitted transaction, keyed by id and
    /// then by version.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    /// Marker values written by uncommitted transactions.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    /// Multiple transactions can emit identical events, so each entry tracks
    /// the set of transactions that refer to it and is evicted only once all
    /// of them have been committed.
    transaction_events:
        DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,

    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    /// The complete outputs of every uncommitted transaction; entries are
    /// removed once the transaction has been committed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    /// Counters used to approximate the number of uncommitted transactions,
    /// for backpressure purposes.
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}

impl UncommittedData {
    fn new() -> Self {
        Self {
            objects: DashMap::new(),
            markers: DashMap::new(),
            transaction_effects: DashMap::new(),
            executed_effects_digests: DashMap::new(),
            pending_transaction_writes: DashMap::new(),
            transaction_events: DashMap::new(),
            total_transaction_inserts: AtomicU64::new(0),
            total_transaction_commits: AtomicU64::new(0),
        }
    }

    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.transaction_events.clear();
        self.total_transaction_inserts
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_transaction_commits
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
                    && self
                        .total_transaction_inserts
                        .load(std::sync::atomic::Ordering::Relaxed)
                        == self
                            .total_transaction_commits
                            .load(std::sync::atomic::Ordering::Relaxed),
            );
        }
        empty
    }
}

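/// Point lookups (transactions, effects, events) are cached as `Option`s so
/// that negative results ("this digest is not in the db") can be cached too.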
type PointCacheItem<T> = Option<T>;

impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
        match (self, other) {
            (Some(_), None) => true,

            (Some(a), Some(b)) => {
                // Point cache entries are immutable, so a Some value can
                // never be replaced by a different Some value.
                debug_assert_eq!(a, b);
                false
            }

            _ => false,
        }
    }
}

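/// CachedCommittedData is a bounded cache over data that has already been
/// committed to the db. Entries are populated when dirty data is flushed,
/// and when db reads miss the cache.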
struct CachedCommittedData {
    /// Committed objects, keyed by id and then by version.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    /// The latest known version of each object (or its non-existence),
    /// maintained monotonically so that stale reads can never overwrite a
    /// newer entry.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    /// Committed marker values.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    transaction_events:
        MonotonicCache<TransactionEventsDigest, PointCacheItem<Arc<TransactionEvents>>>,

    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Currently unused.
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}

impl CachedCommittedData {
    fn new(config: &WritebackCacheConfig) -> Self {
        let object_cache = MokaCache::builder()
            .max_capacity(config.object_cache_size())
            .build();
        let marker_cache = MokaCache::builder()
            .max_capacity(config.marker_cache_size())
            .build();

        let transactions = MonotonicCache::new(config.transaction_cache_size());
        let transaction_effects = MonotonicCache::new(config.effect_cache_size());
        let transaction_events = MonotonicCache::new(config.events_cache_size());
        let executed_effects_digests = MonotonicCache::new(config.executed_effect_cache_size());

        let transaction_objects = MokaCache::builder()
            .max_capacity(config.transaction_objects_cache_size())
            .build();

        Self {
            object_cache,
            object_by_id_cache: MonotonicCache::new(config.object_by_id_cache_size()),
            marker_cache,
            transactions,
            transaction_effects,
            transaction_events,
            executed_effects_digests,
            _transaction_objects: transaction_objects,
        }
    }

    fn clear_and_assert_empty(&self) {
        self.object_cache.invalidate_all();
        self.object_by_id_cache.invalidate_all();
        self.marker_cache.invalidate_all();
        self.transactions.invalidate_all();
        self.transaction_effects.invalidate_all();
        self.transaction_events.invalidate_all();
        self.executed_effects_digests.invalidate_all();
        self._transaction_objects.invalidate_all();

        assert_empty(&self.object_cache);
        assert!(self.object_by_id_cache.is_empty());
        assert_empty(&self.marker_cache);
        assert!(self.transactions.is_empty());
        assert!(self.transaction_effects.is_empty());
        assert!(self.transaction_events.is_empty());
        assert!(self.executed_effects_digests.is_empty());
        assert_empty(&self._transaction_objects);
    }
}

fn assert_empty<K, V>(cache: &MokaCache<K, V>)
where
    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
{
    if cache.iter().next().is_some() {
        panic!("cache should be empty");
    }
}

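/// WritebackCache is a write-back cache over the AuthorityStore: transaction
/// outputs are first written to the dirty (uncommitted) set, later committed
/// to the db in bulk, and then migrated to the bounded committed caches.
/// Reads check the dirty set, then the committed caches, then the db.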
pub struct WritebackCache {
    dirty: UncommittedData,
    cached: CachedCommittedData,

    // Packages are cached separately: they are treated as immutable once
    // written, so no versioning is needed here. The cache is invalidated if
    // a package publish is reverted.
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    store: Arc<AuthorityStore>,
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}

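// Helper macros that probe a CachedVersionMap at an exact version or at the
// latest version, recording cache metrics and early-returning on a hit.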
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // The map holds all existing versions newer than its
                    // oldest entry, so a missing version in that range is
                    // known not to exist.
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

impl WritebackCache {
    pub fn new(
        config: &WritebackCacheConfig,
        store: Arc<AuthorityStore>,
        metrics: Arc<ExecutionCacheMetrics>,
        backpressure_manager: Arc<BackpressureManager>,
    ) -> Self {
        let packages = MokaCache::builder()
            .max_capacity(config.package_cache_size())
            .build();
        Self {
            dirty: UncommittedData::new(),
            cached: CachedCommittedData::new(config),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            store,
            backpressure_manager,
            backpressure_threshold: config.backpressure_threshold(),
            metrics,
        }
    }

    pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
        Self::new(
            &Default::default(),
            store,
            ExecutionCacheMetrics::new(registry).into(),
            BackpressureManager::new_for_tests(),
        )
    }

    #[cfg(test)]
    pub fn reset_for_test(&mut self) {
        let mut new = Self::new(
            &Default::default(),
            self.store.clone(),
            self.metrics.clone(),
            self.backpressure_manager.clone(),
        );
        std::mem::swap(self, &mut new);
    }

    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // Acquire the dirty-map entry first and hold it across the
        // object_by_id cache update, so that the latest-version entry and
        // the versioned entry cannot be observed out of sync.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.cached
            .object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            .ok();

        entry.insert(version, object);
    }

    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: &ObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!(
            "inserting marker value {:?}: {:?}",
            object_key,
            marker_value
        );
        fail_point!("write_marker_entry");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.0))
            .or_default()
            .value_mut()
            .insert(object_key.1, marker_value);
    }

    /// Looks up `key` in both the dirty map and the committed cache, locking
    /// the committed entry so that the callback observes a consistent view
    /// of both tiers.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }

    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<Object> {
        match self.get_object_entry_by_key_cache_only(object_id, version) {
            CacheResult::Hit(entry) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::Miss => CacheResult::Miss,
            CacheResult::NegativeHit => CacheResult::NegativeHit,
        }
    }

    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.cached.object_by_id_cache.get(object_id);

        if cfg!(debug_assertions) {
            if let Some(entry) = &entry {
                // In debug builds, verify that the object_by_id cache agrees
                // with the authoritative state (the dirty set, falling back
                // to the store).
                let highest: Option<ObjectEntry> = self
                    .dirty
                    .objects
                    .get(object_id)
                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                    .or_else(|| {
                        let obj: Option<ObjectEntry> = self
                            .store
                            .get_latest_object_or_tombstone(*object_id)
                            .unwrap()
                            .map(|(_, o)| o.into());
                        obj
                    });

                let cache_entry = match &*entry.lock() {
                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                    LatestObjectCacheEntry::NonExistent => None,
                };

                // A tombstone may have been pruned from the db while the
                // cache still holds it; that is not an incoherency.
                let tombstone_possibly_pruned = highest.is_none()
                    && cache_entry
                        .as_ref()
                        .map(|e| e.is_tombstone())
                        .unwrap_or(false);

                if highest != cache_entry && !tombstone_possibly_pruned {
                    tracing::error!(
                        ?highest,
                        ?cache_entry,
                        ?tombstone_possibly_pruned,
                        "object_by_id cache is incoherent for {:?}",
                        object_id
                    );
                    panic!("object_by_id cache is incoherent for {object_id:?}");
                }
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, Object)> {
        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
            CacheResult::Hit((version, entry)) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::NegativeHit => CacheResult::NegativeHit,
            CacheResult::Miss => CacheResult::Miss,
        }
    }

    fn get_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_latest_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_impl(
        &self,
        request_type: &'static str,
        id: &ObjectID,
    ) -> IotaResult<Option<Object>> {
        // Take a read ticket before the lookup, so that the result of a
        // potentially stale db read cannot overwrite a concurrent write.
        let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, object)) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => {
                let obj = self.store.try_get_object(id)?;
                if let Some(obj) = &obj {
                    self.cache_latest_object_by_id(
                        id,
                        LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
                        ticket,
                    );
                } else {
                    self.cache_object_not_found(id, ticket);
                }
                Ok(obj)
            }
        }
    }

    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }

    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }

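    /// Writes the outputs of an executed transaction to the dirty set and
    /// notifies any tasks waiting on its effects. The data becomes durable
    /// only once commit_transaction_outputs later flushes it to the db.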
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = &*tx_outputs;

        // Deleted and wrapped objects are recorded as tombstones.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, object_key, *marker_value);
        }

        // Write children before parents, so that a reader who observes a
        // parent object can always find its child objects in the cache.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // Multiple transactions can emit identical events, so each entry
        // tracks every transaction that refers to it.
        self.metrics.record_cache_write("transaction_events");
        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                occupied.get_mut().0.insert(tx_digest);
            }
            DashMapEntry::Vacant(entry) => {
                let mut txns = BTreeSet::new();
                txns.insert(tx_digest);
                entry.insert((txns, events.clone()));
            }
        }

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // Update the approximate pending-transaction count and apply
        // backpressure if it exceeds the threshold.
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);

        Ok(())
    }

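    /// Durably commits the outputs of previously executed transactions to
    /// the db, then migrates them from the dirty set to the committed
    /// caches.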
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                // There is nothing pending for this digest (it may already
                // have been committed); skip it rather than panic.
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        // Write to the db BEFORE removing anything from the dirty set, so
        // that a concurrent reader always finds the data in at least one of
        // the two places.
        self.store.write_transaction_outputs(epoch, &all_outputs)?;

        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);

        Ok(())
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        let num_commits = self
            .dirty
            .total_transaction_commits
            .load(std::sync::atomic::Ordering::Relaxed);

        self.dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits)
    }

    fn set_backpressure(&self, pending_count: u64) {
        let backpressure = pending_count > self.backpressure_threshold;
        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
        if backpressure_changed {
            self.metrics.backpressure_toggles.inc();
        }
        self.metrics
            .backpressure_status
            .set(if backpressure { 1 } else { 0 });
    }

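    /// Moves all the data for a single committed transaction from the dirty
    /// set to the committed caches.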
    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();
        let events_digest = events.digest();

        // Insert into the committed caches before removing entries from the
        // dirty set, so that a reader can always find the data in at least
        // one of the two.
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &events_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        match self.dirty.transaction_events.entry(events_digest) {
            DashMapEntry::Occupied(mut occupied) => {
                let txns = &mut occupied.get_mut().0;
                assert!(txns.remove(&tx_digest), "transaction must exist");
                // Evict the events only once no uncommitted transaction
                // refers to them anymore.
                if txns.is_empty() {
                    occupied.remove();
                }
            }
            DashMapEntry::Vacant(_) => {
                panic!("events must exist");
            }
        }

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.0),
                object_key.1,
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }

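    /// Moves a single versioned entry (object or marker) from the dirty map
    /// to the corresponding committed cache.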
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        // Only the newest few versions are retained in the committed cache.
        static MAX_VERSIONS: usize = 3;

        // Acquire the dirty entry before locking the cache entry, and hold
        // it until the version has been removed.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // Insert into the cache before removing from the dirty set, so that
        // the value is always visible in at least one of the two.
        cache_map.insert(version, value.clone());
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // Remove the per-key map entirely once it has no versions left.
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }

    /// Updates the object-by-id cache with the latest known state of an
    /// object, or with its non-existence.
    fn cache_latest_object_by_id(
        &self,
        object_id: &ObjectID,
        object: LatestObjectCacheEntry,
        ticket: Ticket,
    ) {
        trace!("caching object by id: {:?} {:?}", object_id, object);
        if self
            .cached
            .object_by_id_cache
            .insert(object_id, object, ticket)
            .is_ok()
        {
            self.metrics.record_cache_write("object_by_id");
        } else {
            trace!("discarded cache write due to expired ticket");
            self.metrics.record_ticket_expiry();
        }
    }

    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }

    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");
        assert!(
            self.dirty.pending_transaction_writes.is_empty(),
            "should be empty due to revert_state_update"
        );
        self.dirty.clear();
        info!("clearing old transaction locks");
        self.object_locks.clear();
    }

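    /// Reverts the uncommitted outputs of a transaction, invalidating any
    /// cache entries that may have observed them.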
    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
            assert!(
                !self.try_is_tx_already_executed(tx)?,
                "attempt to revert committed transaction"
            );

            info!("Not reverting {:?} as it was not executed", tx);
            return Ok(());
        };

        for (object_id, object) in outputs.written.iter() {
            if object.is_package() {
                info!("removing non-finalized package from cache: {:?}", object_id);
                self.packages.invalidate(object_id);
            }
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        Ok(())
    }

    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
        self.store.bulk_insert_genesis_objects(objects)?;
        for obj in objects {
            self.cached.object_cache.invalidate(&obj.id());
            self.cached.object_by_id_cache.invalidate(&obj.id());
        }
        Ok(())
    }

    fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
        self.cached.object_by_id_cache.invalidate(&object.id());
        self.cached.object_cache.invalidate(&object.id());
        self.store.insert_genesis_object(object)
    }

    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.packages.invalidate_all();
        assert_empty(&self.packages);
    }
}

impl ExecutionCacheAPI for WritebackCache {}

impl ExecutionCacheCommit for WritebackCache {
    fn try_commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        WritebackCache::commit_transaction_outputs(self, epoch, digests)
    }

    fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
        self.store.persist_transaction(tx)
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        WritebackCache::approximate_pending_transaction_count(self)
    }
}

impl ObjectCacheRead for WritebackCache {
    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            if cfg!(debug_assertions) {
                // In debug builds, verify that the package cache agrees with
                // the authoritative state (the dirty set, falling back to
                // the store).
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {package_id:?}"
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // Fall back to the general object read path; if the result is a
        // package, cache it for future lookups.
        if let Some(p) = self.get_object_impl("package", package_id)? {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                Err(IotaError::UserInput {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                })
            }
        } else {
            Ok(None)
        }
    }

    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // This is a no-op: all package writes go through this cache, so it
        // cannot become incoherent with the store.
    }

    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
        self.get_object_impl("object_latest", id)
    }

    fn try_get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(object) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => Ok(self
                .record_db_get("object_by_version")
                .try_get_object_by_key(object_id, version)?),
        }
    }

    fn try_multi_get_objects_by_key(
        &self,
        object_keys: &[ObjectKey],
    ) -> Result<Vec<Option<Object>>, IotaError> {
        try_do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
                    CacheResult::NegativeHit => CacheResult::NegativeHit,
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_get_objects_by_key(remaining)
            },
        )
    }

    fn try_object_exists_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<bool> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(_) => Ok(true),
            CacheResult::NegativeHit => Ok(false),
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .object_exists_by_key(object_id, version),
        }
    }

    fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
        try_do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(_) => CacheResult::Hit(true),
                    CacheResult::NegativeHit => CacheResult::Hit(false),
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
            },
        )
    }

    fn try_get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => Ok(Some(match entry {
                ObjectEntry::Object(object) => object.compute_object_reference(),
                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
            })),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_objref_or_tombstone")
                .get_latest_object_ref_or_tombstone(object_id),
        }
    }

    fn try_get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => {
                let key = ObjectKey(object_id, version);
                Ok(Some(match entry {
                    ObjectEntry::Object(object) => (key, object.into()),
                    ObjectEntry::Deleted => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_DELETED,
                        )),
                    ),
                    ObjectEntry::Wrapped => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
                        )),
                    ),
                }))
            }
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_object_or_tombstone")
                .get_latest_object_or_tombstone(object_id),
        }
    }

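    /// Returns the newest version of the object that is less than or equal
    /// to `version_bound`, checking the object-by-id cache, then the
    /// versioned caches, and finally the db.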
    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
    fn try_find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version_bound: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        macro_rules! check_cache_entry {
            ($level: expr, $objects: expr) => {
                self.metrics
                    .record_cache_request("object_lt_or_eq_version", $level);
                if let Some(objects) = $objects {
                    if let Some((_, object)) = objects
                        .all_versions_lt_or_eq_descending(&version_bound)
                        .next()
                    {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", $level);
                            return Ok(Some(object.clone()));
                        } else {
                            // The newest version at or below the bound is a
                            // tombstone.
                            self.metrics
                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
                            return Ok(None);
                        }
                    } else {
                        self.metrics
                            .record_cache_miss("object_lt_or_eq_version", $level);
                    }
                }
            };
        }

        // The object-by-id cache can answer the query directly if the latest
        // version is already at or below the bound.
        self.metrics
            .record_cache_request("object_lt_or_eq_version", "object_by_id");
        if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
            let latest = latest.lock();
            match &*latest {
                LatestObjectCacheEntry::Object(latest_version, object) => {
                    if *latest_version <= version_bound {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
                            return Ok(Some(object.clone()));
                        } else {
                            // The latest version is a tombstone.
                            self.metrics.record_cache_negative_hit(
                                "object_lt_or_eq_version",
                                "object_by_id",
                            );
                            return Ok(None);
                        }
                    }
                    // Otherwise the latest version is above the bound and we
                    // must look further back.
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
                    return Ok(None);
                }
            }
        }
        self.metrics
            .record_cache_miss("object_lt_or_eq_version", "object_by_id");

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            &object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry!("uncommitted", dirty_entry);
                check_cache_entry!("committed", cached_entry);

                // Neither cache tier can answer the query. Find the latest
                // version (from the dirty set if present, otherwise the db)
                // and cache it for future lookups.
                let latest: Option<(SequenceNumber, ObjectEntry)> =
                    if let Some(dirty_set) = dirty_entry {
                        dirty_set
                            .get_highest()
                            .cloned()
                            .tap_none(|| panic!("dirty set cannot be empty"))
                    } else {
                        self.record_db_get("object_lt_or_eq_version_latest")
                            .get_latest_object_or_tombstone(object_id)?
                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
                                (version, ObjectEntry::from(obj_or_tombstone))
                            })
                    };

                if let Some((obj_version, obj_entry)) = latest {
                    self.cache_latest_object_by_id(
                        &object_id,
                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
                        self.cached
                            .object_by_id_cache
                            .get_ticket_for_read(&object_id),
                    );

                    if obj_version <= version_bound {
                        match obj_entry {
                            ObjectEntry::Object(object) => Ok(Some(object)),
                            ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
                        }
                    } else {
                        // The latest version is above the bound; scan the db
                        // for the newest version at or below it.
                        self.record_db_get("object_lt_or_eq_version_scan")
                            .find_object_lt_or_eq_version(object_id, version_bound)
                    }
                } else {
                    // The object does not exist in the db at all; anything
                    // remaining in the committed cache must be a tombstone
                    // that was pruned from the db.
                    let highest = cached_entry.and_then(|c| c.get_highest());
                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
                    self.cache_object_not_found(
                        &object_id,
                        self.cached
                            .object_by_id_cache
                            .get_ticket_for_read(&object_id),
                    );
                    Ok(None)
                }
            },
        )
    }

    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        get_iota_system_state(self)
    }

    fn try_get_marker_value(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> IotaResult<Option<MarkerValue>> {
        match self.get_marker_value_cache_only(object_id, version, epoch_id) {
            CacheResult::Hit(marker) => Ok(Some(marker)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("marker_by_version")
                .get_marker_value(object_id, &version, epoch_id),
        }
    }

    fn try_get_latest_marker(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
            CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
            CacheResult::NegativeHit => {
                panic!("cannot have negative hit when getting latest marker")
            }
            CacheResult::Miss => self
                .record_db_get("marker_latest")
                .get_latest_marker(object_id, epoch_id),
        }
    }

    fn try_get_lock(
        &self,
        obj_ref: ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaLockResult {
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: tx_digest,
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(IotaError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    // The object does not exist at any version, so no
                    // version is reported.
                    version: None,
                }))
            }
            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
        }
    }

    fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
        let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
            UserInputError::ObjectNotFound {
                object_id,
                version: None,
            },
        )?;
        Ok(obj.compute_object_reference())
    }

    fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        try_do_fallback_lookup(
            owned_object_refs,
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            |remaining| {
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }

    fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        self.store.perpetual_tables.get_highest_pruned_checkpoint()
    }
}

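// Point reads below consult the dirty set first, then the committed cache,
// then the db. A read ticket is taken before each lookup so that a stale
// negative result cannot overwrite the result of a concurrent write.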
impl TransactionCacheRead for WritebackCache {
    fn try_multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");
                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        Ok(CacheResult::Hit(Some(tx)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        // Cache negative results so that repeated lookups of
                        // a missing digest skip the db.
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                Ok(results)
            },
        )
    }

    fn try_multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return Ok(CacheResult::Hit(Some(*digest)));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        Ok(CacheResult::Hit(Some(digest)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }

    fn try_multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return Ok(CacheResult::Hit(Some(effects.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        Ok(CacheResult::Hit(Some((*effects).clone())))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }

    fn try_notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
        self.executed_effects_digests_notify_read
            .read(digests, |digests| {
                self.try_multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }

    fn try_multi_get_events(
        &self,
        event_digests: &[TransactionEventsDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            // Empty event lists are reported as non-existent.
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self
                    .dirty
                    .transaction_events
                    .get(digest)
                    .map(|e| e.1.clone())
                {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return Ok(CacheResult::Hit(map_events(events)));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        Ok(CacheResult::Hit(map_events((*events).clone())))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");

                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }
}

impl ExecutionCacheWrite for WritebackCache {
    fn try_acquire_transaction_locks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> IotaResult {
        self.object_locks.acquire_transaction_locks(
            self,
            epoch_store,
            owned_input_objects,
            transaction,
        )
    }

    fn try_write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs)
    }
}

implement_passthrough_traits!(WritebackCache);

impl AccumulatorStore for WritebackCache {
    fn get_root_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        self.store.get_root_state_accumulator_for_epoch(epoch)
    }

    fn get_root_state_accumulator_for_highest_epoch(
        &self,
    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
        self.store.get_root_state_accumulator_for_highest_epoch()
    }

    fn insert_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &Accumulator,
    ) -> IotaResult {
        self.store
            .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // The store's live object set is only correct if there is no
        // uncommitted data.
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set()
    }

    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        let iter = self.dirty.objects.iter();
        let mut dirty_objects = BTreeMap::new();

        // Start with the live object set from the store...
        for obj in self.store.iter_live_object_set() {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // ...then overlay the dirty set: newer writes replace store entries,
        // and tombstones remove them.
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (_version, ObjectEntry::Wrapped) => {
                    dirty_objects.remove(&id);
                }
                (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}

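// State-synced transactions are written directly to the db, bypassing the
// dirty set; the committed point caches are primed with them here.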
impl StateSyncAPI for WritebackCache {
    fn try_insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) -> IotaResult {
        self.store
            .insert_transaction_and_effects(transaction, transaction_effects)?;

        // Also prime the committed caches, since these writes never pass
        // through the dirty set.
        self.cached
            .transactions
            .insert(
                transaction.digest(),
                PointCacheItem::Some(Arc::new(transaction.clone())),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &transaction_effects.digest(),
                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
                Ticket::Write,
            )
            .ok();

        Ok(())
    }

    fn try_multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) -> IotaResult {
        self.store
            .multi_insert_transaction_and_effects(transactions_and_effects.iter())?;
        for VerifiedExecutionData {
            transaction,
            effects,
        } in transactions_and_effects
        {
            self.cached
                .transactions
                .insert(
                    transaction.digest(),
                    PointCacheItem::Some(Arc::new(transaction.clone())),
                    Ticket::Write,
                )
                .ok();
            self.cached
                .transaction_effects
                .insert(
                    &effects.digest(),
                    PointCacheItem::Some(Arc::new(effects.clone())),
                    Ticket::Write,
                )
                .ok();
        }

        Ok(())
    }
}