1use std::{
51 collections::{BTreeMap, BTreeSet},
52 hash::Hash,
53 sync::Arc,
54};
55
56use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
57use futures::{FutureExt, future::BoxFuture};
58use iota_common::sync::notify_read::NotifyRead;
59use iota_macros::fail_point_async;
60use iota_types::{
61 accumulator::Accumulator,
62 base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
63 bridge::{Bridge, get_bridge},
64 digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
65 effects::{TransactionEffects, TransactionEvents},
66 error::{IotaError, IotaResult, UserInputError},
67 iota_system_state::{IotaSystemState, get_iota_system_state},
68 message_envelope::Message,
69 messages_checkpoint::CheckpointSequenceNumber,
70 object::Object,
71 storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject},
72 transaction::{VerifiedSignedTransaction, VerifiedTransaction},
73};
74use moka::sync::Cache as MokaCache;
75use parking_lot::Mutex;
76use prometheus::Registry;
77use tap::TapOptional;
78use tracing::{debug, info, instrument, trace, warn};
79
80use super::{
81 CheckpointCache, ExecutionCacheAPI, ExecutionCacheCommit, ExecutionCacheMetrics,
82 ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI,
83 TransactionCacheRead,
84 cache_types::{CachedVersionMap, IsNewer, MonotonicCache},
85 implement_passthrough_traits,
86 object_locks::ObjectLocks,
87};
88use crate::{
89 authority::{
90 AuthorityStore,
91 authority_per_epoch_store::AuthorityPerEpochStore,
92 authority_store::{ExecutionLockWriteGuard, IotaLockResult, ObjectLockStatus},
93 authority_store_tables::LiveObject,
94 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
95 },
96 state_accumulator::AccumulatorStore,
97 transaction_outputs::TransactionOutputs,
98};
99
// Unit tests for this module; the module is compiled only for test builds and
// its source lives at the path given below.
#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;
103
/// One cached version of an object: either the object itself, or a marker
/// recording that the object was deleted or wrapped at that version.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    /// The object as it exists at this version.
    Object(Object),
    /// The object was deleted at this version.
    Deleted,
    /// The object was wrapped at this version.
    Wrapped,
}
110
111impl ObjectEntry {
112 #[cfg(test)]
113 fn unwrap_object(&self) -> &Object {
114 match self {
115 ObjectEntry::Object(o) => o,
116 _ => panic!("unwrap_object called on non-Object"),
117 }
118 }
119
120 fn is_tombstone(&self) -> bool {
121 match self {
122 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
123 ObjectEntry::Object(_) => false,
124 }
125 }
126}
127
128impl std::fmt::Debug for ObjectEntry {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 ObjectEntry::Object(o) => {
132 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
133 }
134 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
135 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
136 }
137 }
138}
139
140impl From<Object> for ObjectEntry {
141 fn from(object: Object) -> Self {
142 ObjectEntry::Object(object)
143 }
144}
145
146impl From<ObjectOrTombstone> for ObjectEntry {
147 fn from(object: ObjectOrTombstone) -> Self {
148 match object {
149 ObjectOrTombstone::Object(o) => o.into(),
150 ObjectOrTombstone::Tombstone(obj_ref) => {
151 if obj_ref.2.is_deleted() {
152 ObjectEntry::Deleted
153 } else if obj_ref.2.is_wrapped() {
154 ObjectEntry::Wrapped
155 } else {
156 panic!("tombstone digest must either be deleted or wrapped");
157 }
158 }
159 }
160 }
161}
162
/// Value stored in the latest-object-by-id cache.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    /// The most recent known entry for the object, together with its version.
    Object(SequenceNumber, ObjectEntry),
    /// Records that a lookup found no object for this ID.
    NonExistent,
}
168
169impl LatestObjectCacheEntry {
170 #[cfg(test)]
171 fn version(&self) -> Option<SequenceNumber> {
172 match self {
173 LatestObjectCacheEntry::Object(version, _) => Some(*version),
174 LatestObjectCacheEntry::NonExistent => None,
175 }
176 }
177}
178
179impl IsNewer for LatestObjectCacheEntry {
180 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
181 match (self, other) {
182 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
183 v1 > v2
184 }
185 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
186 _ => false,
187 }
188 }
189}
190
/// Key of the marker maps: an epoch paired with an object ID.
type MarkerKey = (EpochId, ObjectID);
192
/// Outcome of a cache-only lookup.
enum CacheResult<T> {
    /// The value was found in the cache.
    Hit(T),
    /// The cache can tell that the value does not exist; callers in this file
    /// answer "not found" without consulting the store.
    NegativeHit,
    /// The cache cannot answer; callers in this file fall back to the store.
    Miss,
}
201
/// Data written by executed transactions that has not yet been committed to
/// the store (the "dirty" set).
struct UncommittedData {
    /// Per object ID, the object entries (live objects and tombstones) written
    /// but not yet committed, keyed by version.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    /// Per (epoch, object ID), the uncommitted marker values keyed by version.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    /// Uncommitted transaction effects, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    /// Uncommitted events, keyed by events digest. The set holds the digests
    /// of the pending transactions that produced these events; the entry is
    /// removed once that set becomes empty.
    transaction_events:
        DashMap<TransactionEventsDigest, (BTreeSet<TransactionDigest>, TransactionEvents)>,

    /// Effects digest of each executed-but-uncommitted transaction.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    /// Full outputs of each transaction that has been written to the cache but
    /// not yet committed.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
}
244
245impl UncommittedData {
246 fn new() -> Self {
247 Self {
248 objects: DashMap::new(),
249 markers: DashMap::new(),
250 transaction_effects: DashMap::new(),
251 executed_effects_digests: DashMap::new(),
252 pending_transaction_writes: DashMap::new(),
253 transaction_events: DashMap::new(),
254 }
255 }
256
257 fn clear(&self) {
258 self.objects.clear();
259 self.markers.clear();
260 self.transaction_effects.clear();
261 self.executed_effects_digests.clear();
262 self.pending_transaction_writes.clear();
263 self.transaction_events.clear();
264 }
265
266 fn is_empty(&self) -> bool {
267 let empty = self.pending_transaction_writes.is_empty();
268 if empty && cfg!(debug_assertions) {
269 assert!(
270 self.objects.is_empty()
271 && self.markers.is_empty()
272 && self.transaction_effects.is_empty()
273 && self.executed_effects_digests.is_empty()
274 && self.transaction_events.is_empty()
275 );
276 }
277 empty
278 }
279}
280
/// Capacity passed to every bounded cache built in this file.
///
/// Declared `const` rather than `static`: it is a plain compile-time value and
/// does not need a fixed memory location.
const MAX_CACHE_SIZE: u64 = 10_000;
283
/// Bounded in-memory caches of data that has already been committed, plus the
/// latest-object-by-id cache.
struct CachedCommittedData {
    /// Per object ID, a small map of recently committed versions.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    /// Latest known entry (or known non-existence) per object ID. Updated both
    /// on writes and when a store lookup result is cached.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    /// Per (epoch, object ID), a small map of recently committed marker values.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    /// Committed transactions by digest.
    transactions: MokaCache<TransactionDigest, Arc<VerifiedTransaction>>,

    /// Committed transaction effects by effects digest.
    transaction_effects: MokaCache<TransactionEffectsDigest, Arc<TransactionEffects>>,

    /// Committed transaction events by events digest.
    transaction_events: MokaCache<TransactionEventsDigest, Arc<TransactionEvents>>,

    /// Effects digest of each committed transaction.
    executed_effects_digests: MokaCache<TransactionDigest, TransactionEffectsDigest>,

    /// NOTE(review): only built and cleared in the visible code, never read or
    /// populated; the leading underscore suggests it is currently unused.
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
312
313impl CachedCommittedData {
314 fn new() -> Self {
315 let object_cache = MokaCache::builder()
316 .max_capacity(MAX_CACHE_SIZE)
317 .max_capacity(MAX_CACHE_SIZE)
318 .build();
319 let marker_cache = MokaCache::builder()
320 .max_capacity(MAX_CACHE_SIZE)
321 .max_capacity(MAX_CACHE_SIZE)
322 .build();
323 let transactions = MokaCache::builder()
324 .max_capacity(MAX_CACHE_SIZE)
325 .max_capacity(MAX_CACHE_SIZE)
326 .build();
327 let transaction_effects = MokaCache::builder()
328 .max_capacity(MAX_CACHE_SIZE)
329 .max_capacity(MAX_CACHE_SIZE)
330 .build();
331 let transaction_events = MokaCache::builder()
332 .max_capacity(MAX_CACHE_SIZE)
333 .max_capacity(MAX_CACHE_SIZE)
334 .build();
335 let executed_effects_digests = MokaCache::builder()
336 .max_capacity(MAX_CACHE_SIZE)
337 .max_capacity(MAX_CACHE_SIZE)
338 .build();
339 let transaction_objects = MokaCache::builder()
340 .max_capacity(MAX_CACHE_SIZE)
341 .max_capacity(MAX_CACHE_SIZE)
342 .build();
343
344 Self {
345 object_cache,
346 object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE),
347 marker_cache,
348 transactions,
349 transaction_effects,
350 transaction_events,
351 executed_effects_digests,
352 _transaction_objects: transaction_objects,
353 }
354 }
355
356 fn clear_and_assert_empty(&self) {
357 self.object_cache.invalidate_all();
358 self.object_by_id_cache.invalidate_all();
359 self.marker_cache.invalidate_all();
360 self.transactions.invalidate_all();
361 self.transaction_effects.invalidate_all();
362 self.transaction_events.invalidate_all();
363 self.executed_effects_digests.invalidate_all();
364 self._transaction_objects.invalidate_all();
365
366 assert_empty(&self.object_cache);
367 assert!(&self.object_by_id_cache.is_empty());
368 assert_empty(&self.marker_cache);
369 assert_empty(&self.transactions);
370 assert_empty(&self.transaction_effects);
371 assert_empty(&self.transaction_events);
372 assert_empty(&self.executed_effects_digests);
373 assert_empty(&self._transaction_objects);
374 }
375}
376
377fn assert_empty<K, V>(cache: &MokaCache<K, V>)
378where
379 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
380 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
381{
382 if cache.iter().next().is_some() {
383 panic!("cache should be empty");
384 }
385}
386
/// Write-back cache placed in front of an [`AuthorityStore`].
///
/// Transaction outputs are first recorded in `dirty`; committing them writes
/// them to `store` and moves them into `cached`.
pub struct WritebackCache {
    /// Written but not yet committed data.
    dirty: UncommittedData,
    /// Bounded caches of committed data.
    cached: CachedCommittedData,

    /// Package objects by ID. Populated when a package is written or looked
    /// up, and invalidated when the writing transaction is reverted.
    packages: MokaCache<ObjectID, PackageObject>,

    /// Object locks; cleared at the end of an epoch.
    object_locks: ObjectLocks,

    /// Notified with a transaction's effects digest when its outputs are
    /// written to the cache.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    /// Backing persistent store.
    store: Arc<AuthorityStore>,
    /// Cache request/hit/miss/write metrics.
    metrics: Arc<ExecutionCacheMetrics>,
}
409
/// Looks up `$version` in the optional version map `$cache`, recording
/// metrics under `$table` / `$level`.
///
/// Expands to statements that `return` from the enclosing function or closure:
/// `CacheResult::Hit` with a clone of the entry when the exact version is
/// present, or `CacheResult::NegativeHit` when the map's least version is
/// lower than the requested one. Otherwise a miss is recorded and execution
/// falls through to the code after the macro invocation.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // The requested version lies above the oldest version
                    // held in this map yet is not in it; the lookup is
                    // answered as "does not exist".
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
431
/// Looks up the highest version in the optional version map `$cache`,
/// recording metrics under `$table` / `$level`.
///
/// Expands to statements that `return CacheResult::Hit((version, entry))`
/// from the enclosing function or closure when the map is present. When the
/// map is absent a miss is recorded and execution falls through.
///
/// Panics if the map is present but empty.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
446
447impl WritebackCache {
448 pub fn new(store: Arc<AuthorityStore>, metrics: Arc<ExecutionCacheMetrics>) -> Self {
449 let packages = MokaCache::builder()
450 .max_capacity(MAX_CACHE_SIZE)
451 .max_capacity(MAX_CACHE_SIZE)
452 .build();
453 Self {
454 dirty: UncommittedData::new(),
455 cached: CachedCommittedData::new(),
456 packages,
457 object_locks: ObjectLocks::new(),
458 executed_effects_digests_notify_read: NotifyRead::new(),
459 store,
460 metrics,
461 }
462 }
463
464 pub fn new_for_tests(store: Arc<AuthorityStore>, registry: &Registry) -> Self {
465 Self::new(store, ExecutionCacheMetrics::new(registry).into())
466 }
467
468 #[cfg(test)]
469 pub fn reset_for_test(&mut self) {
470 let mut new = Self::new(self.store.clone(), self.metrics.clone());
471 std::mem::swap(self, &mut new);
472 }
473
    /// Records `object` as the entry for `object_id` at `version` in the dirty
    /// object map and publishes it to the latest-object-by-id cache.
    async fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        debug!(?object_id, ?version, ?object, "inserting object entry");
        fail_point_async!("write_object_entry");
        self.metrics.record_cache_write("object");

        // Obtain (creating if needed) the dirty entry for this object. The
        // returned guard is held until the end of the function.
        // NOTE(review): the by-id cache update below happens while this guard
        // is held and before the version is inserted; this ordering looks
        // deliberate (presumably for coherence with concurrent readers) —
        // confirm before reordering.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.cached.object_by_id_cache.insert(
            object_id,
            LatestObjectCacheEntry::Object(version, object.clone()),
        );

        entry.insert(version, object);
    }
516
517 async fn write_marker_value(
518 &self,
519 epoch_id: EpochId,
520 object_key: &ObjectKey,
521 marker_value: MarkerValue,
522 ) {
523 tracing::trace!(
524 "inserting marker value {:?}: {:?}",
525 object_key,
526 marker_value
527 );
528 fail_point_async!("write_marker_entry");
529 self.metrics.record_cache_write("marker");
530 self.dirty
531 .markers
532 .entry((epoch_id, object_key.0))
533 .or_default()
534 .value_mut()
535 .insert(object_key.1, marker_value);
536 }
537
    /// Runs `cb` with the dirty and the committed version maps for `key`
    /// (each `None` when absent) and returns its result.
    ///
    /// The dirty-map entry is obtained first and kept alive, and the committed
    /// map's mutex (if there is one) is locked, for the whole duration of
    /// `cb`.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // `entry` is used instead of a plain `get`; the entry value stays
        // alive until the function returns. It is only read here: a vacant
        // entry is never filled.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        // Clone the Arc out of the committed cache (if present) and lock it;
        // the guard lives until the function returns.
        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
563
564 fn get_object_entry_by_key_cache_only(
567 &self,
568 object_id: &ObjectID,
569 version: SequenceNumber,
570 ) -> CacheResult<ObjectEntry> {
571 Self::with_locked_cache_entries(
572 &self.dirty.objects,
573 &self.cached.object_cache,
574 object_id,
575 |dirty_entry, cached_entry| {
576 check_cache_entry_by_version!(
577 self,
578 "object_by_version",
579 "uncommitted",
580 dirty_entry,
581 version
582 );
583 check_cache_entry_by_version!(
584 self,
585 "object_by_version",
586 "committed",
587 cached_entry,
588 version
589 );
590 CacheResult::Miss
591 },
592 )
593 }
594
595 fn get_object_by_key_cache_only(
596 &self,
597 object_id: &ObjectID,
598 version: SequenceNumber,
599 ) -> CacheResult<Object> {
600 match self.get_object_entry_by_key_cache_only(object_id, version) {
601 CacheResult::Hit(entry) => match entry {
602 ObjectEntry::Object(object) => CacheResult::Hit(object),
603 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
604 },
605 CacheResult::Miss => CacheResult::Miss,
606 CacheResult::NegativeHit => CacheResult::NegativeHit,
607 }
608 }
609
    /// Cache-only lookup of the latest entry for `object_id`.
    ///
    /// The latest-object-by-id cache is consulted first. If it has no entry,
    /// the highest version in the uncommitted map, then in the committed
    /// cache, is used. `request_type` labels the recorded metrics.
    ///
    /// # Panics
    /// In debug builds, panics if the by-id cache entry disagrees with the
    /// highest dirty entry (or, failing that, with the store's latest
    /// object-or-tombstone), unless the cached entry is a tombstone and
    /// nothing was found elsewhere.
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.cached.object_by_id_cache.get(object_id);

        // Debug-only coherence check of the by-id cache against the dirty map
        // and the store.
        if cfg!(debug_assertions) {
            if let Some(entry) = &entry {
                // Reference value: highest dirty version if any, otherwise
                // whatever the store reports as latest.
                let highest: Option<ObjectEntry> = self
                    .dirty
                    .objects
                    .get(object_id)
                    .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                    .or_else(|| {
                        let obj: Option<ObjectEntry> = self
                            .store
                            .get_latest_object_or_tombstone(*object_id)
                            .unwrap()
                            .map(|(_, o)| o.into());
                        obj
                    });

                let cache_entry = match &*entry.lock() {
                    LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                    LatestObjectCacheEntry::NonExistent => None,
                };

                // A cached tombstone with nothing found in the dirty map or
                // the store is tolerated.
                // NOTE(review): presumably because tombstones may be pruned
                // from the store — confirm.
                let tombstone_possibly_pruned = highest.is_none()
                    && cache_entry
                        .as_ref()
                        .map(|e| e.is_tombstone())
                        .unwrap_or(false);

                if highest != cache_entry && !tombstone_possibly_pruned {
                    tracing::error!(
                        ?highest,
                        ?cache_entry,
                        ?tombstone_possibly_pruned,
                        "object_by_id cache is incoherent for {:?}",
                        object_id
                    );
                    panic!("object_by_id cache is incoherent for {:?}", object_id);
                }
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // No by-id entry: fall back to the per-version maps. Each macro
        // invocation returns from the closure on a hit.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
690
691 fn get_object_by_id_cache_only(
692 &self,
693 request_type: &'static str,
694 object_id: &ObjectID,
695 ) -> CacheResult<(SequenceNumber, Object)> {
696 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
697 CacheResult::Hit((version, entry)) => match entry {
698 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
699 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
700 },
701 CacheResult::NegativeHit => CacheResult::NegativeHit,
702 CacheResult::Miss => CacheResult::Miss,
703 }
704 }
705
706 fn get_marker_value_cache_only(
707 &self,
708 object_id: &ObjectID,
709 version: SequenceNumber,
710 epoch_id: EpochId,
711 ) -> CacheResult<MarkerValue> {
712 Self::with_locked_cache_entries(
713 &self.dirty.markers,
714 &self.cached.marker_cache,
715 &(epoch_id, *object_id),
716 |dirty_entry, cached_entry| {
717 check_cache_entry_by_version!(
718 self,
719 "marker_by_version",
720 "uncommitted",
721 dirty_entry,
722 version
723 );
724 check_cache_entry_by_version!(
725 self,
726 "marker_by_version",
727 "committed",
728 cached_entry,
729 version
730 );
731 CacheResult::Miss
732 },
733 )
734 }
735
736 fn get_latest_marker_value_cache_only(
737 &self,
738 object_id: &ObjectID,
739 epoch_id: EpochId,
740 ) -> CacheResult<(SequenceNumber, MarkerValue)> {
741 Self::with_locked_cache_entries(
742 &self.dirty.markers,
743 &self.cached.marker_cache,
744 &(epoch_id, *object_id),
745 |dirty_entry, cached_entry| {
746 check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
747 check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
748 CacheResult::Miss
749 },
750 )
751 }
752
753 fn get_object_impl(
754 &self,
755 request_type: &'static str,
756 id: &ObjectID,
757 ) -> IotaResult<Option<Object>> {
758 match self.get_object_by_id_cache_only(request_type, id) {
759 CacheResult::Hit((_, object)) => Ok(Some(object)),
760 CacheResult::NegativeHit => Ok(None),
761 CacheResult::Miss => {
762 let obj = self.store.get_object(id)?;
763 if let Some(obj) = &obj {
764 self.cache_latest_object_by_id(
765 id,
766 LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
767 );
768 } else {
769 self.cache_object_not_found(id);
770 }
771 Ok(obj)
772 }
773 }
774 }
775
776 fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
777 self.metrics.record_cache_request(request_type, "db");
778 &self.store
779 }
780
781 fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
782 self.metrics
783 .record_cache_multi_request(request_type, "db", count);
784 &self.store
785 }
786
    /// Records all outputs of an executed transaction in the dirty set and
    /// notifies readers waiting on its effects digest.
    ///
    /// Nothing is written to the store here. Always returns `Ok(())` in the
    /// visible code.
    #[instrument(level = "debug", skip_all)]
    async fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = &*tx_outputs;

        // NOTE(review): the order of the write phases below (tombstones,
        // markers, child objects, then other objects, then transaction-level
        // data, then notification) looks deliberate — confirm before
        // reordering.

        // Tombstones for deleted objects.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted)
                .await;
        }

        // Tombstones for wrapped objects.
        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped)
                .await;
        }

        // Marker values.
        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, object_key, *marker_value)
                .await;
        }

        // First pass over written objects: child objects only.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into())
                    .await;
            }
        }
        // Second pass: all non-child objects; packages are also placed in the
        // package cache.
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into())
                    .await;
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // Events are keyed by their digest; several transactions may share
        // one entry, so the producing transaction is added to the entry's set.
        self.metrics.record_cache_write("transaction_events");
        match self.dirty.transaction_events.entry(events.digest()) {
            DashMapEntry::Occupied(mut occupied) => {
                occupied.get_mut().0.insert(tx_digest);
            }
            DashMapEntry::Vacant(entry) => {
                let mut txns = BTreeSet::new();
                txns.insert(tx_digest);
                entry.insert((txns, events.clone()));
            }
        }

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake up anyone waiting for this transaction's effects digest.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        Ok(())
    }
892
    /// Writes the pending outputs of `digests` to the store and then moves
    /// them from the dirty set into the committed caches.
    ///
    /// Digests with no pending outputs are logged with a warning and skipped.
    ///
    /// # Errors
    /// Returns the store's error if the write fails; in that case the dirty
    /// set is left untouched.
    ///
    /// # Panics
    /// Panics if a transaction's pending outputs are no longer present when
    /// they are removed after the store write.
    #[instrument(level = "debug", skip_all)]
    async fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        fail_point_async!("writeback-cache-commit");
        trace!(?digests);

        // Gather the pending outputs for every known digest.
        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        // Persist everything in one store call before touching the caches.
        self.store
            .write_transaction_outputs(epoch, &all_outputs)
            .await?;

        // Only after the store write succeeded: drop the pending entries and
        // move the data into the committed caches.
        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        Ok(())
    }
944
945 fn flush_transactions_from_dirty_to_cached(
946 &self,
947 epoch: EpochId,
948 tx_digest: TransactionDigest,
949 outputs: &TransactionOutputs,
950 ) {
951 let TransactionOutputs {
955 transaction,
956 effects,
957 markers,
958 written,
959 deleted,
960 wrapped,
961 events,
962 ..
963 } = outputs;
964
965 let effects_digest = effects.digest();
966 let events_digest = events.digest();
967
968 self.cached
971 .transactions
972 .insert(tx_digest, transaction.clone());
973 self.cached
974 .transaction_effects
975 .insert(effects_digest, effects.clone().into());
976 self.cached
977 .executed_effects_digests
978 .insert(tx_digest, effects_digest);
979 self.cached
980 .transaction_events
981 .insert(events_digest, events.clone().into());
982
983 self.dirty
984 .transaction_effects
985 .remove(&effects_digest)
986 .expect("effects must exist");
987
988 match self.dirty.transaction_events.entry(events.digest()) {
989 DashMapEntry::Occupied(mut occupied) => {
990 let txns = &mut occupied.get_mut().0;
991 assert!(txns.remove(&tx_digest), "transaction must exist");
992 if txns.is_empty() {
993 occupied.remove();
994 }
995 }
996 DashMapEntry::Vacant(_) => {
997 panic!("events must exist");
998 }
999 }
1000
1001 self.dirty
1002 .executed_effects_digests
1003 .remove(&tx_digest)
1004 .expect("executed effects must exist");
1005
1006 for (object_key, marker_value) in markers.iter() {
1008 Self::move_version_from_dirty_to_cache(
1009 &self.dirty.markers,
1010 &self.cached.marker_cache,
1011 (epoch, object_key.0),
1012 object_key.1,
1013 marker_value,
1014 );
1015 }
1016
1017 for (object_id, object) in written.iter() {
1018 Self::move_version_from_dirty_to_cache(
1019 &self.dirty.objects,
1020 &self.cached.object_cache,
1021 *object_id,
1022 object.version(),
1023 &ObjectEntry::Object(object.clone()),
1024 );
1025 }
1026
1027 for ObjectKey(object_id, version) in deleted.iter() {
1028 Self::move_version_from_dirty_to_cache(
1029 &self.dirty.objects,
1030 &self.cached.object_cache,
1031 *object_id,
1032 *version,
1033 &ObjectEntry::Deleted,
1034 );
1035 }
1036
1037 for ObjectKey(object_id, version) in wrapped.iter() {
1038 Self::move_version_from_dirty_to_cache(
1039 &self.dirty.objects,
1040 &self.cached.object_cache,
1041 *object_id,
1042 *version,
1043 &ObjectEntry::Wrapped,
1044 );
1045 }
1046 }
1047
1048 async fn persist_transactions(&self, digests: &[TransactionDigest]) -> IotaResult {
1049 let mut txns = Vec::with_capacity(digests.len());
1050 for tx_digest in digests {
1051 let Some(tx) = self
1052 .dirty
1053 .pending_transaction_writes
1054 .get(tx_digest)
1055 .map(|o| o.transaction.clone())
1056 else {
1057 debug_assert!(
1059 self.store
1060 .get_transaction_block(tx_digest)
1061 .unwrap()
1062 .is_some()
1063 );
1064 continue;
1068 };
1069
1070 txns.push((*tx_digest, (*tx).clone()));
1071 }
1072
1073 self.store.commit_transactions(&txns)
1074 }
1075
1076 fn move_version_from_dirty_to_cache<K, V>(
1079 dirty: &DashMap<K, CachedVersionMap<V>>,
1080 cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1081 key: K,
1082 version: SequenceNumber,
1083 value: &V,
1084 ) where
1085 K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1086 V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1087 {
1088 static MAX_VERSIONS: usize = 3;
1089
1090 let dirty_entry = dirty.entry(key);
1094 let cache_entry = cache.entry(key).or_default();
1095 let mut cache_map = cache_entry.value().lock();
1096
1097 cache_map.insert(version, value.clone());
1099 cache_map.truncate_to(MAX_VERSIONS);
1101
1102 let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1103 panic!("dirty map must exist");
1104 };
1105
1106 let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1107
1108 assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1109
1110 if occupied_dirty_entry.get().is_empty() {
1112 occupied_dirty_entry.remove();
1113 }
1114 }
1115
    /// Stores `object` as the latest entry for `object_id` in the
    /// latest-object-by-id cache and records an "object_by_id" cache write.
    fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
        trace!("caching object by id: {:?} {:?}", object_id, object);
        self.metrics.record_cache_write("object_by_id");
        self.cached.object_by_id_cache.insert(object_id, object);
    }
1122
1123 fn cache_object_not_found(&self, object_id: &ObjectID) {
1124 self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent);
1125 }
1126
    /// Clears all dirty state and all object locks at the end of an epoch.
    ///
    /// `_execution_guard` is not used in the body; requiring it means the
    /// caller must hold the execution write lock guard to call this.
    ///
    /// # Panics
    /// Panics if any transaction writes are still pending.
    fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");
        assert!(
            self.dirty.pending_transaction_writes.is_empty(),
            "should be empty due to revert_state_update"
        );
        self.dirty.clear();
        info!("clearing old transaction locks");
        self.object_locks.clear();
    }
1137
1138 fn revert_state_update_impl(&self, tx: &TransactionDigest) -> IotaResult {
1139 let Some((_, outputs)) = self.dirty.pending_transaction_writes.remove(tx) else {
1144 assert!(
1145 !self.is_tx_already_executed(tx).expect("read cannot fail"),
1146 "attempt to revert committed transaction"
1147 );
1148
1149 info!("Not reverting {:?} as it was not executed", tx);
1152 return Ok(());
1153 };
1154
1155 for (object_id, object) in outputs.written.iter() {
1156 if object.is_package() {
1157 info!("removing non-finalized package from cache: {:?}", object_id);
1158 self.packages.invalidate(object_id);
1159 }
1160 self.cached.object_by_id_cache.invalidate(object_id);
1161 }
1162
1163 for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1164 self.cached.object_by_id_cache.invalidate(object_id);
1165 }
1166
1167 Ok(())
1170 }
1171
    /// Invalidates the committed caches and the package cache, then verifies
    /// they are empty.
    ///
    /// # Panics
    /// Panics if any of those caches still holds an entry afterwards.
    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.packages.invalidate_all();
        assert_empty(&self.packages);
    }
1178}
1179
// The impl body is empty: `ExecutionCacheAPI` requires no methods of its own
// here.
impl ExecutionCacheAPI for WritebackCache {}
1181
1182impl ExecutionCacheCommit for WritebackCache {
1183 fn commit_transaction_outputs<'a>(
1184 &'a self,
1185 epoch: EpochId,
1186 digests: &'a [TransactionDigest],
1187 ) -> BoxFuture<'a, IotaResult> {
1188 WritebackCache::commit_transaction_outputs(self, epoch, digests).boxed()
1189 }
1190
1191 fn persist_transactions<'a>(
1192 &'a self,
1193 digests: &'a [TransactionDigest],
1194 ) -> BoxFuture<'a, IotaResult> {
1195 WritebackCache::persist_transactions(self, digests).boxed()
1196 }
1197}
1198
1199impl ObjectCacheRead for WritebackCache {
1200 fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1201 self.metrics
1202 .record_cache_request("package", "package_cache");
1203 if let Some(p) = self.packages.get(package_id) {
1204 if cfg!(debug_assertions) {
1205 if let Some(store_package) = self.store.get_object(package_id).unwrap() {
1206 assert_eq!(
1207 store_package.digest(),
1208 p.object().digest(),
1209 "Package object cache is inconsistent for package {:?}",
1210 package_id
1211 );
1212 }
1213 }
1214 self.metrics.record_cache_hit("package", "package_cache");
1215 return Ok(Some(p));
1216 } else {
1217 self.metrics.record_cache_miss("package", "package_cache");
1218 }
1219
1220 if let Some(p) = self.get_object_impl("package", package_id)? {
1224 if p.is_package() {
1225 let p = PackageObject::new(p);
1226 tracing::trace!(
1227 "caching package: {:?}",
1228 p.object().compute_object_reference()
1229 );
1230 self.metrics.record_cache_write("package");
1231 self.packages.insert(*package_id, p.clone());
1232 Ok(Some(p))
1233 } else {
1234 Err(IotaError::UserInput {
1235 error: UserInputError::MoveObjectAsPackage {
1236 object_id: *package_id,
1237 },
1238 })
1239 }
1240 } else {
1241 Ok(None)
1242 }
1243 }
1244
    /// Intentionally a no-op in this implementation: the body is empty and
    /// the argument is ignored.
    // NOTE(review): presumably nothing needs reloading because all writes
    // pass through this cache — confirm.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
    }
1249
    /// Returns the latest live object for `id`, or `None` if there is none.
    /// Metrics are recorded under the "object_latest" request type.
    fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
        self.get_object_impl("object_latest", id)
    }
1255
1256 fn get_object_by_key(
1257 &self,
1258 object_id: &ObjectID,
1259 version: SequenceNumber,
1260 ) -> IotaResult<Option<Object>> {
1261 match self.get_object_by_key_cache_only(object_id, version) {
1262 CacheResult::Hit(object) => Ok(Some(object)),
1263 CacheResult::NegativeHit => Ok(None),
1264 CacheResult::Miss => Ok(self
1265 .record_db_get("object_by_version")
1266 .get_object_by_key(object_id, version)?),
1267 }
1268 }
1269
1270 fn multi_get_objects_by_key(
1271 &self,
1272 object_keys: &[ObjectKey],
1273 ) -> Result<Vec<Option<Object>>, IotaError> {
1274 do_fallback_lookup(
1275 object_keys,
1276 |key| {
1277 Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1278 CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1279 CacheResult::NegativeHit => CacheResult::NegativeHit,
1280 CacheResult::Miss => CacheResult::Miss,
1281 })
1282 },
1283 |remaining| {
1284 self.record_db_multi_get("object_by_version", remaining.len())
1285 .multi_get_objects_by_key(remaining)
1286 },
1287 )
1288 }
1289
1290 fn object_exists_by_key(
1291 &self,
1292 object_id: &ObjectID,
1293 version: SequenceNumber,
1294 ) -> IotaResult<bool> {
1295 match self.get_object_by_key_cache_only(object_id, version) {
1296 CacheResult::Hit(_) => Ok(true),
1297 CacheResult::NegativeHit => Ok(false),
1298 CacheResult::Miss => self
1299 .record_db_get("object_by_version")
1300 .object_exists_by_key(object_id, version),
1301 }
1302 }
1303
1304 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
1305 do_fallback_lookup(
1306 object_keys,
1307 |key| {
1308 Ok(match self.get_object_by_key_cache_only(&key.0, key.1) {
1309 CacheResult::Hit(_) => CacheResult::Hit(true),
1310 CacheResult::NegativeHit => CacheResult::Hit(false),
1311 CacheResult::Miss => CacheResult::Miss,
1312 })
1313 },
1314 |remaining| {
1315 self.record_db_multi_get("object_by_version", remaining.len())
1316 .multi_object_exists_by_key(remaining)
1317 },
1318 )
1319 }
1320
1321 fn get_latest_object_ref_or_tombstone(
1322 &self,
1323 object_id: ObjectID,
1324 ) -> IotaResult<Option<ObjectRef>> {
1325 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1326 CacheResult::Hit((version, entry)) => Ok(Some(match entry {
1327 ObjectEntry::Object(object) => object.compute_object_reference(),
1328 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1329 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1330 })),
1331 CacheResult::NegativeHit => Ok(None),
1332 CacheResult::Miss => self
1333 .record_db_get("latest_objref_or_tombstone")
1334 .get_latest_object_ref_or_tombstone(object_id),
1335 }
1336 }
1337
1338 fn get_latest_object_or_tombstone(
1339 &self,
1340 object_id: ObjectID,
1341 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1342 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1343 CacheResult::Hit((version, entry)) => {
1344 let key = ObjectKey(object_id, version);
1345 Ok(Some(match entry {
1346 ObjectEntry::Object(object) => (key, object.into()),
1347 ObjectEntry::Deleted => (
1348 key,
1349 ObjectOrTombstone::Tombstone((
1350 object_id,
1351 version,
1352 ObjectDigest::OBJECT_DIGEST_DELETED,
1353 )),
1354 ),
1355 ObjectEntry::Wrapped => (
1356 key,
1357 ObjectOrTombstone::Tombstone((
1358 object_id,
1359 version,
1360 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1361 )),
1362 ),
1363 }))
1364 }
1365 CacheResult::NegativeHit => Ok(None),
1366 CacheResult::Miss => self
1367 .record_db_get("latest_object_or_tombstone")
1368 .get_latest_object_or_tombstone(object_id),
1369 }
1370 }
1371
1372 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1373 fn find_object_lt_or_eq_version(
1374 &self,
1375 object_id: ObjectID,
1376 version_bound: SequenceNumber,
1377 ) -> IotaResult<Option<Object>> {
1378 macro_rules! check_cache_entry {
1379 ($level: expr, $objects: expr) => {
1380 self.metrics
1381 .record_cache_request("object_lt_or_eq_version", $level);
1382 if let Some(objects) = $objects {
1383 if let Some((_, object)) = objects
1384 .all_versions_lt_or_eq_descending(&version_bound)
1385 .next()
1386 {
1387 if let ObjectEntry::Object(object) = object {
1388 self.metrics
1389 .record_cache_hit("object_lt_or_eq_version", $level);
1390 return Ok(Some(object.clone()));
1391 } else {
1392 self.metrics
1394 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1395 return Ok(None);
1396 }
1397 } else {
1398 self.metrics
1399 .record_cache_miss("object_lt_or_eq_version", $level);
1400 }
1401 }
1402 };
1403 }
1404
1405 self.metrics
1407 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1408 if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
1409 let latest = latest.lock();
1410 match &*latest {
1411 LatestObjectCacheEntry::Object(latest_version, object) => {
1412 if *latest_version <= version_bound {
1413 if let ObjectEntry::Object(object) = object {
1414 self.metrics
1415 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1416 return Ok(Some(object.clone()));
1417 } else {
1418 self.metrics.record_cache_negative_hit(
1420 "object_lt_or_eq_version",
1421 "object_by_id",
1422 );
1423 return Ok(None);
1424 }
1425 }
1426 }
1429 LatestObjectCacheEntry::NonExistent => {
1431 self.metrics
1432 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1433 return Ok(None);
1434 }
1435 }
1436 }
1437 self.metrics
1438 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1439
1440 Self::with_locked_cache_entries(
1441 &self.dirty.objects,
1442 &self.cached.object_cache,
1443 &object_id,
1444 |dirty_entry, cached_entry| {
1445 check_cache_entry!("committed", dirty_entry);
1446 check_cache_entry!("uncommitted", cached_entry);
1447
1448 let latest: Option<(SequenceNumber, ObjectEntry)> =
1472 if let Some(dirty_set) = dirty_entry {
1473 dirty_set
1474 .get_highest()
1475 .cloned()
1476 .tap_none(|| panic!("dirty set cannot be empty"))
1477 } else {
1478 self.record_db_get("object_lt_or_eq_version_latest")
1479 .get_latest_object_or_tombstone(object_id)?
1480 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1481 (version, ObjectEntry::from(obj_or_tombstone))
1482 })
1483 };
1484
1485 if let Some((obj_version, obj_entry)) = latest {
1486 self.cache_latest_object_by_id(
1491 &object_id,
1492 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1493 );
1494
1495 if obj_version <= version_bound {
1496 match obj_entry {
1497 ObjectEntry::Object(object) => Ok(Some(object)),
1498 ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
1499 }
1500 } else {
1501 self.record_db_get("object_lt_or_eq_version_scan")
1505 .find_object_lt_or_eq_version(object_id, version_bound)
1506 }
1507 } else {
1508 let highest = cached_entry.and_then(|c| c.get_highest());
1514 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1515 self.cache_object_not_found(&object_id);
1516 Ok(None)
1517 }
1518 },
1519 )
1520 }
1521
    /// Loads the IOTA system state by calling `get_iota_system_state` with this cache
    /// as the object source.
    ///
    /// NOTE(review): the `_unsafe` suffix presumably warns about reading the system
    /// state at an arbitrary point in time — confirm against the trait documentation.
    fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        get_iota_system_state(self)
    }
1525
    /// Loads the bridge object by calling `get_bridge` with this cache as the object
    /// source.
    ///
    /// NOTE(review): the `_unsafe` suffix presumably carries the same caveat as the
    /// system-state accessor — confirm against the trait documentation.
    fn get_bridge_object_unsafe(&self) -> IotaResult<Bridge> {
        get_bridge(self)
    }
1529
1530 fn get_marker_value(
1531 &self,
1532 object_id: &ObjectID,
1533 version: SequenceNumber,
1534 epoch_id: EpochId,
1535 ) -> IotaResult<Option<MarkerValue>> {
1536 match self.get_marker_value_cache_only(object_id, version, epoch_id) {
1537 CacheResult::Hit(marker) => Ok(Some(marker)),
1538 CacheResult::NegativeHit => Ok(None),
1539 CacheResult::Miss => self
1540 .record_db_get("marker_by_version")
1541 .get_marker_value(object_id, &version, epoch_id),
1542 }
1543 }
1544
1545 fn get_latest_marker(
1546 &self,
1547 object_id: &ObjectID,
1548 epoch_id: EpochId,
1549 ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
1550 match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1551 CacheResult::Hit((v, marker)) => Ok(Some((v, marker))),
1552 CacheResult::NegativeHit => {
1553 panic!("cannot have negative hit when getting latest marker")
1554 }
1555 CacheResult::Miss => self
1556 .record_db_get("marker_latest")
1557 .get_latest_marker(object_id, epoch_id),
1558 }
1559 }
1560
1561 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
1562 match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1563 CacheResult::Hit((_, obj)) => {
1564 let actual_objref = obj.compute_object_reference();
1565 if obj_ref != actual_objref {
1566 Ok(ObjectLockStatus::LockedAtDifferentVersion {
1567 locked_ref: actual_objref,
1568 })
1569 } else {
1570 Ok(
1572 match self
1573 .object_locks
1574 .get_transaction_lock(&obj_ref, epoch_store)?
1575 {
1576 Some(tx_digest) => ObjectLockStatus::LockedToTx {
1577 locked_by_tx: tx_digest,
1578 },
1579 None => ObjectLockStatus::Initialized,
1580 },
1581 )
1582 }
1583 }
1584 CacheResult::NegativeHit => {
1585 Err(IotaError::from(UserInputError::ObjectNotFound {
1586 object_id: obj_ref.0,
1587 version: None,
1590 }))
1591 }
1592 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1593 }
1594 }
1595
1596 fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
1597 let obj = self.get_object_impl("live_objref", &object_id)?.ok_or(
1598 UserInputError::ObjectNotFound {
1599 object_id,
1600 version: None,
1601 },
1602 )?;
1603 Ok(obj.compute_object_reference())
1604 }
1605
1606 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1607 do_fallback_lookup(
1608 owned_object_refs,
1609 |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1610 CacheResult::Hit((version, obj)) => {
1611 if obj.compute_object_reference() != *obj_ref {
1612 Err(UserInputError::ObjectVersionUnavailableForConsumption {
1613 provided_obj_ref: *obj_ref,
1614 current_version: version,
1615 }
1616 .into())
1617 } else {
1618 Ok(CacheResult::Hit(()))
1619 }
1620 }
1621 CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1622 object_id: obj_ref.0,
1623 version: None,
1624 }
1625 .into()),
1626 CacheResult::Miss => Ok(CacheResult::Miss),
1627 },
1628 |remaining| {
1629 self.record_db_multi_get("object_is_live", remaining.len())
1630 .check_owned_objects_are_live(remaining)?;
1631 Ok(vec![(); remaining.len()])
1632 },
1633 )?;
1634 Ok(())
1635 }
1636
    /// Returns the highest pruned checkpoint sequence number as reported by the
    /// store's perpetual tables; no cache layer is consulted in this method.
    fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        self.store.perpetual_tables.get_highest_pruned_checkpoint()
    }
1640}
1641
1642impl TransactionCacheRead for WritebackCache {
1643 fn multi_get_transaction_blocks(
1644 &self,
1645 digests: &[TransactionDigest],
1646 ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
1647 do_fallback_lookup(
1648 digests,
1649 |digest| {
1650 self.metrics
1651 .record_cache_request("transaction_block", "uncommitted");
1652 if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1653 self.metrics
1654 .record_cache_hit("transaction_block", "uncommitted");
1655 return Ok(CacheResult::Hit(Some(tx.transaction.clone())));
1656 }
1657 self.metrics
1658 .record_cache_miss("transaction_block", "uncommitted");
1659
1660 self.metrics
1661 .record_cache_request("transaction_block", "committed");
1662 if let Some(tx) = self.cached.transactions.get(digest) {
1663 self.metrics
1664 .record_cache_hit("transaction_block", "committed");
1665 return Ok(CacheResult::Hit(Some(tx.clone())));
1666 }
1667 self.metrics
1668 .record_cache_miss("transaction_block", "committed");
1669
1670 Ok(CacheResult::Miss)
1671 },
1672 |remaining| {
1673 self.record_db_multi_get("transaction_block", remaining.len())
1674 .multi_get_transaction_blocks(remaining)
1675 .map(|v| v.into_iter().map(|o| o.map(Arc::new)).collect())
1676 },
1677 )
1678 }
1679
1680 fn multi_get_executed_effects_digests(
1681 &self,
1682 digests: &[TransactionDigest],
1683 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
1684 do_fallback_lookup(
1685 digests,
1686 |digest| {
1687 self.metrics
1688 .record_cache_request("executed_effects_digests", "uncommitted");
1689 if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
1690 self.metrics
1691 .record_cache_hit("executed_effects_digests", "uncommitted");
1692 return Ok(CacheResult::Hit(Some(*digest)));
1693 }
1694 self.metrics
1695 .record_cache_miss("executed_effects_digests", "uncommitted");
1696
1697 self.metrics
1698 .record_cache_request("executed_effects_digests", "committed");
1699 if let Some(digest) = self.cached.executed_effects_digests.get(digest) {
1700 self.metrics
1701 .record_cache_hit("executed_effects_digests", "committed");
1702 return Ok(CacheResult::Hit(Some(digest)));
1703 }
1704 self.metrics
1705 .record_cache_miss("executed_effects_digests", "committed");
1706
1707 Ok(CacheResult::Miss)
1708 },
1709 |remaining| {
1710 self.record_db_multi_get("executed_effects_digests", remaining.len())
1711 .multi_get_executed_effects_digests(remaining)
1712 },
1713 )
1714 }
1715
1716 fn multi_get_effects(
1717 &self,
1718 digests: &[TransactionEffectsDigest],
1719 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
1720 do_fallback_lookup(
1721 digests,
1722 |digest| {
1723 self.metrics
1724 .record_cache_request("transaction_effects", "uncommitted");
1725 if let Some(effects) = self.dirty.transaction_effects.get(digest) {
1726 self.metrics
1727 .record_cache_hit("transaction_effects", "uncommitted");
1728 return Ok(CacheResult::Hit(Some(effects.clone())));
1729 }
1730 self.metrics
1731 .record_cache_miss("transaction_effects", "uncommitted");
1732
1733 self.metrics
1734 .record_cache_request("transaction_effects", "committed");
1735 if let Some(effects) = self.cached.transaction_effects.get(digest) {
1736 self.metrics
1737 .record_cache_hit("transaction_effects", "committed");
1738 return Ok(CacheResult::Hit(Some((*effects).clone())));
1739 }
1740 self.metrics
1741 .record_cache_miss("transaction_effects", "committed");
1742
1743 Ok(CacheResult::Miss)
1744 },
1745 |remaining| {
1746 self.record_db_multi_get("transaction_effects", remaining.len())
1747 .multi_get_effects(remaining.iter())
1748 },
1749 )
1750 }
1751
1752 fn notify_read_executed_effects_digests<'a>(
1753 &'a self,
1754 digests: &'a [TransactionDigest],
1755 ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
1756 self.executed_effects_digests_notify_read
1757 .read(digests, |digests| {
1758 self.multi_get_executed_effects_digests(digests)
1759 })
1760 .boxed()
1761 }
1762
1763 fn multi_get_events(
1764 &self,
1765 event_digests: &[TransactionEventsDigest],
1766 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
1767 fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
1768 if events.data.is_empty() {
1769 None
1770 } else {
1771 Some(events)
1772 }
1773 }
1774
1775 do_fallback_lookup(
1776 event_digests,
1777 |digest| {
1778 self.metrics
1779 .record_cache_request("transaction_events", "uncommitted");
1780 if let Some(events) = self
1781 .dirty
1782 .transaction_events
1783 .get(digest)
1784 .map(|e| e.1.clone())
1785 {
1786 self.metrics
1787 .record_cache_hit("transaction_events", "uncommitted");
1788
1789 return Ok(CacheResult::Hit(map_events(events)));
1790 }
1791 self.metrics
1792 .record_cache_miss("transaction_events", "uncommitted");
1793
1794 self.metrics
1795 .record_cache_request("transaction_events", "committed");
1796 if let Some(events) = self
1797 .cached
1798 .transaction_events
1799 .get(digest)
1800 .map(|e| (*e).clone())
1801 {
1802 self.metrics
1803 .record_cache_hit("transaction_events", "committed");
1804 return Ok(CacheResult::Hit(map_events(events)));
1805 }
1806
1807 self.metrics
1808 .record_cache_miss("transaction_events", "committed");
1809
1810 Ok(CacheResult::Miss)
1811 },
1812 |digests| self.store.multi_get_events(digests),
1813 )
1814 }
1815}
1816
1817impl ExecutionCacheWrite for WritebackCache {
1818 fn acquire_transaction_locks<'a>(
1819 &'a self,
1820 epoch_store: &'a AuthorityPerEpochStore,
1821 owned_input_objects: &'a [ObjectRef],
1822 transaction: VerifiedSignedTransaction,
1823 ) -> BoxFuture<'a, IotaResult> {
1824 self.object_locks
1825 .acquire_transaction_locks(self, epoch_store, owned_input_objects, transaction)
1826 .boxed()
1827 }
1828
1829 fn write_transaction_outputs(
1830 &self,
1831 epoch_id: EpochId,
1832 tx_outputs: Arc<TransactionOutputs>,
1833 ) -> BoxFuture<'_, IotaResult> {
1834 WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs).boxed()
1835 }
1836}
1837
1838fn do_fallback_lookup<K: Copy, V: Default + Clone>(
1847 keys: &[K],
1848 get_cached_key: impl Fn(&K) -> IotaResult<CacheResult<V>>,
1849 multiget_fallback: impl Fn(&[K]) -> IotaResult<Vec<V>>,
1850) -> IotaResult<Vec<V>> {
1851 let mut results = vec![V::default(); keys.len()];
1852 let mut fallback_keys = Vec::with_capacity(keys.len());
1853 let mut fallback_indices = Vec::with_capacity(keys.len());
1854
1855 for (i, key) in keys.iter().enumerate() {
1856 match get_cached_key(key)? {
1857 CacheResult::Miss => {
1858 fallback_keys.push(*key);
1859 fallback_indices.push(i);
1860 }
1861 CacheResult::NegativeHit => (),
1862 CacheResult::Hit(value) => {
1863 results[i] = value;
1864 }
1865 }
1866 }
1867
1868 let fallback_results = multiget_fallback(&fallback_keys)?;
1869 assert_eq!(fallback_results.len(), fallback_indices.len());
1870 assert_eq!(fallback_results.len(), fallback_keys.len());
1871
1872 for (i, result) in fallback_indices
1873 .into_iter()
1874 .zip(fallback_results.into_iter())
1875 {
1876 results[i] = result;
1877 }
1878 Ok(results)
1879}
1880
// Expands the `implement_passthrough_traits!` macro (imported from the parent module)
// for `WritebackCache`.
// NOTE(review): the generated items are not visible here — see the macro definition
// for the exact set of trait impls it produces.
implement_passthrough_traits!(WritebackCache);
1882
1883impl AccumulatorStore for WritebackCache {
1884 fn get_root_state_accumulator_for_epoch(
1885 &self,
1886 epoch: EpochId,
1887 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
1888 self.store.get_root_state_accumulator_for_epoch(epoch)
1889 }
1890
1891 fn get_root_state_accumulator_for_highest_epoch(
1892 &self,
1893 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
1894 self.store.get_root_state_accumulator_for_highest_epoch()
1895 }
1896
1897 fn insert_state_accumulator_for_epoch(
1898 &self,
1899 epoch: EpochId,
1900 checkpoint_seq_num: &CheckpointSequenceNumber,
1901 acc: &Accumulator,
1902 ) -> IotaResult {
1903 self.store
1904 .insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc)
1905 }
1906
1907 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1908 assert!(
1912 self.dirty.is_empty(),
1913 "cannot iterate live object set with dirty data"
1914 );
1915 self.store.iter_live_object_set()
1916 }
1917
1918 fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1922 let iter = self.dirty.objects.iter();
1924 let mut dirty_objects = BTreeMap::new();
1925
1926 for obj in self.store.iter_live_object_set() {
1928 dirty_objects.insert(obj.object_id(), obj);
1929 }
1930
1931 for entry in iter {
1933 let id = *entry.key();
1934 let value = entry.value();
1935 match value.get_highest().unwrap() {
1936 (_, ObjectEntry::Object(object)) => {
1937 dirty_objects.insert(id, LiveObject::Normal(object.clone()));
1938 }
1939 (_version, ObjectEntry::Wrapped) => {
1940 dirty_objects.remove(&id);
1941 }
1942 (_, ObjectEntry::Deleted) => {
1943 dirty_objects.remove(&id);
1944 }
1945 }
1946 }
1947
1948 Box::new(dirty_objects.into_values())
1949 }
1950}