1use std::{
51 collections::{BTreeMap, HashSet},
52 hash::Hash,
53 sync::{Arc, atomic::AtomicU64},
54};
55
56use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
57use futures::{FutureExt, future::BoxFuture};
58use iota_common::{random_util::randomize_cache_capacity_in_tests, sync::notify_read::NotifyRead};
59use iota_config::WritebackCacheConfig;
60use iota_macros::fail_point;
61use iota_types::{
62 base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
63 digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest},
64 effects::{TransactionEffects, TransactionEvents},
65 error::{IotaError, IotaResult, UserInputError},
66 executable_transaction::VerifiedExecutableTransaction,
67 global_state_hash::GlobalStateHash,
68 iota_system_state::{IotaSystemState, get_iota_system_state},
69 message_envelope::Message,
70 messages_checkpoint::CheckpointSequenceNumber,
71 object::Object,
72 storage::{InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
73 transaction::{VerifiedSignedTransaction, VerifiedTransaction},
74};
75use moka::sync::SegmentedCache as MokaCache;
76use parking_lot::Mutex;
77use tap::TapOptional;
78use tracing::{debug, info, instrument, trace, warn};
79
80use super::{
81 Batch, CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
82 ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
83 TransactionCacheRead,
84 cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache, Ticket},
85 implement_passthrough_traits,
86 object_locks::ObjectLocks,
87};
88use crate::{
89 authority::{
90 AuthorityStore,
91 authority_per_epoch_store::AuthorityPerEpochStore,
92 authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
93 authority_store_tables::LiveObject,
94 backpressure::BackpressureManager,
95 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
96 },
97 fallback_fetch::try_do_fallback_lookup,
98 global_state_hasher::GlobalStateHashStore,
99 transaction_outputs::TransactionOutputs,
100};
101
102#[cfg(test)]
103#[path = "unit_tests/writeback_cache_tests.rs"]
104pub mod writeback_cache_tests;
105
/// The state of an object at one specific version, as stored in the cache.
///
/// The two tombstone variants record that a version exists but carries no
/// live object data.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    // A live object at this version.
    Object(Object),
    // The object was deleted at this version.
    Deleted,
    // The object was wrapped at this version.
    Wrapped,
}
112
113impl ObjectEntry {
114 #[cfg(test)]
115 fn unwrap_object(&self) -> &Object {
116 match self {
117 ObjectEntry::Object(o) => o,
118 _ => panic!("unwrap_object called on non-Object"),
119 }
120 }
121
122 fn is_tombstone(&self) -> bool {
123 match self {
124 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
125 ObjectEntry::Object(_) => false,
126 }
127 }
128}
129
130impl std::fmt::Debug for ObjectEntry {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 ObjectEntry::Object(o) => {
134 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
135 }
136 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
137 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
138 }
139 }
140}
141
142impl From<Object> for ObjectEntry {
143 fn from(object: Object) -> Self {
144 ObjectEntry::Object(object)
145 }
146}
147
148impl From<ObjectOrTombstone> for ObjectEntry {
149 fn from(object: ObjectOrTombstone) -> Self {
150 match object {
151 ObjectOrTombstone::Object(o) => o.into(),
152 ObjectOrTombstone::Tombstone(obj_ref) => {
153 if obj_ref.2.is_deleted() {
154 ObjectEntry::Deleted
155 } else if obj_ref.2.is_wrapped() {
156 ObjectEntry::Wrapped
157 } else {
158 panic!("tombstone digest must either be deleted or wrapped");
159 }
160 }
161 }
162 }
163}
164
/// Cached knowledge about the *latest* version of an object.
///
/// `NonExistent` positively records that no entry exists at all for the
/// object id, which lets repeated misses be answered from cache.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    // Latest known version together with its state (live or tombstone).
    Object(SequenceNumber, ObjectEntry),
    // The object id is known not to exist.
    NonExistent,
}
170
171impl LatestObjectCacheEntry {
172 #[cfg(test)]
173 fn version(&self) -> Option<SequenceNumber> {
174 match self {
175 LatestObjectCacheEntry::Object(version, _) => Some(*version),
176 LatestObjectCacheEntry::NonExistent => None,
177 }
178 }
179
180 fn is_alive(&self) -> bool {
181 match self {
182 LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
183 LatestObjectCacheEntry::NonExistent => false,
184 }
185 }
186}
187
188impl IsNewer for LatestObjectCacheEntry {
189 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
190 match (self, other) {
191 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
192 v1 > v2
193 }
194 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
195 _ => false,
196 }
197 }
198}
199
/// Key for the marker tables: markers are scoped per epoch and per object.
type MarkerKey = (EpochId, ObjectID);
201
/// Data that has been written to the cache but not yet committed to the
/// backing store ("dirty" state).
struct UncommittedData {
    // All object versions written since the last commit, keyed by id.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Uncommitted marker values, keyed per (epoch, object id).
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    // Uncommitted effects, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Uncommitted events, keyed by the transaction digest.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    // Maps executed transactions to the digest of their effects.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Full outputs of executed-but-uncommitted transactions; entries are
    // removed on commit (or revert).
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    // Monotonic counters used to approximate the number of pending
    // (inserted-but-not-yet-committed) transactions for backpressure.
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
242
impl UncommittedData {
    /// Creates empty dirty state. The shard count is raised well above the
    /// dashmap default to reduce shard-lock contention.
    fn new() -> Self {
        Self {
            objects: DashMap::with_shard_amount(2048),
            markers: DashMap::with_shard_amount(2048),
            transaction_effects: DashMap::with_shard_amount(2048),
            executed_effects_digests: DashMap::with_shard_amount(2048),
            pending_transaction_writes: DashMap::with_shard_amount(2048),
            transaction_events: DashMap::with_shard_amount(2048),
            total_transaction_inserts: AtomicU64::new(0),
            total_transaction_commits: AtomicU64::new(0),
        }
    }

    /// Drops all dirty state and resets the insert/commit counters.
    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.transaction_events.clear();
        self.total_transaction_inserts
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_transaction_commits
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    /// True when there are no pending transaction writes. In debug builds,
    /// additionally asserts that every other dirty table has drained and
    /// the insert/commit counters balance — those must all empty together.
    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
                    && self
                        .total_transaction_inserts
                        .load(std::sync::atomic::Ordering::Relaxed)
                        == self
                            .total_transaction_commits
                            .load(std::sync::atomic::Ordering::Relaxed),
            );
        }
        empty
    }
}
290
/// Value type for digest-keyed point-lookup caches; only `Some` values are
/// ever inserted here (see `IsNewer` below for the update rule).
type PointCacheItem<T> = Option<T>;
294
295impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
298 fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
299 match (self, other) {
300 (Some(_), None) => true,
301
302 (Some(a), Some(b)) => {
303 debug_assert_eq!(a, b);
305 false
306 }
307
308 _ => false,
309 }
310 }
311}
312
/// Caches over data that has already been committed to the backing store.
struct CachedCommittedData {
    // Recent committed versions per object id.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // Latest-version-per-object cache with monotonic (ticketed) updates.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // Recent committed marker values per (epoch, object id).
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    // Committed transactions by digest.
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    // Committed effects by effects digest.
    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    // Committed events by transaction digest.
    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    // Transaction digest -> digest of its executed effects.
    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Currently unused (note the leading underscore); populated nowhere in
    // this file, but still cleared/asserted with the rest.
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
343
344impl CachedCommittedData {
345 fn new(config: &WritebackCacheConfig) -> Self {
346 let object_cache = MokaCache::builder(8)
347 .max_capacity(randomize_cache_capacity_in_tests(
348 config.object_cache_size(),
349 ))
350 .build();
351 let marker_cache = MokaCache::builder(8)
352 .max_capacity(randomize_cache_capacity_in_tests(
353 config.marker_cache_size(),
354 ))
355 .build();
356
357 let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
358 config.transaction_cache_size(),
359 ));
360 let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
361 config.effect_cache_size(),
362 ));
363 let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
364 config.events_cache_size(),
365 ));
366 let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
367 config.executed_effect_cache_size(),
368 ));
369
370 let transaction_objects = MokaCache::builder(8)
371 .max_capacity(randomize_cache_capacity_in_tests(
372 config.transaction_objects_cache_size(),
373 ))
374 .build();
375
376 Self {
377 object_cache,
378 object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
379 config.object_by_id_cache_size(),
380 )),
381 marker_cache,
382 transactions,
383 transaction_effects,
384 transaction_events,
385 executed_effects_digests,
386 _transaction_objects: transaction_objects,
387 }
388 }
389
390 fn clear_and_assert_empty(&self) {
391 self.object_cache.invalidate_all();
392 self.object_by_id_cache.invalidate_all();
393 self.marker_cache.invalidate_all();
394 self.transactions.invalidate_all();
395 self.transaction_effects.invalidate_all();
396 self.transaction_events.invalidate_all();
397 self.executed_effects_digests.invalidate_all();
398 self._transaction_objects.invalidate_all();
399
400 assert_empty(&self.object_cache);
401 assert!(&self.object_by_id_cache.is_empty());
402 assert_empty(&self.marker_cache);
403 assert!(self.transactions.is_empty());
404 assert!(self.transaction_effects.is_empty());
405 assert!(self.transaction_events.is_empty());
406 assert!(self.executed_effects_digests.is_empty());
407 assert_empty(&self._transaction_objects);
408 }
409}
410
411fn assert_empty<K, V>(cache: &MokaCache<K, V>)
412where
413 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
414 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
415{
416 if cache.iter().next().is_some() {
417 panic!("cache should be empty");
418 }
419}
420
/// A write-back cache in front of [`AuthorityStore`]: transaction outputs
/// land in in-memory dirty state first and are later committed to the store
/// in batches, at which point they move into the committed caches.
pub struct WritebackCache {
    // Written-but-not-yet-committed data.
    dirty: UncommittedData,
    // Caches over already-committed data.
    cached: CachedCommittedData,

    // Cache of package objects, keyed by object id.
    packages: MokaCache<ObjectID, PackageObject>,

    // Per-object transaction lock table.
    object_locks: ObjectLocks,

    // Waiters for a transaction's executed-effects digest.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    // Waiters for the availability of input objects.
    object_notify_read: NotifyRead<InputKey, ()>,
    store: Arc<AuthorityStore>,
    // Pending-transaction count above which backpressure is asserted.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
446
// Probes an optional `CachedVersionMap` for an exact `$version`, recording
// cache metrics and early-returning from the *enclosing function* on a hit
// or on a provable negative.
//
// NOTE(review): the negative-hit branch assumes the map holds every version
// at or above its least entry, so a missing version above that floor cannot
// exist at this level — confirm against `CachedVersionMap`'s invariants.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
468
// Probes an optional `CachedVersionMap` for its highest (latest) version,
// recording metrics and early-returning from the enclosing function on a
// hit. A present-but-empty map is an invariant violation: empty maps are
// removed eagerly (see `move_version_from_dirty_to_cache`).
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
483
484impl WritebackCache {
    /// Creates a write-back cache over `store`, sizing all internal caches
    /// from `config`.
    pub fn new(
        config: &WritebackCacheConfig,
        store: Arc<AuthorityStore>,
        metrics: Arc<ExecutionCacheMetrics>,
        backpressure_manager: Arc<BackpressureManager>,
    ) -> Self {
        let packages = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.package_cache_size(),
            ))
            .build();
        Self {
            dirty: UncommittedData::new(),
            cached: CachedCommittedData::new(config),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            object_notify_read: NotifyRead::new(),
            store,
            backpressure_manager,
            backpressure_threshold: config.backpressure_threshold(),
            metrics,
        }
    }
509
    /// Convenience constructor for tests: default config, a throwaway
    /// metrics registry, and a test backpressure manager.
    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
        Self::new(
            &Default::default(),
            store,
            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
            BackpressureManager::new_for_tests(),
        )
    }
518
519 #[cfg(test)]
520 pub fn reset_for_test(&mut self) {
521 let mut new = Self::new(
522 &Default::default(),
523 self.store.clone(),
524 self.metrics.clone(),
525 self.backpressure_manager.clone(),
526 );
527 std::mem::swap(self, &mut new);
528 }
529
    /// Records a newly written object version in dirty state, updates the
    /// latest-object-by-id cache, and wakes any tasks waiting on the object.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // Acquire the dirty-map entry first; the by-id cache update below
        // happens while this shard entry is held.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        // Insert failures (writes superseded by a newer entry) are ignored.
        self.cached
            .object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            .ok();

        entry.insert(version, object.clone());

        // Only live objects (not tombstones) satisfy input-object waiters.
        if let ObjectEntry::Object(object) = &object {
            super::notify_object_written(&self.object_notify_read, object);
        }
    }
581
    /// Records a marker value for an object version in dirty state and
    /// notifies waiters on the corresponding input key.
    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: &ObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!(
            "inserting marker value {:?}: {:?}",
            object_key,
            marker_value
        );
        fail_point!("write_marker_entry");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.0))
            .or_default()
            .value_mut()
            .insert(object_key.1, marker_value);

        super::notify_marker_written(&self.object_notify_read, object_key, &marker_value);
    }
608
    /// Runs `cb` with the dirty and committed version maps for `key` held
    /// simultaneously, so the callback sees a consistent two-level view.
    ///
    /// Holding the dirty dashmap entry while locking the cached entry
    /// excludes a concurrent `move_version_from_dirty_to_cache` for the same
    /// key (which also starts from `dirty.entry(key)`).
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // `entry` (not `get`) is used so that even a vacant slot keeps the
        // shard locked for the duration of the callback.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
634
    /// Cache-only lookup of an object entry at an exact version: dirty
    /// state first, then the committed cache. Does not touch the db.
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                // Each macro early-returns from the closure on a hit or a
                // provable negative.
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
665
666 fn get_object_by_key_cache_only(
667 &self,
668 object_id: &ObjectID,
669 version: SequenceNumber,
670 ) -> CacheResult<Object> {
671 match self.get_object_entry_by_key_cache_only(object_id, version) {
672 CacheResult::Hit(entry) => match entry {
673 ObjectEntry::Object(object) => CacheResult::Hit(object),
674 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
675 },
676 CacheResult::Miss => CacheResult::Miss,
677 CacheResult::NegativeHit => CacheResult::NegativeHit,
678 }
679 }
680
    /// Cache-only lookup of the latest entry for an object: consults the
    /// object-by-id cache first, then falls back to the versioned caches
    /// (dirty, then committed).
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.cached.object_by_id_cache.get(object_id);

        // Debug-only coherence check: the by-id cache must agree with the
        // highest entry known to the dirty set or, failing that, the store.
        if cfg!(debug_assertions) {
            if let Some(entry) = &entry {
                let highest: Option<ObjectEntry> = self
                    .dirty
                    .objects
                    .get(object_id)
                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                    .or_else(|| {
                        let obj: Option<ObjectEntry> = self
                            .store
                            .get_latest_object_or_tombstone(*object_id)
                            .unwrap()
                            .map(|(_, o)| o.into());
                        obj
                    });

                let cache_entry = match &*entry.lock() {
                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                    LatestObjectCacheEntry::NonExistent => None,
                };

                // A cached tombstone with no store entry is tolerated: the
                // tombstone may simply have been pruned from the store.
                let tombstone_possibly_pruned = highest.is_none()
                    && cache_entry
                        .as_ref()
                        .map(|e| e.is_tombstone())
                        .unwrap_or(false);

                if highest != cache_entry && !tombstone_possibly_pruned {
                    tracing::error!(
                        ?highest,
                        ?cache_entry,
                        ?tombstone_possibly_pruned,
                        "object_by_id cache is incoherent for {:?}",
                        object_id
                    );
                    panic!("object_by_id cache is incoherent for {object_id:?}");
                }
            }
        }

        // Fast path: answer directly from the by-id cache (positive or
        // negative).
        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // By-id miss: return the highest version found in the versioned
        // caches, if any.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
761
762 fn get_object_by_id_cache_only(
763 &self,
764 request_type: &'static str,
765 object_id: &ObjectID,
766 ) -> CacheResult<(SequenceNumber, Object)> {
767 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
768 CacheResult::Hit((version, entry)) => match entry {
769 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
770 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
771 },
772 CacheResult::NegativeHit => CacheResult::NegativeHit,
773 CacheResult::Miss => CacheResult::Miss,
774 }
775 }
776
    /// Cache-only lookup of the marker value for an exact object version in
    /// `epoch_id` (dirty first, then committed).
    fn get_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
806
    /// Cache-only lookup of the newest marker value for an object in
    /// `epoch_id` (dirty first, then committed).
    fn get_latest_marker_value_cache_only(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, *object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
823
824 fn get_object_impl(
825 &self,
826 request_type: &'static str,
827 id: &ObjectID,
828 ) -> IotaResult<Option<Object>> {
829 let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id);
830 match self.get_object_entry_by_id_cache_only(request_type, id) {
831 CacheResult::Hit((_, object)) => match object {
832 ObjectEntry::Object(object) => Ok(Some(object)),
833 ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
834 },
835 CacheResult::NegativeHit => Ok(None),
836 CacheResult::Miss => {
837 let obj = self.store.get_latest_object_or_tombstone(*id);
838 match obj {
839 Ok(Some((key, obj))) => {
840 self.cache_latest_object_by_id(
841 id,
842 LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
843 ticket,
844 );
845 match obj {
846 ObjectOrTombstone::Object(object) => Ok(Some(object)),
847 ObjectOrTombstone::Tombstone(_) => Ok(None),
848 }
849 }
850 Ok(None) => {
851 self.cache_object_not_found(id, ticket);
852 Ok(None)
853 }
854 Err(e) => Err(format!("failed to get latest object or tomebstone: {e}").into()),
855 }
856 }
857 }
858 }
859
    /// Records a db-level cache-request metric and hands back the store for
    /// the fallback read.
    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }
864
    /// Like `record_db_get`, but for a multi-get of `count` keys.
    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }
870
    /// Writes all outputs of an executed transaction into dirty state and
    /// notifies waiters. Nothing is persisted here — `build_db_batch` /
    /// `commit_transaction_outputs` handle durability later.
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = &*tx_outputs;

        // Record tombstones for deleted and wrapped objects.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, object_key, *marker_value);
        }

        // Child objects are written before non-child objects.
        // NOTE(review): presumably so readers never observe a parent without
        // its children — confirm against readers of this cache.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake tasks waiting for this transaction's effects digest.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // Bump the insert counter and recompute the approximate pending
        // count for backpressure.
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);

        Ok(())
    }
977
    /// Collects the pending outputs for `digests` and builds (but does not
    /// yet write) the corresponding store batch. Digests with no pending
    /// write are skipped with a warning.
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        let _metrics_guard = iota_metrics::monitored_scope("WritebackCache::build_db_batch");
        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        let batch = self
            .store
            .build_db_batch(epoch, &all_outputs)
            .expect("db error");
        (all_outputs, batch)
    }
1007
    /// Writes the prepared batch to the store, then moves the committed
    /// transactions' data from dirty state into the committed caches and
    /// refreshes the backpressure signal.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        let _metrics_guard =
            iota_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // Persist first; only then is it safe to drop the dirty copies.
        db_batch.write().expect("db error");

        // Shadows the outer guard to time just the cache-flush phase.
        let _metrics_guard =
            iota_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            // Every committed transaction must still have a pending write.
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);

        Ok(())
    }
1056
1057 fn approximate_pending_transaction_count(&self) -> u64 {
1058 let num_commits = self
1059 .dirty
1060 .total_transaction_commits
1061 .load(std::sync::atomic::Ordering::Relaxed);
1062
1063 self.dirty
1064 .total_transaction_inserts
1065 .load(std::sync::atomic::Ordering::Relaxed)
1066 .saturating_sub(num_commits)
1067 }
1068
1069 fn set_backpressure(&self, pending_count: u64) {
1070 let backpressure = pending_count > self.backpressure_threshold;
1071 let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1072 if backpressure_changed {
1073 self.metrics.backpressure_toggles.inc();
1074 }
1075 self.metrics
1076 .backpressure_status
1077 .set(if backpressure { 1 } else { 0 });
1078 }
1079
    /// Moves one committed transaction's data from dirty state into the
    /// committed caches. Must only be called after the data has been
    /// persisted to the store.
    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        // Populate the committed point caches first (insert failures are
        // ignored) ...
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        // ... then drop the dirty copies; each one must still be present.
        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        // Versioned data migrates one (key, version) pair at a time.
        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.0),
                object_key.1,
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }
1193
    /// Moves a single (key, version) entry from a dirty map into the
    /// matching committed cache.
    ///
    /// The cache insert happens *before* the dirty removal, and the dirty
    /// entry is held throughout, so readers (which consult dirty first,
    /// then cached — see `with_locked_cache_entries`) never observe the
    /// version as missing.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        // Cap on how many versions per key the committed cache retains.
        static MAX_VERSIONS: usize = 3;

        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        cache_map.insert(version, value.clone());
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // Drop now-empty maps so `check_cache_entry_by_latest` can rely on
        // only non-empty maps being present.
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }
1233
1234 fn cache_latest_object_by_id(
1236 &self,
1237 object_id: &ObjectID,
1238 object: LatestObjectCacheEntry,
1239 ticket: Ticket,
1240 ) {
1241 trace!("caching object by id: {:?} {:?}", object_id, object);
1242 if self
1243 .cached
1244 .object_by_id_cache
1245 .insert(object_id, object, ticket)
1246 .is_ok()
1247 {
1248 self.metrics.record_cache_write("object_by_id");
1249 } else {
1250 trace!("discarded cache write due to expired ticket");
1251 self.metrics.record_ticket_expiry();
1252 }
1253 }
1254
    /// Caches a positive "does not exist" result for `object_id`.
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1258
    /// Clears all dirty state and transaction locks at epoch end. Requires
    /// the execution write lock; every pending transaction must already
    /// have been committed or reverted.
    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");
        assert!(
            self.dirty.pending_transaction_writes.is_empty(),
            "should be empty due to revert_state_update"
        );
        self.dirty.clear();
        info!("clearing old transaction locks");
        self.object_locks.clear();
    }
1269
    /// Reverts an executed-but-uncommitted transaction: drops its pending
    /// outputs and invalidates any cache entries it may have populated.
    fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
        let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
            // Committed transactions can no longer be reverted.
            assert!(
                !self.try_is_tx_already_executed(tx)?,
                "attempt to revert committed transaction"
            );

            info!("Not reverting {:?} as it was not executed", tx);
            return Ok(());
        };

        for (object_id, object) in outputs.written.iter() {
            if object.is_package() {
                info!("removing non-finalized package from cache: {:?}", object_id);
                self.packages.invalidate(object_id);
            }
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        // Tombstoned ids are invalidated as well.
        for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
            self.cached.object_by_id_cache.invalidate(object_id);
            self.cached.object_cache.invalidate(object_id);
        }

        Ok(())
    }
1305
1306 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
1307 self.store.bulk_insert_genesis_objects(objects)?;
1308 for obj in objects {
1309 self.cached.object_cache.invalidate(&obj.id());
1310 self.cached.object_by_id_cache.invalidate(&obj.id());
1311 }
1312 Ok(())
1313 }
1314
    /// Insert a single genesis object, invalidating any stale cache entries
    /// for its id first so the store remains the source of truth.
    fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
        self.cached.object_by_id_cache.invalidate(&object.id());
        self.cached.object_cache.invalidate(&object.id());
        self.store.insert_genesis_object(object)
    }
1320
    /// Flush all caches and verify the package cache is actually drained.
    /// Intended for tests/maintenance paths where a cold cache is required.
    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.packages.invalidate_all();
        // invalidate_all may be deferred internally; assert_empty confirms
        // the package cache really is empty afterwards.
        assert_empty(&self.packages);
    }
1327}
1328
// Marker impl: `ExecutionCacheAPI` appears to be an umbrella trait whose
// behavior comes entirely from the component traits implemented below.
impl ExecutionCacheAPI for WritebackCache {}
1330
impl ExecutionCacheCommit for WritebackCache {
    /// Build the db write batch for `digests`. Delegates to the inherent
    /// method of the same name (inherent methods win over trait methods in
    /// resolution, so this call does not recurse).
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        self.build_db_batch(epoch, digests)
    }

    /// Commit the previously-built batch of outputs for `digests`.
    fn try_commit_transaction_outputs(
        &self,
        epoch: EpochId,
        batch: Batch,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
    }

    /// Persist a single executable transaction to the store.
    fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
        self.store.persist_transaction(tx)
    }

    /// Approximate count of executed-but-not-yet-committed transactions.
    fn approximate_pending_transaction_count(&self) -> u64 {
        WritebackCache::approximate_pending_transaction_count(self)
    }
}
1353
1354impl ObjectCacheRead for WritebackCache {
    /// Fetch a package object by id, served from the dedicated package cache
    /// when possible.
    ///
    /// On a cache miss the object is fetched through the normal object path
    /// and, if it really is a package, inserted into the package cache.
    /// Returns a `UserInput` error if the object exists but is not a package.
    #[instrument(level="trace", skip_all, fields(package_id=?package_id))]
    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            // Debug builds cross-check the cached package against the
            // canonical copy (dirty set first, then the store) to catch
            // cache incoherence early.
            if cfg!(debug_assertions) {
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {package_id:?}"
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        if let Some(p) = self.get_object_impl("package", package_id)? {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                Err(IotaError::UserInput {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                })
            }
        } else {
            Ok(None)
        }
    }
1409
    /// Intentionally a no-op for the writeback cache.
    ///
    /// NOTE(review): presumably the package cache cannot become stale here
    /// because package writes/reverts invalidate it explicitly (see
    /// `revert_state_update_impl`) — confirm against the trait contract.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
    }
1414
    /// Return the latest live version of the object, if any.
    #[instrument(level = "trace", skip_all, fields(object_id=?id))]
    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
        self.get_object_impl("object_latest", id)
    }
1421
1422 #[instrument(level = "trace", skip_all, fields(object_id, version))]
1423 fn try_get_object_by_key(
1424 &self,
1425 object_id: &ObjectID,
1426 version: SequenceNumber,
1427 ) -> IotaResult<Option<Object>> {
1428 match self.get_object_by_key_cache_only(object_id, version) {
1429 CacheResult::Hit(object) => Ok(Some(object)),
1430 CacheResult::NegativeHit => Ok(None),
1431 CacheResult::Miss => Ok(self
1432 .record_db_get("object_by_version")
1433 .try_get_object_by_key(object_id, version)?),
1434 }
1435 }
1436
    /// Batched version of `try_get_object_by_key`: answer what we can from
    /// the cache and fetch the remaining keys from the database in one call.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_objects_by_key(
        &self,
        object_keys: &[ObjectKey],
    ) -> Result<Vec<Option<Object>>, IotaError> {
        try_do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
                    CacheResult::NegativeHit => CacheResult::NegativeHit,
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_get_objects_by_key(remaining)
            },
        )
    }
1457
1458 #[instrument(level = "trace", skip_all, fields(object_id, version))]
1459 fn try_object_exists_by_key(
1460 &self,
1461 object_id: &ObjectID,
1462 version: SequenceNumber,
1463 ) -> IotaResult<bool> {
1464 match self.get_object_by_key_cache_only(object_id, version) {
1465 CacheResult::Hit(_) => Ok(true),
1466 CacheResult::NegativeHit => Ok(false),
1467 CacheResult::Miss => self
1468 .record_db_get("object_by_version")
1469 .object_exists_by_key(object_id, version),
1470 }
1471 }
1472
    /// Batched existence check: positive and negative cache hits are
    /// answered locally; the rest are resolved from the database.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
        try_do_fallback_lookup(
            object_keys,
            |key| {
                Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
                    CacheResult::Hit(_) => CacheResult::Hit(true),
                    // A known-absent version is a definitive `false`.
                    CacheResult::NegativeHit => CacheResult::Hit(false),
                    CacheResult::Miss => CacheResult::Miss,
                })
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
            },
        )
    }
1490
    /// Latest object reference for an id, including deleted/wrapped
    /// tombstone refs; `None` if the id has never existed.
    #[instrument(level = "trace", skip_all, fields(object_id))]
    fn try_get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => Ok(Some(match entry {
                ObjectEntry::Object(object) => object.compute_object_reference(),
                // Tombstones carry well-known sentinel digests.
                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
            })),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("latest_objref_or_tombstone")
                .get_latest_object_ref_or_tombstone(object_id),
        }
    }
1508
1509 #[instrument(level = "trace", skip_all, fields(object_id))]
1510 fn try_get_latest_object_or_tombstone(
1511 &self,
1512 object_id: ObjectID,
1513 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1514 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1515 CacheResult::Hit((version, entry)) => {
1516 let key = ObjectKey(object_id, version);
1517 Ok(Some(match entry {
1518 ObjectEntry::Object(object) => (key, object.into()),
1519 ObjectEntry::Deleted => (
1520 key,
1521 ObjectOrTombstone::Tombstone((
1522 object_id,
1523 version,
1524 ObjectDigest::OBJECT_DIGEST_DELETED,
1525 )),
1526 ),
1527 ObjectEntry::Wrapped => (
1528 key,
1529 ObjectOrTombstone::Tombstone((
1530 object_id,
1531 version,
1532 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1533 )),
1534 ),
1535 }))
1536 }
1537 CacheResult::NegativeHit => Ok(None),
1538 CacheResult::Miss => self
1539 .record_db_get("latest_object_or_tombstone")
1540 .get_latest_object_or_tombstone(object_id),
1541 }
1542 }
1543
1544 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1545 fn try_find_object_lt_or_eq_version(
1546 &self,
1547 object_id: ObjectID,
1548 version_bound: SequenceNumber,
1549 ) -> IotaResult<Option<Object>> {
1550 macro_rules! check_cache_entry {
1551 ($level: expr, $objects: expr) => {
1552 self.metrics
1553 .record_cache_request("object_lt_or_eq_version", $level);
1554 if let Some(objects) = $objects {
1555 if let Some((_, object)) = objects
1556 .all_versions_lt_or_eq_descending(&version_bound)
1557 .next()
1558 {
1559 if let ObjectEntry::Object(object) = object {
1560 self.metrics
1561 .record_cache_hit("object_lt_or_eq_version", $level);
1562 return Ok(Some(object.clone()));
1563 } else {
1564 self.metrics
1566 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1567 return Ok(None);
1568 }
1569 } else {
1570 self.metrics
1571 .record_cache_miss("object_lt_or_eq_version", $level);
1572 }
1573 }
1574 };
1575 }
1576
1577 self.metrics
1579 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1580 let latest_cache_entry = self.cached.object_by_id_cache.get(&object_id);
1581 if let Some(latest) = &latest_cache_entry {
1582 let latest = latest.lock();
1583 match &*latest {
1584 LatestObjectCacheEntry::Object(latest_version, object) => {
1585 if *latest_version <= version_bound {
1586 if let ObjectEntry::Object(object) = object {
1587 self.metrics
1588 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1589 return Ok(Some(object.clone()));
1590 } else {
1591 self.metrics.record_cache_negative_hit(
1593 "object_lt_or_eq_version",
1594 "object_by_id",
1595 );
1596 return Ok(None);
1597 }
1598 }
1599 }
1602 LatestObjectCacheEntry::NonExistent => {
1604 self.metrics
1605 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1606 return Ok(None);
1607 }
1608 }
1609 }
1610 self.metrics
1611 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1612
1613 Self::with_locked_cache_entries(
1614 &self.dirty.objects,
1615 &self.cached.object_cache,
1616 &object_id,
1617 |dirty_entry, cached_entry| {
1618 check_cache_entry!("committed", dirty_entry);
1619 check_cache_entry!("uncommitted", cached_entry);
1620
1621 let latest: Option<(SequenceNumber, ObjectEntry)> =
1645 if let Some(dirty_set) = dirty_entry {
1646 dirty_set
1647 .get_highest()
1648 .cloned()
1649 .tap_none(|| panic!("dirty set cannot be empty"))
1650 } else {
1651 self.record_db_get("object_lt_or_eq_version_latest")
1653 .get_latest_object_or_tombstone(object_id)?
1654 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1655 (version, ObjectEntry::from(obj_or_tombstone))
1656 })
1657 };
1658
1659 if let Some((obj_version, obj_entry)) = latest {
1660 self.cache_latest_object_by_id(
1669 &object_id,
1670 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1671 self.cached
1674 .object_by_id_cache
1675 .get_ticket_for_read(&object_id),
1676 );
1677
1678 if obj_version <= version_bound {
1679 match obj_entry {
1680 ObjectEntry::Object(object) => Ok(Some(object)),
1681 ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
1682 }
1683 } else {
1684 self.record_db_get("object_lt_or_eq_version_scan")
1688 .find_object_lt_or_eq_version(object_id, version_bound)
1689 }
1690
1691 } else if let Some(latest_cache_entry) = latest_cache_entry {
1698 assert!(!latest_cache_entry.lock().is_alive());
1700 Ok(None)
1701 } else {
1702 let highest = cached_entry.and_then(|c| c.get_highest());
1704 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1705 self.cache_object_not_found(
1706 &object_id,
1707 self.cached
1709 .object_by_id_cache
1710 .get_ticket_for_read(&object_id),
1711 );
1712 Ok(None)
1713 }
1714 },
1715 )
1716 }
1717
    /// Load the current `IotaSystemState` via the generic helper (see the
    /// trait definition for why this accessor is marked "unsafe").
    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        get_iota_system_state(self)
    }
1721
    /// Marker value for (`object_id`, `version`) within `epoch_id`; cache
    /// first, then the database on a miss.
    fn try_get_marker_value(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> IotaResult<Option<MarkerValue>> {
        match self.get_marker_value_cache_only(object_id, version, epoch_id) {
            CacheResult::Hit(marker) => Ok(Some(marker)),
            CacheResult::NegativeHit => Ok(None),
            CacheResult::Miss => self
                .record_db_get("marker_by_version")
                .get_marker_value(object_id, &version, epoch_id),
        }
    }
1736
    /// Latest marker (version + value) for `object_id` in `epoch_id`.
    fn try_get_latest_marker(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
            CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
            CacheResult::NegativeHit => {
                // The latest-marker lookup never yields negative entries;
                // hitting one indicates an internal invariant violation.
                panic!("cannot have negative hit when getting latest marker")
            }
            CacheResult::Miss => self
                .record_db_get("marker_latest")
                .get_latest_marker(object_id, epoch_id),
        }
    }
1752
    /// Report the lock status of `obj_ref`.
    ///
    /// If the cached live object is at a different version than the caller's
    /// ref, the lock is reported at that version; otherwise the per-object
    /// transaction lock table decides between `LockedToTx` and `Initialized`.
    /// A known-absent object yields `ObjectNotFound`.
    fn try_get_lock(
        &self,
        obj_ref: ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaLockResult {
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    // Caller holds a stale ref; tell them where the lock is.
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: tx_digest,
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(IotaError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }))
            }
            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
        }
    }
1791
1792 fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
1793 let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
1794 UserInputError::ObjectNotFound {
1795 object_id,
1796 version: None,
1797 },
1798 )?;
1799 Ok(obj.compute_object_reference())
1800 }
1801
    /// Verify each owned input ref matches the current live version of its
    /// object, erroring with the appropriate `UserInputError` otherwise.
    /// Cache-resolvable refs are checked locally; the rest go to the store.
    fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        try_do_fallback_lookup(
            owned_object_refs,
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            |remaining| {
                // The store check either passes for all refs or errors out;
                // map success to unit results per the fallback contract.
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }
1832
1833 fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<Option<CheckpointSequenceNumber>> {
1834 self.store
1835 .perpetual_tables
1836 .get_highest_pruned_checkpoint()
1837 .map_err(IotaError::from)
1838 }
1839
    /// Async wait until all the given input/receiving object keys become
    /// available, delegating to the shared implementation with this cache
    /// acting as the reader.
    fn notify_read_input_objects<'a>(
        &'a self,
        input_and_receiving_keys: &'a [InputKey],
        receiving_keys: &'a HashSet<InputKey>,
        epoch: &'a EpochId,
    ) -> BoxFuture<'a, Vec<()>> {
        super::notify_read_input_objects_impl(
            &self.object_notify_read,
            self,
            input_and_receiving_keys,
            receiving_keys,
            epoch,
        )
    }
1854}
1855
1856impl TransactionCacheRead for WritebackCache {
    /// Batched transaction lookup: dirty (uncommitted) writes first, then
    /// the committed point cache, then the database. Database misses are
    /// cached as negative entries.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
        // Tickets are taken before the lookup so a later negative-cache
        // insert is discarded if a write landed in between (see `Ticket`).
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");
                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        Ok(CacheResult::Hit(Some(tx)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        // Cache the miss so repeated lookups skip the db.
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                Ok(results)
            },
        )
    }
1916
    /// Batched lookup of executed-effects digests: dirty (uncommitted)
    /// entries first, then the committed point cache, then the database.
    /// Database misses are cached as negative entries.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
        // Tickets are taken before the lookup so a later negative-cache
        // insert is discarded if a write landed in between (see `Ticket`).
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return Ok(CacheResult::Hit(Some(*digest)));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        Ok(CacheResult::Hit(Some(digest)))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)?;
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        // Cache the miss so repeated lookups skip the db.
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }
1982
    /// Batched effects lookup: dirty (uncommitted) effects first, then the
    /// committed point cache, then the database. Database misses are cached
    /// as negative entries.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
        // Tickets are taken before the lookup so a later negative-cache
        // insert is discarded if a write landed in between (see `Ticket`).
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return Ok(CacheResult::Hit(Some(effects.clone())));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        Ok(CacheResult::Hit(Some((*effects).clone())))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    // NOTE(review): sibling lookups propagate db errors with
                    // `?` (e.g. executed_effects_digests above); this one
                    // panics on db error instead — confirm this is intended.
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        // Cache the miss so repeated lookups skip the db.
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }
2044
    /// Async wait until every digest in `digests` has an executed-effects
    /// digest, polling via `try_multi_get_executed_effects_digests`.
    #[instrument(level = "trace", skip_all)]
    fn try_notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
        self.executed_effects_digests_notify_read
            .read(digests, |digests| {
                self.try_multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }
2056
    /// Batched events lookup: dirty (uncommitted) events first, then the
    /// committed point cache, then the store. Empty event lists are
    /// normalized to `None`; db misses are cached as negative entries.
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_events(
        &self,
        tx_digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        // Normalize "no events" to None so callers see a uniform shape.
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        // Tickets are taken before the lookup so a later negative-cache
        // insert is discarded if a write landed in between (see `Ticket`).
        let digests_and_tickets: Vec<_> = tx_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        try_do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return Ok(CacheResult::Hit(map_events(events)));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        Ok(CacheResult::Hit(map_events((*events).clone())))
                    }
                    Some(PointCacheItem::None) => Ok(CacheResult::NegativeHit),
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");

                        Ok(CacheResult::Miss)
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        // Cache the miss so repeated lookups skip the db.
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                Ok(results)
            },
        )
    }
2128}
2129
impl ExecutionCacheWrite for WritebackCache {
    /// Acquire owned-object locks for a signed transaction, delegating to
    /// the per-object lock table (which uses this cache for object reads).
    #[instrument(level = "debug", skip_all)]
    fn try_acquire_transaction_locks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> IotaResult {
        self.object_locks.acquire_transaction_locks(
            self,
            epoch_store,
            owned_input_objects,
            transaction,
        )
    }

    /// Stage a transaction's outputs into the dirty cache (delegates to the
    /// inherent `write_transaction_outputs`).
    fn try_write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs)
    }
}
2154
// Generates the remaining trait impls (presumably CheckpointCache,
// TestingAPI, etc. — see the macro definition in the parent module) as
// pass-throughs for WritebackCache.
implement_passthrough_traits!(WritebackCache);
2156
impl GlobalStateHashStore for WritebackCache {
    /// Read the root state hash recorded for `epoch` (store pass-through).
    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }

    /// Read the root state hash of the highest epoch that has one recorded.
    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }

    /// Record the state hash for `epoch` at `checkpoint_seq_num`.
    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> IotaResult {
        self.store
            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    /// Iterate the live object set straight from the store. Only valid when
    /// the cache holds no dirty (uncommitted) data — otherwise the store's
    /// view would be stale, hence the assert.
    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set()
    }

    /// Test-only variant that overlays the dirty set onto the store's live
    /// object set: written objects replace store versions, while
    /// deleted/wrapped entries remove them.
    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        let iter = self.dirty.objects.iter();
        // Seeded with the store's live set, then patched with the dirty
        // overlay below.
        let mut dirty_objects = BTreeMap::new();

        for obj in self.store.iter_live_object_set() {
            dirty_objects.insert(obj.object_id(), obj);
        }

        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            // Only the highest dirty version of each object is relevant.
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (_version, ObjectEntry::Wrapped) => {
                    dirty_objects.remove(&id);
                }
                (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}
2225
impl StateSyncAPI for WritebackCache {
    /// Store a state-synced transaction with its effects, then warm the
    /// point caches. `Ticket::Write` is used for the inserts (presumably
    /// bypassing read-ticket expiry — see `MonotonicCache`); errors from
    /// the cache inserts are deliberately ignored.
    fn try_insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) -> IotaResult {
        self.store
            .insert_transaction_and_effects(transaction, transaction_effects)?;

        self.cached
            .transactions
            .insert(
                transaction.digest(),
                PointCacheItem::Some(Arc::new(transaction.clone())),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &transaction_effects.digest(),
                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
                Ticket::Write,
            )
            .ok();

        Ok(())
    }

    /// Batched variant of `try_insert_transaction_and_effects`: one store
    /// write for all pairs, then the same per-pair cache warming.
    fn try_multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) -> IotaResult {
        self.store
            .multi_insert_transaction_and_effects(transactions_and_effects.iter())?;
        for VerifiedExecutionData {
            transaction,
            effects,
        } in transactions_and_effects
        {
            self.cached
                .transactions
                .insert(
                    transaction.digest(),
                    PointCacheItem::Some(Arc::new(transaction.clone())),
                    Ticket::Write,
                )
                .ok();
            self.cached
                .transaction_effects
                .insert(
                    &effects.digest(),
                    PointCacheItem::Some(Arc::new(effects.clone())),
                    Ticket::Write,
                )
                .ok();
        }

        Ok(())
    }
}
2295
#[cfg(test)]
impl WritebackCache {
    /// Test helper: write an object entry directly into the dirty cache,
    /// bypassing the normal transaction-output path.
    pub(super) fn write_object_for_testing(&self, object: Object) {
        let id = object.id();
        let version = object.version();
        self.write_object_entry(&id, version, ObjectEntry::Object(object));
    }

    /// Test helper: write a marker value directly into the dirty cache.
    pub(super) fn write_marker_for_testing(
        &self,
        epoch_id: EpochId,
        object_key: &ObjectKey,
        marker_value: MarkerValue,
    ) {
        self.write_marker_value(epoch_id, object_key, marker_value);
    }
}