1use std::{
51 collections::{BTreeMap, BTreeSet},
52 hash::Hash,
53 sync::{Arc, atomic::AtomicU64},
54};
55
56use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
57use futures::{FutureExt, future::BoxFuture};
58use iota_common::sync::notify_read::NotifyRead;
59use iota_config::WritebackCacheConfig;
60use iota_macros::fail_point;
61use iota_types::{
62 accumulator::Accumulator,
63 base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
64 digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
65 effects::{TransactionEffects, TransactionEvents},
66 error::{IotaError, IotaResult, UserInputError},
67 executable_transaction::VerifiedExecutableTransaction,
68 iota_system_state::{IotaSystemState, get_iota_system_state},
69 message_envelope::Message,
70 messages_checkpoint::CheckpointSequenceNumber,
71 object::Object,
72 storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
73 transaction::{VerifiedSignedTransaction, VerifiedTransaction},
74};
75use moka::sync::Cache as MokaCache;
76use parking_lot::Mutex;
77use prometheus::Registry;
78use tap::TapOptional;
79use tracing::{debug, info, instrument, trace, warn};
80
81use super::{
82 CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
83 ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
84 TransactionCacheRead,
85 cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache, Ticket},
86 implement_passthrough_traits,
87 object_locks::ObjectLocks,
88};
89use crate::{
90 authority::{
91 AuthorityStore,
92 authority_per_epoch_store::AuthorityPerEpochStore,
93 authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
94 authority_store_tables::LiveObject,
95 backpressure::BackpressureManager,
96 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
97 },
98 fallback_fetch::try_do_fallback_lookup,
99 state_accumulator::AccumulatorStore,
100 transaction_outputs::TransactionOutputs,
101};
102
103#[cfg(test)]
104#[path = "unit_tests/writeback_cache_tests.rs"]
105pub mod writeback_cache_tests;
106
/// Cached state of one object slot: a live object, or a tombstone recording
/// why no object is present at this version.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    /// A live object at some version.
    Object(Object),
    /// The object was deleted at this version.
    Deleted,
    /// The object was wrapped (made invisible) at this version.
    Wrapped,
}
113
114impl ObjectEntry {
115 #[cfg(test)]
116 fn unwrap_object(&self) -> &Object {
117 match self {
118 ObjectEntry::Object(o) => o,
119 _ => panic!("unwrap_object called on non-Object"),
120 }
121 }
122
123 fn is_tombstone(&self) -> bool {
124 match self {
125 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
126 ObjectEntry::Object(_) => false,
127 }
128 }
129}
130
131impl std::fmt::Debug for ObjectEntry {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 match self {
134 ObjectEntry::Object(o) => {
135 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
136 }
137 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
138 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
139 }
140 }
141}
142
/// A live object converts directly into `ObjectEntry::Object`.
impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}
148
149impl From<ObjectOrTombstone> for ObjectEntry {
150 fn from(object: ObjectOrTombstone) -> Self {
151 match object {
152 ObjectOrTombstone::Object(o) => o.into(),
153 ObjectOrTombstone::Tombstone(obj_ref) => {
154 if obj_ref.2.is_deleted() {
155 ObjectEntry::Deleted
156 } else if obj_ref.2.is_wrapped() {
157 ObjectEntry::Wrapped
158 } else {
159 panic!("tombstone digest must either be deleted or wrapped");
160 }
161 }
162 }
163 }
164}
165
/// Latest known state of an object id: its highest version and entry, or a
/// definitive record that the object does not exist.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}
171
172impl LatestObjectCacheEntry {
173 #[cfg(test)]
174 fn version(&self) -> Option<SequenceNumber> {
175 match self {
176 LatestObjectCacheEntry::Object(version, _) => Some(*version),
177 LatestObjectCacheEntry::NonExistent => None,
178 }
179 }
180}
181
182impl IsNewer for LatestObjectCacheEntry {
183 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
184 match (self, other) {
185 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
186 v1 > v2
187 }
188 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
189 _ => false,
190 }
191 }
192}
193
194type MarkerKey = (EpochId, ObjectID);
195
/// Execution outputs that have not yet been committed to the db. Entries are
/// only removed from here after they have been flushed to the store and
/// copied into the committed caches.
struct UncommittedData {
    /// Dirty object writes (live objects and tombstones), versioned per id.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    /// Dirty marker values, keyed by (epoch, object id) and versioned.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    /// Events keyed by digest, plus the set of transactions that produced
    /// them — multiple transactions can emit an identical event set, and the
    /// entry may only be dropped when the last producer is flushed.
    transaction_events:
        DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,

    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    /// Full outputs of executed-but-uncommitted transactions.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    // Relaxed counters used to approximate (inserts - commits) for
    // backpressure decisions.
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
241
242impl UncommittedData {
243 fn new() -> Self {
244 Self {
245 objects: DashMap::new(),
246 markers: DashMap::new(),
247 transaction_effects: DashMap::new(),
248 executed_effects_digests: DashMap::new(),
249 pending_transaction_writes: DashMap::new(),
250 transaction_events: DashMap::new(),
251 total_transaction_inserts: AtomicU64::new(0),
252 total_transaction_commits: AtomicU64::new(0),
253 }
254 }
255
256 fn clear(&self) {
257 self.objects.clear();
258 self.markers.clear();
259 self.transaction_effects.clear();
260 self.executed_effects_digests.clear();
261 self.pending_transaction_writes.clear();
262 self.transaction_events.clear();
263 self.total_transaction_inserts
264 .store(0, std::sync::atomic::Ordering::Relaxed);
265 self.total_transaction_commits
266 .store(0, std::sync::atomic::Ordering::Relaxed);
267 }
268
269 fn is_empty(&self) -> bool {
270 let empty = self.pending_transaction_writes.is_empty();
271 if empty && cfg!(debug_assertions) {
272 assert!(
273 self.objects.is_empty()
274 && self.markers.is_empty()
275 && self.transaction_effects.is_empty()
276 && self.executed_effects_digests.is_empty()
277 && self.transaction_events.is_empty()
278 && self
279 .total_transaction_inserts
280 .load(std::sync::atomic::Ordering::Relaxed)
281 == self
282 .total_transaction_commits
283 .load(std::sync::atomic::Ordering::Relaxed),
284 );
285 }
286 empty
287 }
288}
289
290type PointCacheItem<T> = Option<T>;
293
294impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
297 fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
298 match (self, other) {
299 (Some(_), None) => true,
300
301 (Some(a), Some(b)) => {
302 debug_assert_eq!(a, b);
304 false
305 }
306
307 _ => false,
308 }
309 }
310}
311
/// Caches over data that is already durable in the db, so reads can avoid
/// hitting the store.
struct CachedCommittedData {
    /// Committed object entries (live and tombstones), versioned per id.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    /// Latest-known entry per object id; the `MonotonicCache` ensures a
    /// stale value can never replace a newer one (see `IsNewer`).
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    /// Committed marker values by (epoch, object id) and version.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    transaction_events:
        MonotonicCache<TransactionEventsDigest, PointCacheItem<Arc<TransactionEvents>>>,

    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Currently unused (note the leading underscore); still cleared and
    // checked in clear_and_assert_empty.
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
343
344impl CachedCommittedData {
345 fn new(config: &WritebackCacheConfig) -> Self {
346 let object_cache = MokaCache::builder()
347 .max_capacity(config.object_cache_size())
348 .build();
349 let marker_cache = MokaCache::builder()
350 .max_capacity(config.marker_cache_size())
351 .build();
352
353 let transactions = MonotonicCache::new(config.transaction_cache_size());
354 let transaction_effects = MonotonicCache::new(config.effect_cache_size());
355 let transaction_events = MonotonicCache::new(config.events_cache_size());
356 let executed_effects_digests = MonotonicCache::new(config.executed_effect_cache_size());
357
358 let transaction_objects = MokaCache::builder()
359 .max_capacity(config.transaction_objects_cache_size())
360 .build();
361
362 Self {
363 object_cache,
364 object_by_id_cache: MonotonicCache::new(config.object_by_id_cache_size()),
365 marker_cache,
366 transactions,
367 transaction_effects,
368 transaction_events,
369 executed_effects_digests,
370 _transaction_objects: transaction_objects,
371 }
372 }
373
374 fn clear_and_assert_empty(&self) {
375 self.object_cache.invalidate_all();
376 self.object_by_id_cache.invalidate_all();
377 self.marker_cache.invalidate_all();
378 self.transactions.invalidate_all();
379 self.transaction_effects.invalidate_all();
380 self.transaction_events.invalidate_all();
381 self.executed_effects_digests.invalidate_all();
382 self._transaction_objects.invalidate_all();
383
384 assert_empty(&self.object_cache);
385 assert!(&self.object_by_id_cache.is_empty());
386 assert_empty(&self.marker_cache);
387 assert!(self.transactions.is_empty());
388 assert!(self.transaction_effects.is_empty());
389 assert!(self.transaction_events.is_empty());
390 assert!(self.executed_effects_digests.is_empty());
391 assert_empty(&self._transaction_objects);
392 }
393}
394
395fn assert_empty<K, V>(cache: &MokaCache<K, V>)
396where
397 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
398 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
399{
400 if cache.iter().next().is_some() {
401 panic!("cache should be empty");
402 }
403}
404
/// A write-back cache in front of `AuthorityStore`: writes land in the dirty
/// set first and move into the committed caches when flushed to the db.
pub struct WritebackCache {
    /// Execution outputs not yet committed to the db.
    dirty: UncommittedData,
    /// Caches over data already committed to the db.
    cached: CachedCommittedData,

    /// Cache of package objects; populated on package writes/reads and
    /// invalidated when a package write is reverted.
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    /// Lets callers wait until a transaction's effects digest is recorded.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    store: Arc<AuthorityStore>,
    /// Backpressure is enabled while the approximate pending (uncommitted)
    /// transaction count exceeds this threshold.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
429
/// Looks up `$version` in an optional `CachedVersionMap`, recording metrics
/// under `$table`/`$level`. Returns early from the enclosing function with
/// `CacheResult::Hit` on an exact match, or `CacheResult::NegativeHit` when
/// the map already holds a version older than the requested one without
/// holding the requested version itself. Falls through (recording a miss)
/// otherwise.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // The map covers versions at and above `least_version`,
                    // so an absent requested version above it cannot exist.
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
451
/// Like `check_cache_entry_by_version!`, but returns the newest (version,
/// value) pair in the map. A present-but-empty `CachedVersionMap` is a
/// broken invariant — empty maps must be removed when their last entry is
/// flushed (see `move_version_from_dirty_to_cache`).
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
466
467impl WritebackCache {
468 pub fn new(
469 config: &WritebackCacheConfig,
470 store: Arc<AuthorityStore>,
471 metrics: Arc<ExecutionCacheMetrics>,
472 backpressure_manager: Arc<BackpressureManager>,
473 ) -> Self {
474 let packages = MokaCache::builder()
475 .max_capacity(config.package_cache_size())
476 .build();
477 Self {
478 dirty: UncommittedData::new(),
479 cached: CachedCommittedData::new(config),
480 packages,
481 object_locks: ObjectLocks::new(),
482 executed_effects_digests_notify_read: NotifyRead::new(),
483 store,
484 backpressure_manager,
485 backpressure_threshold: config.backpressure_threshold(),
486 metrics,
487 }
488 }
489
    /// Test constructor: default config, fresh metrics registered against
    /// `registry`, and a test backpressure manager.
    pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
        Self::new(
            &Default::default(),
            store,
            ExecutionCacheMetrics::new(registry).into(),
            BackpressureManager::new_for_tests(),
        )
    }
498
499 #[cfg(test)]
500 pub fn reset_for_test(&mut self) {
501 let mut new = Self::new(
502 &Default::default(),
503 self.store.clone(),
504 self.metrics.clone(),
505 self.backpressure_manager.clone(),
506 );
507 std::mem::swap(self, &mut new);
508 }
509
    /// Records a written object entry (live object or tombstone) at
    /// `version` in the dirty set, and publishes it as the latest state in
    /// the object_by_id cache.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // NOTE(review): the dirty-map entry guard is acquired first and held
        // across the by-id cache insert — presumably so concurrent writers
        // of the same id cannot interleave the two updates; confirm before
        // reordering.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.cached
            .object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            .ok();

        entry.insert(version, object);
    }
557
    /// Records a marker value in the dirty marker set, keyed by
    /// (epoch, object id) and versioned by the object version.
    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: &ObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!(
            "inserting marker value {:?}: {:?}",
            object_key,
            marker_value
        );
        // Test-only failure-injection point.
        fail_point!("write_marker_entry");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.0))
            .or_default()
            .value_mut()
            .insert(object_key.1, marker_value);
    }
578
    /// Looks up `key` in both the dirty (uncommitted) map and the committed
    /// cache and hands the two optional version maps to `cb`. The dashmap
    /// entry lock and the mutex on the cached map are both held for the
    /// duration of the callback, so `cb` sees a consistent snapshot of both
    /// levels.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // `entry` (rather than `get`) locks the shard even when the key is
        // absent, keeping the dirty level stable while we read the cache.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
604
    /// Cache-only lookup of the entry at an exact (id, version): dirty set
    /// first, then committed cache; never falls back to the db.
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                // The macros return early on Hit/NegativeHit; falling
                // through both levels is a miss.
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
635
636 fn get_object_by_key_cache_only(
637 &self,
638 object_id: &ObjectID,
639 version: SequenceNumber,
640 ) -> CacheResult<Object> {
641 match self.get_object_entry_by_key_cache_only(object_id, version) {
642 CacheResult::Hit(entry) => match entry {
643 ObjectEntry::Object(object) => CacheResult::Hit(object),
644 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
645 },
646 CacheResult::Miss => CacheResult::Miss,
647 CacheResult::NegativeHit => CacheResult::NegativeHit,
648 }
649 }
650
    /// Returns the latest (version, entry) for `object_id` using caches only
    /// (no db fallback): the object_by_id cache first, then the per-version
    /// dirty and committed maps.
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.cached.object_by_id_cache.get(object_id);

        // Debug-only coherence check: the by-id cache must agree with the
        // authoritative latest state (dirty set, falling back to the store).
        if cfg!(debug_assertions) {
            if let Some(entry) = &entry {
                let highest: Option<ObjectEntry> = self
                    .dirty
                    .objects
                    .get(object_id)
                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                    .or_else(|| {
                        let obj: Option<ObjectEntry> = self
                            .store
                            .get_latest_object_or_tombstone(*object_id)
                            .unwrap()
                            .map(|(_, o)| o.into());
                        obj
                    });

                let cache_entry = match &*entry.lock() {
                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                    LatestObjectCacheEntry::NonExistent => None,
                };

                // A cached tombstone with no authoritative counterpart may
                // just have been pruned from the store; that is not an
                // incoherence.
                let tombstone_possibly_pruned = highest.is_none()
                    && cache_entry
                        .as_ref()
                        .map(|e| e.is_tombstone())
                        .unwrap_or(false);

                if highest != cache_entry && !tombstone_possibly_pruned {
                    tracing::error!(
                        ?highest,
                        ?cache_entry,
                        ?tombstone_possibly_pruned,
                        "object_by_id cache is incoherent for {:?}",
                        object_id
                    );
                    panic!("object_by_id cache is incoherent for {object_id:?}");
                }
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // By-id cache missed: scan the per-version maps for the highest
        // version instead.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
731
732 fn get_object_by_id_cache_only(
733 &self,
734 request_type: &'static str,
735 object_id: &ObjectID,
736 ) -> CacheResult<(SequenceNumber, Object)> {
737 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
738 CacheResult::Hit((version, entry)) => match entry {
739 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
740 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
741 },
742 CacheResult::NegativeHit => CacheResult::NegativeHit,
743 CacheResult::Miss => CacheResult::Miss,
744 }
745 }
746
    /// Cache-only lookup of the marker at an exact (id, version) within the
    /// given epoch: dirty set first, then committed cache.
    fn get_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
776
    /// Cache-only lookup of the newest marker (and its version) for an
    /// object within the given epoch.
    fn get_latest_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
793
    /// Reads the latest version of an object: caches first, then the db.
    /// The db result (including "not found") is published to the
    /// object_by_id cache under a read ticket taken *before* the lookup, so
    /// a concurrent write invalidates our stale db result rather than being
    /// overwritten by it.
    fn get_object_impl(
        &self,
        request_type: &'static str,
        id: &ObjectID,
    ) -> IotaResult<Option<Object>> {
        let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, object)) => Ok(Some(object)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => {
                let obj = self.store.try_get_object(id)?;
                if let Some(obj) = &obj {
                    self.cache_latest_object_by_id(
                        id,
                        LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
                        ticket,
                    );
                } else {
                    // Cache the definitive absence as well.
                    self.cache_object_not_found(id, ticket);
                }
                Ok(obj)
            }
        }
    }
818
819 fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
820 self.metrics.record_cache_request(request_type, "db");
821 &self.store
822 }
823
824 fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
825 self.metrics
826 .record_cache_multi_request(request_type, "db", count);
827 &self.store
828 }
829
    /// Stages every side effect of an executed transaction into the dirty
    /// set and wakes waiters on its effects digest. Nothing is written to
    /// the db here — `commit_transaction_outputs` does that later.
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = &*tx_outputs;

        // Tombstones and markers are written before live objects.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, object_key, *marker_value);
        }

        // Written objects go in two passes: child objects first, then the
        // rest. NOTE(review): presumably so a reader that sees a parent can
        // rely on its children already being visible — confirm.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // Events are deduplicated by digest; track every producing
        // transaction so the entry is only released when the last producer
        // is committed.
        self.metrics.record_cache_write("transaction_events");
        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                occupied.get_mut().0.insert(tx_digest);
            }
            DashMapEntry::Vacant(entry) => {
                let mut txns = BTreeSet::new();
                txns.insert(tx_digest);
                entry.insert((txns, events.clone()));
            }
        }

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake any tasks waiting for this transaction's effects digest.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // Backpressure is driven by the approximate count of transactions
        // written but not yet committed (relaxed counters).
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);

        Ok(())
    }
943
    /// Durably writes the outputs of the given transactions to the db, then
    /// moves their entries from the dirty set into the committed caches and
    /// updates backpressure accounting.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                // NOTE(review): tolerated rather than treated as an error —
                // presumably the transaction may already have been committed
                // or reverted; confirm against callers.
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        // Write to the db before removing anything from the dirty set, so
        // readers always find the data in at least one level.
        self.store.write_transaction_outputs(epoch, &all_outputs)?;

        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        // fetch_add returns the pre-update value; add num_outputs back to
        // get the post-commit total.
        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);

        Ok(())
    }
1008
1009 fn approximate_pending_transaction_count(&self) -> u64 {
1010 let num_commits = self
1011 .dirty
1012 .total_transaction_commits
1013 .load(std::sync::atomic::Ordering::Relaxed);
1014
1015 self.dirty
1016 .total_transaction_inserts
1017 .load(std::sync::atomic::Ordering::Relaxed)
1018 .saturating_sub(num_commits)
1019 }
1020
1021 fn set_backpressure(&self, pending_count: u64) {
1022 let backpressure = pending_count > self.backpressure_threshold;
1023 let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1024 if backpressure_changed {
1025 self.metrics.backpressure_toggles.inc();
1026 }
1027 self.metrics
1028 .backpressure_status
1029 .set(if backpressure { 1 } else { 0 });
1030 }
1031
1032 fn flush_transactions_from_dirty_to_cached(
1033 &self,
1034 epoch: EpochId,
1035 tx_digest: TransactionDigest,
1036 outputs: &TransactionOutputs,
1037 ) {
1038 let TransactionOutputs {
1042 transaction,
1043 effects,
1044 markers,
1045 written,
1046 deleted,
1047 wrapped,
1048 events,
1049 ..
1050 } = outputs;
1051
1052 let effects_digest = effects.digest();
1053 let events_digest = events.digest();
1054
1055 self.cached
1058 .transactions
1059 .insert(
1060 &tx_digest,
1061 PointCacheItem::Some(transaction.clone()),
1062 Ticket::Write,
1063 )
1064 .ok();
1065 self.cached
1066 .transaction_effects
1067 .insert(
1068 &effects_digest,
1069 PointCacheItem::Some(effects.clone().into()),
1070 Ticket::Write,
1071 )
1072 .ok();
1073 self.cached
1074 .executed_effects_digests
1075 .insert(
1076 &tx_digest,
1077 PointCacheItem::Some(effects_digest),
1078 Ticket::Write,
1079 )
1080 .ok();
1081 self.cached
1082 .transaction_events
1083 .insert(
1084 &events_digest,
1085 PointCacheItem::Some(events.clone().into()),
1086 Ticket::Write,
1087 )
1088 .ok();
1089
1090 self.dirty
1091 .transaction_effects
1092 .remove(&effects_digest)
1093 .expect("effects must exist");
1094
1095 match self.dirty.transaction_events.entry(events.digest()) {
1096 DashMapEntry::Occupied(mut occupied) => {
1097 let txns = &mut occupied.get_mut().0;
1098 assert!(txns.remove(&tx_digest), "transaction must exist");
1099 if txns.is_empty() {
1100 occupied.remove();
1101 }
1102 }
1103 DashMapEntry::Vacant(_) => {
1104 panic!("events must exist");
1105 }
1106 }
1107
1108 self.dirty
1109 .executed_effects_digests
1110 .remove(&tx_digest)
1111 .expect("executed effects must exist");
1112
1113 for (object_key, marker_value) in markers.iter() {
1115 Self::move_version_from_dirty_to_cache(
1116 &self.dirty.markers,
1117 &self.cached.marker_cache,
1118 (epoch, object_key.0),
1119 object_key.1,
1120 marker_value,
1121 );
1122 }
1123
1124 for (object_id, object) in written.iter() {
1125 Self::move_version_from_dirty_to_cache(
1126 &self.dirty.objects,
1127 &self.cached.object_cache,
1128 *object_id,
1129 object.version(),
1130 &ObjectEntry::Object(object.clone()),
1131 );
1132 }
1133
1134 for ObjectKey(object_id, version) in deleted.iter() {
1135 Self::move_version_from_dirty_to_cache(
1136 &self.dirty.objects,
1137 &self.cached.object_cache,
1138 *object_id,
1139 *version,
1140 &ObjectEntry::Deleted,
1141 );
1142 }
1143
1144 for ObjectKey(object_id, version) in wrapped.iter() {
1145 Self::move_version_from_dirty_to_cache(
1146 &self.dirty.objects,
1147 &self.cached.object_cache,
1148 *object_id,
1149 *version,
1150 &ObjectEntry::Wrapped,
1151 );
1152 }
1153 }
1154
    /// Moves one (key, version) entry from the dirty map into the committed
    /// cache. The value is inserted into the cache *before* being removed
    /// from the dirty set, and the dirty entry lock is held throughout, so
    /// concurrent readers always find it in at least one level.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        // Bound on how many committed versions are retained per key.
        static MAX_VERSIONS: usize = 3;

        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        cache_map.insert(version, value.clone());
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        // The flushed version must be the oldest dirty one, and must match
        // exactly what we just cached.
        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // Remove drained maps — `check_cache_entry_by_latest!` panics if it
        // ever sees an empty CachedVersionMap.
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }
1194
1195 fn cache_latest_object_by_id(
1197 &self,
1198 object_id: &ObjectID,
1199 object: LatestObjectCacheEntry,
1200 ticket: Ticket,
1201 ) {
1202 trace!("caching object by id: {:?} {:?}", object_id, object);
1203 if self
1204 .cached
1205 .object_by_id_cache
1206 .insert(object_id, object, ticket)
1207 .is_ok()
1208 {
1209 self.metrics.record_cache_write("object_by_id");
1210 } else {
1211 trace!("discarded cache write due to expired ticket");
1212 self.metrics.record_ticket_expiry();
1213 }
1214 }
1215
    /// Records a definitive "object does not exist" result in the
    /// object_by_id cache (subject to the ticket still being valid).
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1219
    /// End-of-epoch cleanup, called while the execution lock is held for
    /// write: every pending write must already have been committed or
    /// reverted; drops the remaining dirty bookkeeping and all object locks.
    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");
        assert!(
            self.dirty.pending_transaction_writes.is_empty(),
            "should be empty due to revert_state_update"
        );
        self.dirty.clear();
        info!("clearing old transaction locks");
        self.object_locks.clear();
    }
1230
    /// Reverts a not-yet-committed transaction: discards its pending outputs
    /// and invalidates every cache entry it touched. Asserts that the
    /// transaction was not already committed.
    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
            assert!(
                !self.try_is_tx_already_executed(tx)?,
                "attempt to revert committed transaction"
            );

            // Neither pending nor executed: nothing to undo.
            info!("Not reverting {:?} as it was not executed", tx);
            return Ok(());
        };

        for (object_id, object) in outputs.written.iter() {
            if object.is_package() {
                info!("removing non-finalized package from cache: {:?}", object_id);
                self.packages.invalidate(object_id);
            }
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        // Tombstone writes (deleted/wrapped) must be invalidated too.
        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        Ok(())
    }
1266
1267 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
1268 self.store.bulk_insert_genesis_objects(objects)?;
1269 for obj in objects {
1270 self.cached.object_cache.invalidate(&obj.id());
1271 self.cached.object_by_id_cache.invalidate(&obj.id());
1272 }
1273 Ok(())
1274 }
1275
    /// Inserts a single genesis object into the store.
    /// NOTE(review): caches are invalidated *before* the store write here,
    /// while the bulk variant invalidates *after* — confirm whether the
    /// ordering difference is intentional.
    fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
        self.cached.object_by_id_cache.invalidate(&object.id());
        self.cached.object_cache.invalidate(&object.id());
        self.store.insert_genesis_object(object)
    }
1281
    /// Clears the committed caches and the package cache, asserting that
    /// everything is empty afterwards.
    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.packages.invalidate_all();
        assert_empty(&self.packages);
    }
1288}
1289
// Umbrella trait with no methods of its own here; the concrete behavior
// lives in the individual trait impls on WritebackCache.
impl ExecutionCacheAPI for WritebackCache {}
1291
1292impl ExecutionCacheCommit for WritebackCache {
1293 fn try_commit_transaction_outputs(
1294 &self,
1295 epoch: EpochId,
1296 digests: &[TransactionDigest],
1297 ) -> IotaResult {
1298 WritebackCache::commit_transaction_outputs(self, epoch, digests)
1299 }
1300
1301 fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
1302 self.store.persist_transaction(tx)
1303 }
1304
1305 fn approximate_pending_transaction_count(&self) -> u64 {
1306 WritebackCache::approximate_pending_transaction_count(self)
1307 }
1308}
1309
1310impl ObjectCacheRead for WritebackCache {
    /// Returns the package object for `package_id`, if any.
    ///
    /// Served from the dedicated package cache when possible; on a miss the
    /// object is fetched through the regular object read path and, if it
    /// really is a package, inserted into the package cache. Requesting a
    /// non-package object through this method is a user-input error.
    #[instrument(level="trace", skip_all, fields(package_id=?package_id))]
    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            // Debug builds cross-check the package cache against the
            // canonical source (highest dirty entry first, then the store)
            // to catch cache incoherence early.
            if cfg!(debug_assertions) {
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {package_id:?}"
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // Cache miss: go through the regular object read path.
        if let Some(p) = self.get_object_impl("package", package_id)? {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                // The id resolved to a Move object, not a package.
                Err(IotaError::UserInput {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                })
            }
        } else {
            Ok(None)
        }
    }
1365
    /// Intentionally a no-op for this cache implementation — the package
    /// cache is invalidated on writes/reverts elsewhere in this file, so
    /// there appears to be nothing to force-reload here. NOTE(review):
    /// confirm against other `ObjectCacheRead` implementations.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
    }
1370
    /// Returns the latest version of the object, delegating to
    /// `get_object_impl` under the "object_latest" metrics label.
    #[instrument(level = "trace", skip_all, fields(object_id=?id))]
    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
        self.get_object_impl("object_latest", id)
    }
1377
1378 #[instrument(level = "trace", skip_all, fields(object_id, version))]
1379 fn try_get_object_by_key(
1380 &self,
1381 object_id: &ObjectID,
1382 version: SequenceNumber,
1383 ) -> IotaResult<Option<Object>> {
1384 match self.get_object_by_key_cache_only(object_id, version) {
1385 CacheResult::Hit(object) => Ok(Some(object)),
1386 CacheResult::NegativeHit => Ok(None),
1387 CacheResult::Miss => Ok(self
1388 .record_db_get("object_by_version")
1389 .try_get_object_by_key(object_id, version)?),
1390 }
1391 }
1392
1393 #[instrument(level = "trace", skip_all)]
1394 fn try_multi_get_objects_by_key(
1395 &self,
1396 object_keys: &[ObjectKey],
1397 ) -> Result<Vec<Option<Object>>, IotaError> {
1398 try_do_fallback_lookup(
1399 object_keys,
1400 |key| {
1401 Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1402 CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1403 CacheResult::NegativeHit => CacheResult::NegativeHit,
1404 CacheResult::Miss => CacheResult::Miss,
1405 })
1406 },
1407 |remaining| {
1408 self.record_db_multi_get("object_by_version", remaining.len())
1409 .multi_get_objects_by_key(remaining)
1410 },
1411 )
1412 }
1413
1414 #[instrument(level = "trace", skip_all, fields(object_id, version))]
1415 fn try_object_exists_by_key(
1416 &self,
1417 object_id: &ObjectID,
1418 version: SequenceNumber,
1419 ) -> IotaResult<bool> {
1420 match self.get_object_by_key_cache_only(object_id, version) {
1421 CacheResult::Hit(_) => Ok(true),
1422 CacheResult::NegativeHit => Ok(false),
1423 CacheResult::Miss => self
1424 .record_db_get("object_by_version")
1425 .object_exists_by_key(object_id, version),
1426 }
1427 }
1428
1429 #[instrument(level = "trace", skip_all)]
1430 fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
1431 try_do_fallback_lookup(
1432 object_keys,
1433 |key| {
1434 Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1435 CacheResult::Hit(_) => CacheResult::Hit(true),
1436 CacheResult::NegativeHit => CacheResult::Hit(false),
1437 CacheResult::Miss => CacheResult::Miss,
1438 })
1439 },
1440 |remaining| {
1441 self.record_db_multi_get("object_by_version", remaining.len())
1442 .multi_object_exists_by_key(remaining)
1443 },
1444 )
1445 }
1446
1447 #[instrument(level = "trace", skip_all, fields(object_id))]
1448 fn try_get_latest_object_ref_or_tombstone(
1449 &self,
1450 object_id: ObjectID,
1451 ) -> IotaResult<Option<ObjectRef>> {
1452 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1453 CacheResult::Hit((version, entry)) => Ok(Some(match entry {
1454 ObjectEntry::Object(object) => object.compute_object_reference(),
1455 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1456 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1457 })),
1458 CacheResult::NegativeHit => Ok(None),
1459 CacheResult::Miss => self
1460 .record_db_get("latest_objref_or_tombstone")
1461 .get_latest_object_ref_or_tombstone(object_id),
1462 }
1463 }
1464
1465 #[instrument(level = "trace", skip_all, fields(object_id))]
1466 fn try_get_latest_object_or_tombstone(
1467 &self,
1468 object_id: ObjectID,
1469 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1470 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1471 CacheResult::Hit((version, entry)) => {
1472 let key = ObjectKey(object_id, version);
1473 Ok(Some(match entry {
1474 ObjectEntry::Object(object) => (key, object.into()),
1475 ObjectEntry::Deleted => (
1476 key,
1477 ObjectOrTombstone::Tombstone((
1478 object_id,
1479 version,
1480 ObjectDigest::OBJECT_DIGEST_DELETED,
1481 )),
1482 ),
1483 ObjectEntry::Wrapped => (
1484 key,
1485 ObjectOrTombstone::Tombstone((
1486 object_id,
1487 version,
1488 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1489 )),
1490 ),
1491 }))
1492 }
1493 CacheResult::NegativeHit => Ok(None),
1494 CacheResult::Miss => self
1495 .record_db_get("latest_object_or_tombstone")
1496 .get_latest_object_or_tombstone(object_id),
1497 }
1498 }
1499
1500 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1501 fn try_find_object_lt_or_eq_version(
1502 &self,
1503 object_id: ObjectID,
1504 version_bound: SequenceNumber,
1505 ) -> IotaResult<Option<Object>> {
1506 macro_rules! check_cache_entry {
1507 ($level: expr, $objects: expr) => {
1508 self.metrics
1509 .record_cache_request("object_lt_or_eq_version", $level);
1510 if let Some(objects) = $objects {
1511 if let Some((_, object)) = objects
1512 .all_versions_lt_or_eq_descending(&version_bound)
1513 .next()
1514 {
1515 if let ObjectEntry::Object(object) = object {
1516 self.metrics
1517 .record_cache_hit("object_lt_or_eq_version", $level);
1518 return Ok(Some(object.clone()));
1519 } else {
1520 self.metrics
1522 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1523 return Ok(None);
1524 }
1525 } else {
1526 self.metrics
1527 .record_cache_miss("object_lt_or_eq_version", $level);
1528 }
1529 }
1530 };
1531 }
1532
1533 self.metrics
1535 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1536 if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
1537 let latest = latest.lock();
1538 match &*latest {
1539 LatestObjectCacheEntry::Object(latest_version, object) => {
1540 if *latest_version <= version_bound {
1541 if let ObjectEntry::Object(object) = object {
1542 self.metrics
1543 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1544 return Ok(Some(object.clone()));
1545 } else {
1546 self.metrics.record_cache_negative_hit(
1548 "object_lt_or_eq_version",
1549 "object_by_id",
1550 );
1551 return Ok(None);
1552 }
1553 }
1554 }
1557 LatestObjectCacheEntry::NonExistent => {
1559 self.metrics
1560 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1561 return Ok(None);
1562 }
1563 }
1564 }
1565 self.metrics
1566 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1567
1568 Self::with_locked_cache_entries(
1569 &self.dirty.objects,
1570 &self.cached.object_cache,
1571 &object_id,
1572 |dirty_entry, cached_entry| {
1573 check_cache_entry!("committed", dirty_entry);
1574 check_cache_entry!("uncommitted", cached_entry);
1575
1576 let latest: Option<(SequenceNumber, ObjectEntry)> =
1600 if let Some(dirty_set) = dirty_entry {
1601 dirty_set
1602 .get_highest()
1603 .cloned()
1604 .tap_none(|| panic!("dirty set cannot be empty"))
1605 } else {
1606 self.record_db_get("object_lt_or_eq_version_latest")
1608 .get_latest_object_or_tombstone(object_id)?
1609 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1610 (version, ObjectEntry::from(obj_or_tombstone))
1611 })
1612 };
1613
1614 if let Some((obj_version, obj_entry)) = latest {
1615 self.cache_latest_object_by_id(
1624 &object_id,
1625 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1626 self.cached
1629 .object_by_id_cache
1630 .get_ticket_for_read(&object_id),
1631 );
1632
1633 if obj_version <= version_bound {
1634 match obj_entry {
1635 ObjectEntry::Object(object) => Ok(Some(object)),
1636 ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
1637 }
1638 } else {
1639 self.record_db_get("object_lt_or_eq_version_scan")
1643 .find_object_lt_or_eq_version(object_id, version_bound)
1644 }
1645 } else {
1646 let highest = cached_entry.and_then(|c| c.get_highest());
1652 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1653 self.cache_object_not_found(
1654 &object_id,
1655 self.cached
1657 .object_by_id_cache
1658 .get_ticket_for_read(&object_id),
1659 );
1660 Ok(None)
1661 }
1662 },
1663 )
1664 }
1665
    /// Builds the `IotaSystemState` by reading its objects through this
    /// cache via `get_iota_system_state`. NOTE(review): "unsafe" presumably
    /// refers to lack of consistency/version guarantees, not memory safety —
    /// confirm against the trait's documentation.
    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        get_iota_system_state(self)
    }
1669
1670 fn try_get_marker_value(
1671 &self,
1672 object_id: &ObjectID,
1673 version: SequenceNumber,
1674 epoch_id: EpochId,
1675 ) -> IotaResult<Option<MarkerValue>> {
1676 match self.get_marker_value_cache_only(object_id, version, epoch_id) {
1677 CacheResult::Hit(marker) => Ok(Some(marker)),
1678 CacheResult::NegativeHit => Ok(None),
1679 CacheResult::Miss => self
1680 .record_db_get("marker_by_version")
1681 .get_marker_value(object_id, &version, epoch_id),
1682 }
1683 }
1684
1685 fn try_get_latest_marker(
1686 &self,
1687 object_id: &ObjectID,
1688 epoch_id: EpochId,
1689 ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
1690 match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1691 CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
1692 CacheResult::NegativeHit => {
1693 panic!("cannot have negative hit when getting latest marker")
1694 }
1695 CacheResult::Miss => self
1696 .record_db_get("marker_latest")
1697 .get_latest_marker(object_id, epoch_id),
1698 }
1699 }
1700
1701 fn try_get_lock(
1702 &self,
1703 obj_ref: ObjectRef,
1704 epoch_store: &AuthorityPerEpochStore,
1705 ) -> IotaLockResult {
1706 match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1707 CacheResult::Hit((_, obj)) => {
1708 let actual_objref = obj.compute_object_reference();
1709 if obj_ref != actual_objref {
1710 Ok(ObjectLockStatus::LockedAtDifferentVersion {
1711 locked_ref: actual_objref,
1712 })
1713 } else {
1714 Ok(
1716 match self
1717 .object_locks
1718 .get_transaction_lock(&obj_ref, epoch_store)?
1719 {
1720 Some(tx_digest) => ObjectLockStatus::LockedToTx {
1721 locked_by_tx: tx_digest,
1722 },
1723 None => ObjectLockStatus::Initialized,
1724 },
1725 )
1726 }
1727 }
1728 CacheResult::NegativeHit => {
1729 Err(IotaError::from(UserInputError::ObjectNotFound {
1730 object_id: obj_ref.0,
1731 version: None,
1734 }))
1735 }
1736 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1737 }
1738 }
1739
1740 fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
1741 let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
1742 UserInputError::ObjectNotFound {
1743 object_id,
1744 version: None,
1745 },
1746 )?;
1747 Ok(obj.compute_object_reference())
1748 }
1749
1750 fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1751 try_do_fallback_lookup(
1752 owned_object_refs,
1753 |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1754 CacheResult::Hit((version, obj)) => {
1755 if obj.compute_object_reference() != *obj_ref {
1756 Err(UserInputError::ObjectVersionUnavailableForConsumption {
1757 provided_obj_ref: *obj_ref,
1758 current_version: version,
1759 }
1760 .into())
1761 } else {
1762 Ok(CacheResult::Hit(()))
1763 }
1764 }
1765 CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1766 object_id: obj_ref.0,
1767 version: None,
1768 }
1769 .into()),
1770 CacheResult::Miss => Ok(CacheResult::Miss),
1771 },
1772 |remaining| {
1773 self.record_db_multi_get("object_is_live", remaining.len())
1774 .check_owned_objects_are_live(remaining)?;
1775 Ok(vec![(); remaining.len()])
1776 },
1777 )?;
1778 Ok(())
1779 }
1780
    /// Reads the highest pruned checkpoint directly from the store's
    /// perpetual tables (this value is not cached here).
    fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        self.store.perpetual_tables.get_highest_pruned_checkpoint()
    }
1784}
1785
1786impl TransactionCacheRead for WritebackCache {
    /// Batch lookup of transaction blocks by digest.
    ///
    /// Order of consultation per digest: the dirty (uncommitted) pending
    /// writes, then the committed point cache, then the database. Read
    /// tickets are taken for all digests up front so that database misses can
    /// be negatively cached without racing concurrent writers.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
        // Tickets must be acquired before any reads happen (see MonotonicCache).
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                // Dirty layer: transactions executed but not yet committed.
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");
                // Committed point cache; `PointCacheItem::None` is a cached
                // "known absent" answer.
                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        Ok(CacheResult::Hit(Some(tx)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())?;
                // Negatively cache database misses, guarded by the tickets
                // taken before the reads began.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                Ok(results)
            },
        )
    }
1846
    /// Batch lookup of executed-effects digests by transaction digest.
    ///
    /// Per digest: dirty (uncommitted) executed-effects map, then committed
    /// point cache, then the database. Read tickets are taken up front so
    /// database misses can be negatively cached safely.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
        // Tickets must be acquired before any reads happen.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return Ok(CacheResult::Hit(Some(*digest)));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                // `PointCacheItem::None` is a cached "known absent" answer.
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        Ok(CacheResult::Hit(Some(digest)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)?;
                // Negatively cache misses under the pre-acquired tickets.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }
1912
1913 #[instrument(level = "trace", skip_all)]
1914 fn try_multi_get_effects(
1915 &self,
1916 digests: &[TransactionEffectsDigest],
1917 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
1918 let digests_and_tickets: Vec<_> = digests
1919 .iter()
1920 .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
1921 .collect();
1922 try_do_fallback_lookup(
1923 &digests_and_tickets,
1924 |(digest, _)| {
1925 self.metrics
1926 .record_cache_request("transaction_effects", "uncommitted");
1927 if let Some(effects) = self.dirty.transaction_effects.get(digest) {
1928 self.metrics
1929 .record_cache_hit("transaction_effects", "uncommitted");
1930 return Ok(CacheResult::Hit(Some(effects.clone())));
1931 }
1932 self.metrics
1933 .record_cache_miss("transaction_effects", "uncommitted");
1934
1935 self.metrics
1936 .record_cache_request("transaction_effects", "committed");
1937 match self
1938 .cached
1939 .transaction_effects
1940 .get(digest)
1941 .map(|l| l.lock().clone())
1942 {
1943 Some(PointCacheItem::Some(effects)) => {
1944 self.metrics
1945 .record_cache_hit("transaction_effects", "committed");
1946 Ok(CacheResult::Hit(Some((*effects).clone())))
1947 }
1948 Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
1949 None => {
1950 self.metrics
1951 .record_cache_miss("transaction_effects", "committed");
1952 Ok(CacheResult::Miss)
1953 }
1954 }
1955 },
1956 |remaining| {
1957 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1958 let results = self
1959 .record_db_multi_get("transaction_effects", remaining.len())
1960 .multi_get_effects(remaining_digests.iter())
1961 .expect("db error");
1962 for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1963 if result.is_none() {
1964 self.cached
1965 .transaction_effects
1966 .insert(digest, None, *ticket)
1967 .ok();
1968 }
1969 }
1970 Ok(results)
1971 },
1972 )
1973 }
1974
    /// Returns a future resolving once every digest in `digests` has an
    /// executed-effects digest available, using the `NotifyRead` registry to
    /// wait for the ones still missing from
    /// `try_multi_get_executed_effects_digests`.
    #[instrument(level = "trace", skip_all)]
    fn try_notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
        self.executed_effects_digests_notify_read
            .read(digests, |digests| {
                self.try_multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }
1986
1987 #[instrument(level = "trace", skip_all)]
1988 fn try_multi_get_events(
1989 &self,
1990 event_digests: &[TransactionEventsDigest],
1991 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
1992 fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
1993 if events.data.is_empty() {
1994 None
1995 } else {
1996 Some(events)
1997 }
1998 }
1999
2000 let digests_and_tickets: Vec<_> = event_digests
2001 .iter()
2002 .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2003 .collect();
2004 try_do_fallback_lookup(
2005 &digests_and_tickets,
2006 |(digest, _)| {
2007 self.metrics
2008 .record_cache_request("transaction_events", "uncommitted");
2009 if let Some(events) = self
2010 .dirty
2011 .transaction_events
2012 .get(digest)
2013 .map(|e| e.1.clone())
2014 {
2015 self.metrics
2016 .record_cache_hit("transaction_events", "uncommitted");
2017
2018 return Ok(CacheResult::Hit(map_events(events)));
2019 }
2020 self.metrics
2021 .record_cache_miss("transaction_events", "uncommitted");
2022
2023 self.metrics
2024 .record_cache_request("transaction_events", "committed");
2025 match self
2026 .cached
2027 .transaction_events
2028 .get(digest)
2029 .map(|l| l.lock().clone())
2030 {
2031 Some(PointCacheItem::Some(events)) => {
2032 self.metrics
2033 .record_cache_hit("transaction_events", "committed");
2034 Ok(CacheResult::Hit(map_events((*events).clone())))
2035 }
2036 Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
2037 None => {
2038 self.metrics
2039 .record_cache_miss("transaction_events", "committed");
2040
2041 Ok(CacheResult::Miss)
2042 }
2043 }
2044 },
2045 |remaining| {
2046 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2047 let results = self
2048 .store
2049 .multi_get_events(&remaining_digests)
2050 .expect("db error");
2051 for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2052 if result.is_none() {
2053 self.cached
2054 .transaction_events
2055 .insert(digest, None, *ticket)
2056 .ok();
2057 }
2058 }
2059 Ok(results)
2060 },
2061 )
2062 }
2063}
2064
2065impl ExecutionCacheWrite for WritebackCache {
2066 #[instrument(level = "debug", skip_all)]
2067 fn try_acquire_transaction_locks(
2068 &self,
2069 epoch_store: &AuthorityPerEpochStore,
2070 owned_input_objects: &[ObjectRef],
2071 transaction: VerifiedSignedTransaction,
2072 ) -> IotaResult {
2073 self.object_locks.acquire_transaction_locks(
2074 self,
2075 epoch_store,
2076 owned_input_objects,
2077 transaction,
2078 )
2079 }
2080
2081 fn try_write_transaction_outputs(
2082 &self,
2083 epoch_id: EpochId,
2084 tx_outputs: Arc<TransactionOutputs>,
2085 ) -> IotaResult {
2086 WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs)
2087 }
2088}
2089
2090implement_passthrough_traits!(WritebackCache);
2091
2092impl AccumulatorStore for WritebackCache {
2093 fn get_root_state_accumulator_for_epoch(
2094 &self,
2095 epoch: EpochId,
2096 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
2097 self.store.get_root_state_accumulator_for_epoch(epoch)
2098 }
2099
2100 fn get_root_state_accumulator_for_highest_epoch(
2101 &self,
2102 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
2103 self.store.get_root_state_accumulator_for_highest_epoch()
2104 }
2105
2106 fn insert_state_accumulator_for_epoch(
2107 &self,
2108 epoch: EpochId,
2109 checkpoint_seq_num: &CheckpointSequenceNumber,
2110 acc: &Accumulator,
2111 ) -> IotaResult {
2112 self.store
2113 .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
2114 }
2115
2116 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2117 assert!(
2121 self.dirty.is_empty(),
2122 "cannot iterate live object set with dirty data"
2123 );
2124 self.store.iter_live_object_set()
2125 }
2126
2127 fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2131 let iter = self.dirty.objects.iter();
2133 let mut dirty_objects = BTreeMap::new();
2134
2135 for obj in self.store.iter_live_object_set() {
2137 dirty_objects.insert(obj.object_id(), obj);
2138 }
2139
2140 for entry in iter {
2142 let id = *entry.key();
2143 let value = entry.value();
2144 match value.get_highest().unwrap() {
2145 (_, ObjectEntry::Object(object)) => {
2146 dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2147 }
2148 (_version, ObjectEntry::Wrapped) => {
2149 dirty_objects.remove(&id);
2150 }
2151 (_, ObjectEntry::Deleted) => {
2152 dirty_objects.remove(&id);
2153 }
2154 }
2155 }
2156
2157 Box::new(dirty_objects.into_values())
2158 }
2159}
2160
2161impl StateSyncAPI for WritebackCache {
2168 fn try_insert_transaction_and_effects(
2169 &self,
2170 transaction: &VerifiedTransaction,
2171 transaction_effects: &TransactionEffects,
2172 ) -> IotaResult {
2173 self.store
2174 .insert_transaction_and_effects(transaction, transaction_effects)?;
2175
2176 self.cached
2179 .transactions
2180 .insert(
2181 transaction.digest(),
2182 PointCacheItem::Some(Arc::new(transaction.clone())),
2183 Ticket::Write,
2184 )
2185 .ok();
2186 self.cached
2187 .transaction_effects
2188 .insert(
2189 &transaction_effects.digest(),
2190 PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2191 Ticket::Write,
2192 )
2193 .ok();
2194
2195 Ok(())
2196 }
2197
2198 fn try_multi_insert_transaction_and_effects(
2199 &self,
2200 transactions_and_effects: &[VerifiedExecutionData],
2201 ) -> IotaResult {
2202 self.store
2203 .multi_insert_transaction_and_effects(transactions_and_effects.iter())?;
2204 for VerifiedExecutionData {
2205 transaction,
2206 effects,
2207 } in transactions_and_effects
2208 {
2209 self.cached
2210 .transactions
2211 .insert(
2212 transaction.digest(),
2213 PointCacheItem::Some(Arc::new(transaction.clone())),
2214 Ticket::Write,
2215 )
2216 .ok();
2217 self.cached
2218 .transaction_effects
2219 .insert(
2220 &effects.digest(),
2221 PointCacheItem::Some(Arc::new(effects.clone())),
2222 Ticket::Write,
2223 )
2224 .ok();
2225 }
2226
2227 Ok(())
2228 }
2229}