1use std::{
51 collections::{BTreeMap, BTreeSet},
52 hash::Hash,
53 sync::Arc,
54};
55
56use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
57use futures::{FutureExt, future::BoxFuture};
58use iota_common::sync::notify_read::NotifyRead;
59use iota_macros::fail_point_async;
60use iota_types::{
61 accumulator::Accumulator,
62 base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
63 digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
64 effects::{TransactionEffects, TransactionEvents},
65 error::{IotaError, IotaResult, UserInputError},
66 iota_system_state::{IotaSystemState, get_iota_system_state},
67 message_envelope::Message,
68 messages_checkpoint::CheckpointSequenceNumber,
69 object::Object,
70 storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
71 transaction::{VerifiedSignedTransaction, VerifiedTransaction},
72};
73use moka::sync::Cache as MokaCache;
74use parking_lot::Mutex;
75use prometheus::Registry;
76use tap::TapOptional;
77use tracing::{debug, info, instrument, trace, warn};
78
79use super::{
80 CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
81 ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
82 TransactionCacheRead,
83 cache_types::{CachedVersionMap, IsNewer, MonotonicCache},
84 implement_passthrough_traits,
85 object_locks::ObjectLocks,
86};
87use crate::{
88 authority::{
89 AuthorityStore,
90 authority_per_epoch_store::AuthorityPerEpochStore,
91 authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
92 authority_store_tables::LiveObject,
93 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
94 },
95 state_accumulator::AccumulatorStore,
96 transaction_outputs::TransactionOutputs,
97};
98
99#[cfg(test)]
100#[path = "unit_tests/writeback_cache_tests.rs"]
101pub mod writeback_cache_tests;
102
103#[derive(Clone, PartialEq, Eq)]
104enum ObjectEntry {
105 Object(Object),
106 Deleted,
107 Wrapped,
108}
109
110impl ObjectEntry {
111 #[cfg(test)]
112 fn unwrap_object(&self) -> &Object {
113 match self {
114 ObjectEntry::Object(o) => o,
115 _ => panic!("unwrap_object called on non-Object"),
116 }
117 }
118
119 fn is_tombstone(&self) -> bool {
120 match self {
121 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
122 ObjectEntry::Object(_) => false,
123 }
124 }
125}
126
127impl std::fmt::Debug for ObjectEntry {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 match self {
130 ObjectEntry::Object(o) => {
131 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
132 }
133 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
134 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
135 }
136 }
137}
138
139impl From<Object> for ObjectEntry {
140 fn from(object: Object) -> Self {
141 ObjectEntry::Object(object)
142 }
143}
144
145impl From<ObjectOrTombstone> for ObjectEntry {
146 fn from(object: ObjectOrTombstone) -> Self {
147 match object {
148 ObjectOrTombstone::Object(o) => o.into(),
149 ObjectOrTombstone::Tombstone(obj_ref) => {
150 if obj_ref.2.is_deleted() {
151 ObjectEntry::Deleted
152 } else if obj_ref.2.is_wrapped() {
153 ObjectEntry::Wrapped
154 } else {
155 panic!("tombstone digest must either be deleted or wrapped");
156 }
157 }
158 }
159 }
160}
161
162#[derive(Debug, Clone, PartialEq, Eq)]
163enum LatestObjectCacheEntry {
164 Object(SequenceNumber, ObjectEntry),
165 NonExistent,
166}
167
168impl LatestObjectCacheEntry {
169 #[cfg(test)]
170 fn version(&self) -> Option<SequenceNumber> {
171 match self {
172 LatestObjectCacheEntry::Object(version, _) => Some(*version),
173 LatestObjectCacheEntry::NonExistent => None,
174 }
175 }
176}
177
178impl IsNewer for LatestObjectCacheEntry {
179 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
180 match (self, other) {
181 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
182 v1 > v2
183 }
184 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
185 _ => false,
186 }
187 }
188}
189
190type MarkerKey = (EpochId, ObjectID);
191
/// Outcome of a cache-only lookup.
enum CacheResult<T> {
    /// The value was found in the cache.
    Hit(T),
    /// The cache can answer the query, and the answer is "does not exist".
    NegativeHit,
    /// The cache has no information; the caller has to consult another source.
    Miss,
}
200
201struct UncommittedData {
204 objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,
221
222 markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,
227
228 transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,
229
230 transaction_events:
235 DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,
236
237 executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,
238
239 pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
242}
243
244impl UncommittedData {
245 fn new() -> Self {
246 Self {
247 objects: DashMap::new(),
248 markers: DashMap::new(),
249 transaction_effects: DashMap::new(),
250 executed_effects_digests: DashMap::new(),
251 pending_transaction_writes: DashMap::new(),
252 transaction_events: DashMap::new(),
253 }
254 }
255
256 fn clear(&self) {
257 self.objects.clear();
258 self.markers.clear();
259 self.transaction_effects.clear();
260 self.executed_effects_digests.clear();
261 self.pending_transaction_writes.clear();
262 self.transaction_events.clear();
263 }
264
265 fn is_empty(&self) -> bool {
266 let empty = self.pending_transaction_writes.is_empty();
267 if empty && cfg!(debug_assertions) {
268 assert!(
269 self.objects.is_empty()
270 && self.markers.is_empty()
271 && self.transaction_effects.is_empty()
272 && self.executed_effects_digests.is_empty()
273 && self.transaction_events.is_empty()
274 );
275 }
276 empty
277 }
278}
279
/// Maximum capacity handed to each bounded cache in this file.
///
/// Declared as `const` rather than `static`: it is a plain immutable value
/// that is only ever read by value, so it does not need a fixed address.
const MAX_CACHE_SIZE: u64 = 10_000;
282
283struct CachedCommittedData {
286 object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,
288
289 object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
295
296 marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
298
299 transactions: MokaCache<TransactionDigest, Arc<VerifiedTransaction>>,
300
301 transaction_effects: MokaCache<TransactionEffectsDigest, Arc<TransactionEffects>>,
302
303 transaction_events: MokaCache<TransactionEventsDigest, Arc<TransactionEvents>>,
304
305 executed_effects_digests: MokaCache<TransactionDigest, TransactionEffectsDigest>,
306
307 _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
310}
311
312impl CachedCommittedData {
313 fn new() -> Self {
314 let object_cache = MokaCache::builder()
315 .max_capacity(MAX_CACHE_SIZE)
316 .max_capacity(MAX_CACHE_SIZE)
317 .build();
318 let marker_cache = MokaCache::builder()
319 .max_capacity(MAX_CACHE_SIZE)
320 .max_capacity(MAX_CACHE_SIZE)
321 .build();
322 let transactions = MokaCache::builder()
323 .max_capacity(MAX_CACHE_SIZE)
324 .max_capacity(MAX_CACHE_SIZE)
325 .build();
326 let transaction_effects = MokaCache::builder()
327 .max_capacity(MAX_CACHE_SIZE)
328 .max_capacity(MAX_CACHE_SIZE)
329 .build();
330 let transaction_events = MokaCache::builder()
331 .max_capacity(MAX_CACHE_SIZE)
332 .max_capacity(MAX_CACHE_SIZE)
333 .build();
334 let executed_effects_digests = MokaCache::builder()
335 .max_capacity(MAX_CACHE_SIZE)
336 .max_capacity(MAX_CACHE_SIZE)
337 .build();
338 let transaction_objects = MokaCache::builder()
339 .max_capacity(MAX_CACHE_SIZE)
340 .max_capacity(MAX_CACHE_SIZE)
341 .build();
342
343 Self {
344 object_cache,
345 object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE),
346 marker_cache,
347 transactions,
348 transaction_effects,
349 transaction_events,
350 executed_effects_digests,
351 _transaction_objects: transaction_objects,
352 }
353 }
354
355 fn clear_and_assert_empty(&self) {
356 self.object_cache.invalidate_all();
357 self.object_by_id_cache.invalidate_all();
358 self.marker_cache.invalidate_all();
359 self.transactions.invalidate_all();
360 self.transaction_effects.invalidate_all();
361 self.transaction_events.invalidate_all();
362 self.executed_effects_digests.invalidate_all();
363 self._transaction_objects.invalidate_all();
364
365 assert_empty(&self.object_cache);
366 assert!(&self.object_by_id_cache.is_empty());
367 assert_empty(&self.marker_cache);
368 assert_empty(&self.transactions);
369 assert_empty(&self.transaction_effects);
370 assert_empty(&self.transaction_events);
371 assert_empty(&self.executed_effects_digests);
372 assert_empty(&self._transaction_objects);
373 }
374}
375
376fn assert_empty<K, V>(cache: &MokaCache<K, V>)
377where
378 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
379 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
380{
381 if cache.iter().next().is_some() {
382 panic!("cache should be empty");
383 }
384}
385
386pub struct WritebackCache {
387 dirty: UncommittedData,
388 cached: CachedCommittedData,
389
390 packages: MokaCache<ObjectID, PackageObject>,
401
402 object_locks: ObjectLocks,
403
404 executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
405 store: Arc<AuthorityStore>,
406 metrics: Arc<ExecutionCacheMetrics>,
407}
408
/// Looks up `$version` in the optional version map `$cache`, recording
/// metrics under `$table` / `$level`.
///
/// Returns from the *enclosing* function or closure with `CacheResult::Hit`
/// when the version is present, or with `CacheResult::NegativeHit` when it is
/// absent but greater than the least version held in the map. Otherwise a
/// miss is recorded and execution continues after the macro.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(version_map) = $cache {
            if let Some(found) = version_map.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(found.clone());
            }

            match version_map.get_least() {
                Some(least) if least.0 < $version => {
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
                _ => {}
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
430
/// Returns from the *enclosing* function or closure with the highest
/// `(version, entry)` of the optional version map `$cache`, recording metrics
/// under `$table` / `$level`. When `$cache` is `None` a miss is recorded and
/// execution continues after the macro.
///
/// Panics if the map is present but empty.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(version_map) = $cache {
            let Some((version, entry)) = version_map.get_highest() else {
                panic!("empty CachedVersionMap should have been removed");
            };
            $self.metrics.record_cache_hit($table, $level);
            return CacheResult::Hit((*version, entry.clone()));
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
445
446impl WritebackCache {
447 pub fn new(store: Arc<AuthorityStore>, metrics: Arc<ExecutionCacheMetrics>) -> Self {
448 let packages = MokaCache::builder()
449 .max_capacity(MAX_CACHE_SIZE)
450 .max_capacity(MAX_CACHE_SIZE)
451 .build();
452 Self {
453 dirty: UncommittedData::new(),
454 cached: CachedCommittedData::new(),
455 packages,
456 object_locks: ObjectLocks::new(),
457 executed_effects_digests_notify_read: NotifyRead::new(),
458 store,
459 metrics,
460 }
461 }
462
463 pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
464 Self::new(store, ExecutionCacheMetrics::new(registry).into())
465 }
466
/// Test-only: replaces `self` with a freshly constructed cache that shares
/// the same store and metrics handles.
#[cfg(test)]
pub fn reset_for_test(&mut self) {
    let fresh = Self::new(self.store.clone(), self.metrics.clone());
    // The previous cache state is dropped once it has been swapped out.
    let stale = std::mem::replace(self, fresh);
    drop(stale);
}
472
473 async fn write_object_entry(
474 &self,
475 object_id: &ObjectID,
476 version: SequenceNumber,
477 object: ObjectEntry,
478 ) {
479 trace!(?object_id, ?version, ?object, "inserting object entry");
480 fail_point_async!("write_object_entry");
481 self.metrics.record_cache_write("object");
482
483 let mut entry = self.dirty.objects.entry(*object_id).or_default();
507
508 self.cached.object_by_id_cache.insert(
509 object_id,
510 LatestObjectCacheEntry::Object(version, object.clone()),
511 );
512
513 entry.insert(version, object);
514 }
515
516 async fn write_marker_value(
517 &self,
518 epoch_id: EpochId,
519 object_key: &ObjectKey,
520 marker_value: MarkerValue,
521 ) {
522 tracing::trace!(
523 "inserting marker value {:?}: {:?}",
524 object_key,
525 marker_value
526 );
527 fail_point_async!("write_marker_entry");
528 self.metrics.record_cache_write("marker");
529 self.dirty
530 .markers
531 .entry((epoch_id, object_key.0))
532 .or_default()
533 .value_mut()
534 .insert(object_key.1, marker_value);
535 }
536
537 fn with_locked_cache_entries<K, V, R>(
541 dirty_map: &DashMap<K, CachedVersionMap<V>>,
542 cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
543 key: &K,
544 cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
545 ) -> R
546 where
547 K: Copy + Eq + Hash + Send + Sync + 'static,
548 V: Send + Sync + 'static,
549 {
550 let dirty_entry = dirty_map.entry(*key);
551 let dirty_entry = match &dirty_entry {
552 DashMapEntry::Occupied(occupied) => Some(occupied.get()),
553 DashMapEntry::Vacant(_) => None,
554 };
555
556 let cached_entry = cached_map.get(key);
557 let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
558 let cached_entry = cached_lock.as_deref();
559
560 cb(dirty_entry, cached_entry)
561 }
562
563 fn get_object_entry_by_key_cache_only(
566 &self,
567 object_id: &ObjectID,
568 version: SequenceNumber,
569 ) -> CacheResult<ObjectEntry> {
570 Self::with_locked_cache_entries(
571 &self.dirty.objects,
572 &self.cached.object_cache,
573 object_id,
574 |dirty_entry, cached_entry| {
575 check_cache_entry_by_version!(
576 self,
577 "object_by_version",
578 "uncommitted",
579 dirty_entry,
580 version
581 );
582 check_cache_entry_by_version!(
583 self,
584 "object_by_version",
585 "committed",
586 cached_entry,
587 version
588 );
589 CacheResult::Miss
590 },
591 )
592 }
593
594 fn get_object_by_key_cache_only(
595 &self,
596 object_id: &ObjectID,
597 version: SequenceNumber,
598 ) -> CacheResult<Object> {
599 match self.get_object_entry_by_key_cache_only(object_id, version) {
600 CacheResult::Hit(entry) => match entry {
601 ObjectEntry::Object(object) => CacheResult::Hit(object),
602 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
603 },
604 CacheResult::Miss => CacheResult::Miss,
605 CacheResult::NegativeHit => CacheResult::NegativeHit,
606 }
607 }
608
609 fn get_object_entry_by_id_cache_only(
610 &self,
611 request_type: &'static str,
612 object_id: &ObjectID,
613 ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
614 self.metrics
615 .record_cache_request(request_type, "object_by_id");
616 let entry = self.cached.object_by_id_cache.get(object_id);
617
618 if cfg!(debug_assertions) {
619 if let Some(entry) = &entry {
620 let highest: Option<ObjectEntry> = self
622 .dirty
623 .objects
624 .get(object_id)
625 .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
626 .or_else(|| {
627 let obj: Option<ObjectEntry> = self
628 .store
629 .get_latest_object_or_tombstone(*object_id)
630 .unwrap()
631 .map(|(_, o)| o.into());
632 obj
633 });
634
635 let cache_entry = match &*entry.lock() {
636 LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
637 LatestObjectCacheEntry::NonExistent => None,
638 };
639
640 let tombstone_possibly_pruned = highest.is_none()
643 && cache_entry
644 .as_ref()
645 .map(|e| e.is_tombstone())
646 .unwrap_or(false);
647
648 if highest != cache_entry && !tombstone_possibly_pruned {
649 tracing::error!(
650 ?highest,
651 ?cache_entry,
652 ?tombstone_possibly_pruned,
653 "object_by_id cache is incoherent for {:?}",
654 object_id
655 );
656 panic!("object_by_id cache is incoherent for {object_id:?}");
657 }
658 }
659 }
660
661 if let Some(entry) = entry {
662 let entry = entry.lock();
663 match &*entry {
664 LatestObjectCacheEntry::Object(latest_version, latest_object) => {
665 self.metrics.record_cache_hit(request_type, "object_by_id");
666 return CacheResult::Hit((*latest_version, latest_object.clone()));
667 }
668 LatestObjectCacheEntry::NonExistent => {
669 self.metrics
670 .record_cache_negative_hit(request_type, "object_by_id");
671 return CacheResult::NegativeHit;
672 }
673 }
674 } else {
675 self.metrics.record_cache_miss(request_type, "object_by_id");
676 }
677
678 Self::with_locked_cache_entries(
679 &self.dirty.objects,
680 &self.cached.object_cache,
681 object_id,
682 |dirty_entry, cached_entry| {
683 check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
684 check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
685 CacheResult::Miss
686 },
687 )
688 }
689
690 fn get_object_by_id_cache_only(
691 &self,
692 request_type: &'static str,
693 object_id: &ObjectID,
694 ) -> CacheResult<(SequenceNumber, Object)> {
695 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
696 CacheResult::Hit((version, entry)) => match entry {
697 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
698 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
699 },
700 CacheResult::NegativeHit => CacheResult::NegativeHit,
701 CacheResult::Miss => CacheResult::Miss,
702 }
703 }
704
705 fn get_marker_value_cache_only(
706 &self,
707 object_id: &ObjectID,
708 version: SequenceNumber,
709 epoch_id: EpochId,
710 ) -> CacheResult<MarkerValue> {
711 Self::with_locked_cache_entries(
712 &self.dirty.markers,
713 &self.cached.marker_cache,
714 &(epoch_id, *object_id),
715 |dirty_entry, cached_entry| {
716 check_cache_entry_by_version!(
717 self,
718 "marker_by_version",
719 "uncommitted",
720 dirty_entry,
721 version
722 );
723 check_cache_entry_by_version!(
724 self,
725 "marker_by_version",
726 "committed",
727 cached_entry,
728 version
729 );
730 CacheResult::Miss
731 },
732 )
733 }
734
735 fn get_latest_marker_value_cache_only(
736 &self,
737 object_id: &ObjectID,
738 epoch_id: EpochId,
739 ) -> CacheResult<(SequenceNumber, MarkerValue)> {
740 Self::with_locked_cache_entries(
741 &self.dirty.markers,
742 &self.cached.marker_cache,
743 &(epoch_id, *object_id),
744 |dirty_entry, cached_entry| {
745 check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
746 check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
747 CacheResult::Miss
748 },
749 )
750 }
751
752 fn get_object_impl(
753 &self,
754 request_type: &'static str,
755 id: &ObjectID,
756 ) -> IotaResult<Option<Object>> {
757 match self.get_object_by_id_cache_only(request_type, id) {
758 CacheResult::Hit((_, object)) => Ok(Some(object)),
759 CacheResult::NegativeHit => Ok(None),
760 CacheResult::Miss => {
761 let obj = self.store.get_object(id)?;
762 if let Some(obj) = &obj {
763 self.cache_latest_object_by_id(
764 id,
765 LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
766 );
767 } else {
768 self.cache_object_not_found(id);
769 }
770 Ok(obj)
771 }
772 }
773 }
774
775 fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
776 self.metrics.record_cache_request(request_type, "db");
777 &self.store
778 }
779
780 fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
781 self.metrics
782 .record_cache_multi_request(request_type, "db", count);
783 &self.store
784 }
785
786 #[instrument(level = "debug", skip_all)]
787 async fn write_transaction_outputs(
788 &self,
789 epoch_id: EpochId,
790 tx_outputs: Arc<TransactionOutputs>,
791 ) -> IotaResult {
792 trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");
793
794 let TransactionOutputs {
795 transaction,
796 effects,
797 markers,
798 written,
799 deleted,
800 wrapped,
801 events,
802 ..
803 } = &*tx_outputs;
804
805 for ObjectKey(id, version) in deleted.iter() {
811 self.write_object_entry(id, *version, ObjectEntry::Deleted)
812 .await;
813 }
814
815 for ObjectKey(id, version) in wrapped.iter() {
816 self.write_object_entry(id, *version, ObjectEntry::Wrapped)
817 .await;
818 }
819
820 for (object_key, marker_value) in markers.iter() {
822 self.write_marker_value(epoch_id, object_key, *marker_value)
823 .await;
824 }
825
826 for (object_id, object) in written.iter() {
829 if object.is_child_object() {
830 self.write_object_entry(object_id, object.version(), object.clone().into())
831 .await;
832 }
833 }
834 for (object_id, object) in written.iter() {
835 if !object.is_child_object() {
836 self.write_object_entry(object_id, object.version(), object.clone().into())
837 .await;
838 if object.is_package() {
839 debug!("caching package: {:?}", object.compute_object_reference());
840 self.packages
841 .insert(*object_id, PackageObject::new(object.clone()));
842 }
843 }
844 }
845
846 let tx_digest = *transaction.digest();
847 let effects_digest = effects.digest();
848
849 self.metrics.record_cache_write("transaction_block");
850 self.dirty
851 .pending_transaction_writes
852 .insert(tx_digest, tx_outputs.clone());
853
854 self.metrics.record_cache_write("transaction_effects");
857 self.dirty
858 .transaction_effects
859 .insert(effects_digest, effects.clone());
860
861 self.metrics.record_cache_write("transaction_events");
866 match self.dirty.transaction_events.entry(events.digest()) {
867 DashMapEntry::Occupied(mut occupied) => {
868 occupied.get_mut().0.insert(tx_digest);
869 }
870 DashMapEntry::Vacant(entry) => {
871 let mut txns = BTreeSet::new();
872 txns.insert(tx_digest);
873 entry.insert((txns, events.clone()));
874 }
875 }
876
877 self.metrics.record_cache_write("executed_effects_digests");
878 self.dirty
879 .executed_effects_digests
880 .insert(tx_digest, effects_digest);
881
882 self.executed_effects_digests_notify_read
883 .notify(&tx_digest, &effects_digest);
884
885 self.metrics
886 .pending_notify_read
887 .set(self.executed_effects_digests_notify_read.num_pending() as i64);
888
889 Ok(())
890 }
891
892 #[instrument(level = "debug", skip_all)]
894 async fn commit_transaction_outputs(
895 &self,
896 epoch: EpochId,
897 digests: &[TransactionDigest],
898 ) -> IotaResult {
899 fail_point_async!("writeback-cache-commit");
900 trace!(?digests);
901
902 let mut all_outputs = Vec::with_capacity(digests.len());
903 for tx in digests {
904 let Some(outputs) = self
905 .dirty
906 .pending_transaction_writes
907 .get(tx)
908 .map(|o| o.clone())
909 else {
910 warn!("Attempt to commit unknown transaction {:?}", tx);
918 continue;
919 };
920 all_outputs.push(outputs);
921 }
922
923 self.store
927 .write_transaction_outputs(epoch, &all_outputs)
928 .await?;
929
930 for outputs in all_outputs.iter() {
931 let tx_digest = outputs.transaction.digest();
932 assert!(
933 self.dirty
934 .pending_transaction_writes
935 .remove(tx_digest)
936 .is_some()
937 );
938 self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
939 }
940
941 Ok(())
942 }
943
944 fn flush_transactions_from_dirty_to_cached(
945 &self,
946 epoch: EpochId,
947 tx_digest: TransactionDigest,
948 outputs: &TransactionOutputs,
949 ) {
950 let TransactionOutputs {
954 transaction,
955 effects,
956 markers,
957 written,
958 deleted,
959 wrapped,
960 events,
961 ..
962 } = outputs;
963
964 let effects_digest = effects.digest();
965 let events_digest = events.digest();
966
967 self.cached
970 .transactions
971 .insert(tx_digest, transaction.clone());
972 self.cached
973 .transaction_effects
974 .insert(effects_digest, effects.clone().into());
975 self.cached
976 .executed_effects_digests
977 .insert(tx_digest, effects_digest);
978 self.cached
979 .transaction_events
980 .insert(events_digest, events.clone().into());
981
982 self.dirty
983 .transaction_effects
984 .remove(&effects_digest)
985 .expect("effects must exist");
986
987 match self.dirty.transaction_events.entry(events.digest()) {
988 DashMapEntry::Occupied(mut occupied) => {
989 let txns = &mut occupied.get_mut().0;
990 assert!(txns.remove(&tx_digest), "transaction must exist");
991 if txns.is_empty() {
992 occupied.remove();
993 }
994 }
995 DashMapEntry::Vacant(_) => {
996 panic!("events must exist");
997 }
998 }
999
1000 self.dirty
1001 .executed_effects_digests
1002 .remove(&tx_digest)
1003 .expect("executed effects must exist");
1004
1005 for (object_key, marker_value) in markers.iter() {
1007 Self::move_version_from_dirty_to_cache(
1008 &self.dirty.markers,
1009 &self.cached.marker_cache,
1010 (epoch, object_key.0),
1011 object_key.1,
1012 marker_value,
1013 );
1014 }
1015
1016 for (object_id, object) in written.iter() {
1017 Self::move_version_from_dirty_to_cache(
1018 &self.dirty.objects,
1019 &self.cached.object_cache,
1020 *object_id,
1021 object.version(),
1022 &ObjectEntry::Object(object.clone()),
1023 );
1024 }
1025
1026 for ObjectKey(object_id, version) in deleted.iter() {
1027 Self::move_version_from_dirty_to_cache(
1028 &self.dirty.objects,
1029 &self.cached.object_cache,
1030 *object_id,
1031 *version,
1032 &ObjectEntry::Deleted,
1033 );
1034 }
1035
1036 for ObjectKey(object_id, version) in wrapped.iter() {
1037 Self::move_version_from_dirty_to_cache(
1038 &self.dirty.objects,
1039 &self.cached.object_cache,
1040 *object_id,
1041 *version,
1042 &ObjectEntry::Wrapped,
1043 );
1044 }
1045 }
1046
1047 async fn persist_transactions(&self, digests: &[TransactionDigest]) -> IotaResult {
1048 let mut txns = Vec::with_capacity(digests.len());
1049 for tx_digest in digests {
1050 let Some(tx) = self
1051 .dirty
1052 .pending_transaction_writes
1053 .get(tx_digest)
1054 .map(|o| o.transaction.clone())
1055 else {
1056 debug_assert!(
1058 self.store
1059 .get_transaction_block(tx_digest)
1060 .unwrap()
1061 .is_some()
1062 );
1063 continue;
1067 };
1068
1069 txns.push((*tx_digest, (*tx).clone()));
1070 }
1071
1072 self.store.commit_transactions(&txns)
1073 }
1074
1075 fn move_version_from_dirty_to_cache<K, V>(
1078 dirty: &DashMap<K, CachedVersionMap<V>>,
1079 cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1080 key: K,
1081 version: SequenceNumber,
1082 value: &V,
1083 ) where
1084 K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1085 V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1086 {
1087 static MAX_VERSIONS: usize = 3;
1088
1089 let dirty_entry = dirty.entry(key);
1093 let cache_entry = cache.entry(key).or_default();
1094 let mut cache_map = cache_entry.value().lock();
1095
1096 cache_map.insert(version, value.clone());
1098 cache_map.truncate_to(MAX_VERSIONS);
1100
1101 let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1102 panic!("dirty map must exist");
1103 };
1104
1105 let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1106
1107 assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1108
1109 if occupied_dirty_entry.get().is_empty() {
1111 occupied_dirty_entry.remove();
1112 }
1113 }
1114
1115 fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
1117 trace!("caching object by id: {:?} {:?}", object_id, object);
1118 self.metrics.record_cache_write("object_by_id");
1119 self.cached.object_by_id_cache.insert(object_id, object);
1120 }
1121
1122 fn cache_object_not_found(&self, object_id: &ObjectID) {
1123 self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent);
1124 }
1125
1126 fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
1127 info!("clearing state at end of epoch");
1128 assert!(
1129 self.dirty.pending_transaction_writes.is_empty(),
1130 "should be empty due to revert_state_update"
1131 );
1132 self.dirty.clear();
1133 info!("clearing old transaction locks");
1134 self.object_locks.clear();
1135 }
1136
1137 fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
1138 let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
1143 assert!(
1144 !self.is_tx_already_executed(tx).expect("read cannot fail"),
1145 "attempt to revert committed transaction"
1146 );
1147
1148 info!("Not reverting {:?} as it was not executed", tx);
1151 return Ok(());
1152 };
1153
1154 for (object_id, object) in outputs.written.iter() {
1155 if object.is_package() {
1156 info!("removing non-finalized package from cache: {:?}", object_id);
1157 self.packages.invalidate(object_id);
1158 }
1159 self.cached.object_by_id_cache.invalidate(object_id);
1160 self.cached.object_cache.invalidate(object_id);
1161 }
1162
1163 for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1164 self.cached.object_by_id_cache.invalidate(object_id);
1165 self.cached.object_cache.invalidate(object_id);
1166 }
1167
1168 Ok(())
1171 }
1172
1173 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) -> IotaResult {
1174 self.store.bulk_insert_genesis_objects(objects)?;
1175 for obj in objects {
1176 self.cached.object_cache.invalidate(&obj.id());
1177 self.cached.object_by_id_cache.invalidate(&obj.id());
1178 }
1179 Ok(())
1180 }
1181
1182 fn insert_genesis_object_impl(&self, object: Object) -> IotaResult {
1183 self.cached.object_by_id_cache.invalidate(&object.id());
1184 self.cached.object_cache.invalidate(&object.id());
1185 self.store.insert_genesis_object(object)
1186 }
1187
1188 pub fn clear_caches_and_assert_empty(&self) {
1189 info!("clearing caches");
1190 self.cached.clear_and_assert_empty();
1191 self.packages.invalidate_all();
1192 assert_empty(&self.packages);
1193 }
1194}
1195
1196impl ExecutionCacheAPI for WritebackCache {}
1197
1198impl ExecutionCacheCommit for WritebackCache {
1199 fn commit_transaction_outputs<'a>(
1200 &'a self,
1201 epoch: EpochId,
1202 digests: &'a [TransactionDigest],
1203 ) -> BoxFuture<'a, IotaResult> {
1204 WritebackCache::commit_transaction_outputs(self, epoch, digests).boxed()
1205 }
1206
1207 fn persist_transactions<'a>(
1208 &'a self,
1209 digests: &'a [TransactionDigest],
1210 ) -> BoxFuture<'a, IotaResult> {
1211 WritebackCache::persist_transactions(self, digests).boxed()
1212 }
1213}
1214
1215impl ObjectCacheRead for WritebackCache {
1216 fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1217 self.metrics
1218 .record_cache_request("package", "package_cache");
1219 if let Some(p) = self.packages.get(package_id) {
1220 if cfg!(debug_assertions) {
1221 if let Some(store_package) = self.store.get_object(package_id).unwrap() {
1222 assert_eq!(
1223 store_package.digest(),
1224 p.object().digest(),
1225 "Package object cache is inconsistent for package {package_id:?}"
1226 );
1227 }
1228 }
1229 self.metrics.record_cache_hit("package", "package_cache");
1230 return Ok(Some(p));
1231 } else {
1232 self.metrics.record_cache_miss("package", "package_cache");
1233 }
1234
1235 if let Some(p) = self.get_object_impl("package", package_id)? {
1239 if p.is_package() {
1240 let p = PackageObject::new(p);
1241 tracing::trace!(
1242 "caching package: {:?}",
1243 p.object().compute_object_reference()
1244 );
1245 self.metrics.record_cache_write("package");
1246 self.packages.insert(*package_id, p.clone());
1247 Ok(Some(p))
1248 } else {
1249 Err(IotaError::UserInput {
1250 error: UserInputError::MoveObjectAsPackage {
1251 object_id: *package_id,
1252 },
1253 })
1254 }
1255 } else {
1256 Ok(None)
1257 }
1258 }
1259
1260 fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1261 }
1264
1265 fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
1268 self.get_object_impl("object_latest", id)
1269 }
1270
1271 fn get_object_by_key(
1272 &self,
1273 object_id: &ObjectID,
1274 version: SequenceNumber,
1275 ) -> IotaResult<Option<Object>> {
1276 match self.get_object_by_key_cache_only(object_id, version) {
1277 CacheResult::Hit(object) => Ok(Some(object)),
1278 CacheResult::NegativeHit => Ok(None),
1279 CacheResult::Miss => Ok(self
1280 .record_db_get("object_by_version")
1281 .get_object_by_key(object_id, version)?),
1282 }
1283 }
1284
1285 fn multi_get_objects_by_key(
1286 &self,
1287 object_keys: &[ObjectKey],
1288 ) -> Result<Vec<Option<Object>>, IotaError> {
1289 do_fallback_lookup(
1290 object_keys,
1291 |key| {
1292 Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1293 CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1294 CacheResult::NegativeHit => CacheResult::NegativeHit,
1295 CacheResult::Miss => CacheResult::Miss,
1296 })
1297 },
1298 |remaining| {
1299 self.record_db_multi_get("object_by_version", remaining.len())
1300 .multi_get_objects_by_key(remaining)
1301 },
1302 )
1303 }
1304
1305 fn object_exists_by_key(
1306 &self,
1307 object_id: &ObjectID,
1308 version: SequenceNumber,
1309 ) -> IotaResult<bool> {
1310 match self.get_object_by_key_cache_only(object_id, version) {
1311 CacheResult::Hit(_) => Ok(true),
1312 CacheResult::NegativeHit => Ok(false),
1313 CacheResult::Miss => self
1314 .record_db_get("object_by_version")
1315 .object_exists_by_key(object_id, version),
1316 }
1317 }
1318
1319 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
1320 do_fallback_lookup(
1321 object_keys,
1322 |key| {
1323 Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1324 CacheResult::Hit(_) => CacheResult::Hit(true),
1325 CacheResult::NegativeHit => CacheResult::Hit(false),
1326 CacheResult::Miss => CacheResult::Miss,
1327 })
1328 },
1329 |remaining| {
1330 self.record_db_multi_get("object_by_version", remaining.len())
1331 .multi_object_exists_by_key(remaining)
1332 },
1333 )
1334 }
1335
1336 fn get_latest_object_ref_or_tombstone(
1337 &self,
1338 object_id: ObjectID,
1339 ) -> IotaResult<Option<ObjectRef>> {
1340 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1341 CacheResult::Hit((version, entry)) => Ok(Some(match entry {
1342 ObjectEntry::Object(object) => object.compute_object_reference(),
1343 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1344 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1345 })),
1346 CacheResult::NegativeHit => Ok(None),
1347 CacheResult::Miss => self
1348 .record_db_get("latest_objref_or_tombstone")
1349 .get_latest_object_ref_or_tombstone(object_id),
1350 }
1351 }
1352
1353 fn get_latest_object_or_tombstone(
1354 &self,
1355 object_id: ObjectID,
1356 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1357 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1358 CacheResult::Hit((version, entry)) => {
1359 let key = ObjectKey(object_id, version);
1360 Ok(Some(match entry {
1361 ObjectEntry::Object(object) => (key, object.into()),
1362 ObjectEntry::Deleted => (
1363 key,
1364 ObjectOrTombstone::Tombstone((
1365 object_id,
1366 version,
1367 ObjectDigest::OBJECT_DIGEST_DELETED,
1368 )),
1369 ),
1370 ObjectEntry::Wrapped => (
1371 key,
1372 ObjectOrTombstone::Tombstone((
1373 object_id,
1374 version,
1375 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1376 )),
1377 ),
1378 }))
1379 }
1380 CacheResult::NegativeHit => Ok(None),
1381 CacheResult::Miss => self
1382 .record_db_get("latest_object_or_tombstone")
1383 .get_latest_object_or_tombstone(object_id),
1384 }
1385 }
1386
1387 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1388 fn find_object_lt_or_eq_version(
1389 &self,
1390 object_id: ObjectID,
1391 version_bound: SequenceNumber,
1392 ) -> IotaResult<Option<Object>> {
1393 macro_rules! check_cache_entry {
1394 ($level: expr, $objects: expr) => {
1395 self.metrics
1396 .record_cache_request("object_lt_or_eq_version", $level);
1397 if let Some(objects) = $objects {
1398 if let Some((_, object)) = objects
1399 .all_versions_lt_or_eq_descending(&version_bound)
1400 .next()
1401 {
1402 if let ObjectEntry::Object(object) = object {
1403 self.metrics
1404 .record_cache_hit("object_lt_or_eq_version", $level);
1405 return Ok(Some(object.clone()));
1406 } else {
1407 self.metrics
1409 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1410 return Ok(None);
1411 }
1412 } else {
1413 self.metrics
1414 .record_cache_miss("object_lt_or_eq_version", $level);
1415 }
1416 }
1417 };
1418 }
1419
1420 self.metrics
1422 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1423 if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
1424 let latest = latest.lock();
1425 match &*latest {
1426 LatestObjectCacheEntry::Object(latest_version, object) => {
1427 if *latest_version <= version_bound {
1428 if let ObjectEntry::Object(object) = object {
1429 self.metrics
1430 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1431 return Ok(Some(object.clone()));
1432 } else {
1433 self.metrics.record_cache_negative_hit(
1435 "object_lt_or_eq_version",
1436 "object_by_id",
1437 );
1438 return Ok(None);
1439 }
1440 }
1441 }
1444 LatestObjectCacheEntry::NonExistent => {
1446 self.metrics
1447 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1448 return Ok(None);
1449 }
1450 }
1451 }
1452 self.metrics
1453 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1454
1455 Self::with_locked_cache_entries(
1456 &self.dirty.objects,
1457 &self.cached.object_cache,
1458 &object_id,
1459 |dirty_entry, cached_entry| {
1460 check_cache_entry!("committed", dirty_entry);
1461 check_cache_entry!("uncommitted", cached_entry);
1462
1463 let latest: Option<(SequenceNumber, ObjectEntry)> =
1487 if let Some(dirty_set) = dirty_entry {
1488 dirty_set
1489 .get_highest()
1490 .cloned()
1491 .tap_none(|| panic!("dirty set cannot be empty"))
1492 } else {
1493 self.record_db_get("object_lt_or_eq_version_latest")
1494 .get_latest_object_or_tombstone(object_id)?
1495 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1496 (version, ObjectEntry::from(obj_or_tombstone))
1497 })
1498 };
1499
1500 if let Some((obj_version, obj_entry)) = latest {
1501 self.cache_latest_object_by_id(
1506 &object_id,
1507 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1508 );
1509
1510 if obj_version <= version_bound {
1511 match obj_entry {
1512 ObjectEntry::Object(object) => Ok(Some(object)),
1513 ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
1514 }
1515 } else {
1516 self.record_db_get("object_lt_or_eq_version_scan")
1520 .find_object_lt_or_eq_version(object_id, version_bound)
1521 }
1522 } else {
1523 let highest = cached_entry.and_then(|c| c.get_highest());
1529 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1530 self.cache_object_not_found(&object_id);
1531 Ok(None)
1532 }
1533 },
1534 )
1535 }
1536
1537 fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1538 get_iota_system_state(self)
1539 }
1540
1541 fn get_marker_value(
1542 &self,
1543 object_id: &ObjectID,
1544 version: SequenceNumber,
1545 epoch_id: EpochId,
1546 ) -> IotaResult<Option<MarkerValue>> {
1547 match self.get_marker_value_cache_only(object_id, version, epoch_id) {
1548 CacheResult::Hit(marker) => Ok(Some(marker)),
1549 CacheResult::NegativeHit => Ok(None),
1550 CacheResult::Miss => self
1551 .record_db_get("marker_by_version")
1552 .get_marker_value(object_id, &version, epoch_id),
1553 }
1554 }
1555
1556 fn get_latest_marker(
1557 &self,
1558 object_id: &ObjectID,
1559 epoch_id: EpochId,
1560 ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
1561 match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1562 CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
1563 CacheResult::NegativeHit => {
1564 panic!("cannot have negative hit when getting latest marker")
1565 }
1566 CacheResult::Miss => self
1567 .record_db_get("marker_latest")
1568 .get_latest_marker(object_id, epoch_id),
1569 }
1570 }
1571
1572 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
1573 match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1574 CacheResult::Hit((_, obj)) => {
1575 let actual_objref = obj.compute_object_reference();
1576 if obj_ref != actual_objref {
1577 Ok(ObjectLockStatus::LockedAtDifferentVersion {
1578 locked_ref: actual_objref,
1579 })
1580 } else {
1581 Ok(
1583 match self
1584 .object_locks
1585 .get_transaction_lock(&obj_ref, epoch_store)?
1586 {
1587 Some(tx_digest) => ObjectLockStatus::LockedToTx {
1588 locked_by_tx: tx_digest,
1589 },
1590 None => ObjectLockStatus::Initialized,
1591 },
1592 )
1593 }
1594 }
1595 CacheResult::NegativeHit => {
1596 Err(IotaError::from(UserInputError::ObjectNotFound {
1597 object_id: obj_ref.0,
1598 version: None,
1601 }))
1602 }
1603 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1604 }
1605 }
1606
1607 fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
1608 let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
1609 UserInputError::ObjectNotFound {
1610 object_id,
1611 version: None,
1612 },
1613 )?;
1614 Ok(obj.compute_object_reference())
1615 }
1616
1617 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1618 do_fallback_lookup(
1619 owned_object_refs,
1620 |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1621 CacheResult::Hit((version, obj)) => {
1622 if obj.compute_object_reference() != *obj_ref {
1623 Err(UserInputError::ObjectVersionUnavailableForConsumption {
1624 provided_obj_ref: *obj_ref,
1625 current_version: version,
1626 }
1627 .into())
1628 } else {
1629 Ok(CacheResult::Hit(()))
1630 }
1631 }
1632 CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1633 object_id: obj_ref.0,
1634 version: None,
1635 }
1636 .into()),
1637 CacheResult::Miss => Ok(CacheResult::Miss),
1638 },
1639 |remaining| {
1640 self.record_db_multi_get("object_is_live", remaining.len())
1641 .check_owned_objects_are_live(remaining)?;
1642 Ok(vec![(); remaining.len()])
1643 },
1644 )?;
1645 Ok(())
1646 }
1647
1648 fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
1649 self.store.perpetual_tables.get_highest_pruned_checkpoint()
1650 }
1651}
1652
1653impl TransactionCacheRead for WritebackCache {
1654 fn multi_get_transaction_blocks(
1655 &self,
1656 digests: &[TransactionDigest],
1657 ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
1658 do_fallback_lookup(
1659 digests,
1660 |digest| {
1661 self.metrics
1662 .record_cache_request("transaction_block", "uncommitted");
1663 if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1664 self.metrics
1665 .record_cache_hit("transaction_block", "uncommitted");
1666 return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
1667 }
1668 self.metrics
1669 .record_cache_miss("transaction_block", "uncommitted");
1670
1671 self.metrics
1672 .record_cache_request("transaction_block", "committed");
1673 if let Some(tx) = self.cached.transactions.get(digest) {
1674 self.metrics
1675 .record_cache_hit("transaction_block", "committed");
1676 return Ok(CacheResult::Hit(Some(tx.clone())));
1677 }
1678 self.metrics
1679 .record_cache_miss("transaction_block", "committed");
1680
1681 Ok(CacheResult::Miss)
1682 },
1683 |remaining| {
1684 self.record_db_multi_get("transaction_block", remaining.len())
1685 .multi_get_transaction_blocks(remaining)
1686 .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())
1687 },
1688 )
1689 }
1690
1691 fn multi_get_executed_effects_digests(
1692 &self,
1693 digests: &[TransactionDigest],
1694 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
1695 do_fallback_lookup(
1696 digests,
1697 |digest| {
1698 self.metrics
1699 .record_cache_request("executed_effects_digests", "uncommitted");
1700 if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
1701 self.metrics
1702 .record_cache_hit("executed_effects_digests", "uncommitted");
1703 return Ok(CacheResult::Hit(Some(*digest)));
1704 }
1705 self.metrics
1706 .record_cache_miss("executed_effects_digests", "uncommitted");
1707
1708 self.metrics
1709 .record_cache_request("executed_effects_digests", "committed");
1710 if let Some(digest) = self.cached.executed_effects_digests.get(digest) {
1711 self.metrics
1712 .record_cache_hit("executed_effects_digests", "committed");
1713 return Ok(CacheResult::Hit(Some(digest)));
1714 }
1715 self.metrics
1716 .record_cache_miss("executed_effects_digests", "committed");
1717
1718 Ok(CacheResult::Miss)
1719 },
1720 |remaining| {
1721 self.record_db_multi_get("executed_effects_digests", remaining.len())
1722 .multi_get_executed_effects_digests(remaining)
1723 },
1724 )
1725 }
1726
1727 fn multi_get_effects(
1728 &self,
1729 digests: &[TransactionEffectsDigest],
1730 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
1731 do_fallback_lookup(
1732 digests,
1733 |digest| {
1734 self.metrics
1735 .record_cache_request("transaction_effects", "uncommitted");
1736 if let Some(effects) = self.dirty.transaction_effects.get(digest) {
1737 self.metrics
1738 .record_cache_hit("transaction_effects", "uncommitted");
1739 return Ok(CacheResult::Hit(Some(effects.clone())));
1740 }
1741 self.metrics
1742 .record_cache_miss("transaction_effects", "uncommitted");
1743
1744 self.metrics
1745 .record_cache_request("transaction_effects", "committed");
1746 if let Some(effects) = self.cached.transaction_effects.get(digest) {
1747 self.metrics
1748 .record_cache_hit("transaction_effects", "committed");
1749 return Ok(CacheResult::Hit(Some((*effects).clone())));
1750 }
1751 self.metrics
1752 .record_cache_miss("transaction_effects", "committed");
1753
1754 Ok(CacheResult::Miss)
1755 },
1756 |remaining| {
1757 self.record_db_multi_get("transaction_effects", remaining.len())
1758 .multi_get_effects(remaining.iter())
1759 },
1760 )
1761 }
1762
1763 fn notify_read_executed_effects_digests<'a>(
1764 &'a self,
1765 digests: &'a [TransactionDigest],
1766 ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
1767 self.executed_effects_digests_notify_read
1768 .read(digests, |digests| {
1769 self.multi_get_executed_effects_digests(digests)
1770 })
1771 .boxed()
1772 }
1773
1774 fn multi_get_events(
1775 &self,
1776 event_digests: &[TransactionEventsDigest],
1777 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
1778 fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
1779 if events.data.is_empty() {
1780 None
1781 } else {
1782 Some(events)
1783 }
1784 }
1785
1786 do_fallback_lookup(
1787 event_digests,
1788 |digest| {
1789 self.metrics
1790 .record_cache_request("transaction_events", "uncommitted");
1791 if let Some(events) = self
1792 .dirty
1793 .transaction_events
1794 .get(digest)
1795 .map(|e| e.1.clone())
1796 {
1797 self.metrics
1798 .record_cache_hit("transaction_events", "uncommitted");
1799
1800 return Ok(CacheResult::Hit(map_events(events)));
1801 }
1802 self.metrics
1803 .record_cache_miss("transaction_events", "uncommitted");
1804
1805 self.metrics
1806 .record_cache_request("transaction_events", "committed");
1807 if let Some(events) = self
1808 .cached
1809 .transaction_events
1810 .get(digest)
1811 .map(|e| (*e).clone())
1812 {
1813 self.metrics
1814 .record_cache_hit("transaction_events", "committed");
1815 return Ok(CacheResult::Hit(map_events(events)));
1816 }
1817
1818 self.metrics
1819 .record_cache_miss("transaction_events", "committed");
1820
1821 Ok(CacheResult::Miss)
1822 },
1823 |digests| self.store.multi_get_events(digests),
1824 )
1825 }
1826}
1827
1828impl ExecutionCacheWrite for WritebackCache {
1829 fn acquire_transaction_locks<'a>(
1830 &'a self,
1831 epoch_store: &'a AuthorityPerEpochStore,
1832 owned_input_objects: &'a [ObjectRef],
1833 transaction: VerifiedSignedTransaction,
1834 ) -> BoxFuture<'a, IotaResult> {
1835 self.object_locks
1836 .acquire_transaction_locks(self, epoch_store, owned_input_objects, transaction)
1837 .boxed()
1838 }
1839
1840 fn write_transaction_outputs(
1841 &self,
1842 epoch_id: EpochId,
1843 tx_outputs: Arc<TransactionOutputs>,
1844 ) -> BoxFuture<'_, IotaResult> {
1845 WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs).boxed()
1846 }
1847}
1848
1849fn do_fallback_lookup<K: Copy, V: Default + Clone>(
1858 keys: &[K],
1859 get_cached_key: impl Fn(&K) -> IotaResult<CacheResult<V>>,
1860 multiget_fallback: impl Fn(&[K]) -> IotaResult<Vec<V>>,
1861) -> IotaResult<Vec<V>> {
1862 let mut results = vec![V::default(); keys.len()];
1863 let mut fallback_keys = Vec::with_capacity(keys.len());
1864 let mut fallback_indices = Vec::with_capacity(keys.len());
1865
1866 for (i, key) in keys.iter().enumerate() {
1867 match get_cached_key(key)? {
1868 CacheResult::Miss => {
1869 fallback_keys.push(*key);
1870 fallback_indices.push(i);
1871 }
1872 CacheResult::NegativeHit => (),
1873 CacheResult::Hit(value) => {
1874 results[i] = value;
1875 }
1876 }
1877 }
1878
1879 let fallback_results = multiget_fallback(&fallback_keys)?;
1880 assert_eq!(fallback_results.len(), fallback_indices.len());
1881 assert_eq!(fallback_results.len(), fallback_keys.len());
1882
1883 for (i, result) in fallback_indices
1884 .into_iter()
1885 .zip(fallback_results.into_iter())
1886 {
1887 results[i] = result;
1888 }
1889 Ok(results)
1890}
1891
// NOTE(review): `implement_passthrough_traits!` is imported from the parent
// module; presumably it generates trait impls for `WritebackCache` that
// forward to the underlying store. Confirm against the macro definition.
implement_passthrough_traits!(WritebackCache);
1893
1894impl AccumulatorStore for WritebackCache {
1895 fn get_root_state_accumulator_for_epoch(
1896 &self,
1897 epoch: EpochId,
1898 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
1899 self.store.get_root_state_accumulator_for_epoch(epoch)
1900 }
1901
1902 fn get_root_state_accumulator_for_highest_epoch(
1903 &self,
1904 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
1905 self.store.get_root_state_accumulator_for_highest_epoch()
1906 }
1907
1908 fn insert_state_accumulator_for_epoch(
1909 &self,
1910 epoch: EpochId,
1911 checkpoint_seq_num: &CheckpointSequenceNumber,
1912 acc: &Accumulator,
1913 ) -> IotaResult {
1914 self.store
1915 .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
1916 }
1917
1918 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1919 assert!(
1923 self.dirty.is_empty(),
1924 "cannot iterate live object set with dirty data"
1925 );
1926 self.store.iter_live_object_set()
1927 }
1928
1929 fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1933 let iter = self.dirty.objects.iter();
1935 let mut dirty_objects = BTreeMap::new();
1936
1937 for obj in self.store.iter_live_object_set() {
1939 dirty_objects.insert(obj.object_id(), obj);
1940 }
1941
1942 for entry in iter {
1944 let id = *entry.key();
1945 let value = entry.value();
1946 match value.get_highest().unwrap() {
1947 (_, ObjectEntry::Object(object)) => {
1948 dirty_objects.insert(id, LiveObject::Normal(object.clone()));
1949 }
1950 (_version, ObjectEntry::Wrapped) => {
1951 dirty_objects.remove(&id);
1952 }
1953 (_, ObjectEntry::Deleted) => {
1954 dirty_objects.remove(&id);
1955 }
1956 }
1957 }
1958
1959 Box::new(dirty_objects.into_values())
1960 }
1961}