1use std::{sync::Arc, time::Duration};
6
7use futures::{FutureExt, future::BoxFuture};
8use iota_config::node::ExecutionCacheTypeAtomicU8;
9use iota_types::{
10 accumulator::Accumulator,
11 base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
12 digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
13 effects::{TransactionEffects, TransactionEvents},
14 error::{IotaError, IotaResult},
15 executable_transaction::VerifiedExecutableTransaction,
16 iota_system_state::IotaSystemState,
17 messages_checkpoint::CheckpointSequenceNumber,
18 object::Object,
19 storage::{MarkerValue, ObjectKey, ObjectOrTombstone, PackageObject},
20 transaction::{VerifiedSignedTransaction, VerifiedTransaction},
21};
22use tracing::instrument;
23
24use super::{
25 CheckpointCache, ExecutionCacheCommit, ExecutionCacheConfig, ExecutionCacheMetrics,
26 ExecutionCacheReconfigAPI, ExecutionCacheType, ExecutionCacheWrite, ObjectCacheRead,
27 PassthroughCache, StateSyncAPI, TestingAPI, TransactionCacheRead, WritebackCache,
28};
29use crate::{
30 authority::{
31 AuthorityStore,
32 authority_per_epoch_store::AuthorityPerEpochStore,
33 authority_store::{ExecutionLockWriteGuard, IotaLockResult},
34 backpressure::BackpressureManager,
35 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
36 },
37 state_accumulator::AccumulatorStore,
38 transaction_outputs::TransactionOutputs,
39};
40
/// Forwards a method call on a `ProxyCache` to whichever backend is currently selected.
///
/// Invoked as `delegate_method!(self.method(arg1, arg2))`; the receiver and every
/// argument must be bare identifiers. The expansion is a single `match` expression that:
///
/// 1. loads `mode` with `Acquire` ordering (the counterpart of the `Release` store in
///    `reconfigure_cache_impl`),
/// 2. converts the loaded value into an `ExecutionCacheType`, and
/// 3. calls `method` with the same arguments on `passthrough_cache` or `writeback_cache`.
///
/// The mode is re-read on every expansion, i.e. once per delegated call.
macro_rules! delegate_method {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match ExecutionCacheType::from(&$self.mode.load(std::sync::atomic::Ordering::Acquire)) {
            ExecutionCacheType::PassthroughCache => $self.passthrough_cache.$method($($args),*),
            ExecutionCacheType::WritebackCache => $self.writeback_cache.$method($($args),*),
        }
    };
}
49
/// Cache front-end that owns both a `PassthroughCache` and a `WritebackCache` and forwards
/// each trait call to the one selected by `mode`.
///
/// Both backends are constructed in `ProxyCache::new`; `reconfigure_cache_impl` changes
/// which of them receives calls.
pub struct ProxyCache {
    /// Backend that receives calls while `mode` decodes to
    /// `ExecutionCacheType::PassthroughCache`.
    passthrough_cache: PassthroughCache,
    /// Backend that receives calls while `mode` decodes to
    /// `ExecutionCacheType::WritebackCache`.
    writeback_cache: WritebackCache,
    /// Currently selected backend, held in an atomic so it can be replaced through `&self`.
    mode: ExecutionCacheTypeAtomicU8,
}
61
62impl ProxyCache {
63 pub fn new(
64 cache_config: &ExecutionCacheConfig,
65 epoch_start_config: &EpochStartConfiguration,
66 store: Arc<AuthorityStore>,
67 metrics: Arc<ExecutionCacheMetrics>,
68 backpressure_manager: Arc<BackpressureManager>,
69 ) -> Self {
70 let cache_type = epoch_start_config.execution_cache_type();
71 tracing::info!("using cache impl {:?}", cache_type);
72 let passthrough_cache = PassthroughCache::new(store.clone(), metrics.clone());
73
74 let writeback_cache = WritebackCache::new(
75 &cache_config.writeback_cache,
76 store.clone(),
77 metrics.clone(),
78 backpressure_manager,
79 );
80
81 Self {
82 passthrough_cache,
83 writeback_cache,
84 mode: cache_type.into(),
85 }
86 }
87
88 async fn reconfigure_cache_impl(&self, epoch_start_config: &EpochStartConfiguration) {
89 let cache_type = epoch_start_config.execution_cache_type();
90 tracing::info!("switching to cache impl {:?}", cache_type);
91 if matches!(cache_type, ExecutionCacheType::PassthroughCache) {
92 tokio::time::sleep(Duration::from_nanos(100)).await;
107 self.writeback_cache.clear_caches_and_assert_empty();
108 }
109 self.mode
110 .store(cache_type.into(), std::sync::atomic::Ordering::Release);
111 }
112}
113
// `ObjectCacheRead` for the proxy: every method below is a pure forwarder. It passes its
// arguments unchanged to the same-named method on whichever backend `delegate_method!`
// selects at call time and returns that backend's result as-is.
impl ObjectCacheRead for ProxyCache {
    fn try_get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
        delegate_method!(self.try_get_package_object(package_id))
    }

    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]) {
        delegate_method!(self.force_reload_system_packages(system_package_ids))
    }

    fn try_get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
        delegate_method!(self.try_get_object(id))
    }

    fn try_get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        delegate_method!(self.try_get_object_by_key(object_id, version))
    }

    fn try_multi_get_objects_by_key(
        &self,
        object_keys: &[ObjectKey],
    ) -> Result<Vec<Option<Object>>, IotaError> {
        delegate_method!(self.try_multi_get_objects_by_key(object_keys))
    }

    fn try_object_exists_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<bool> {
        delegate_method!(self.try_object_exists_by_key(object_id, version))
    }

    fn try_multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
        delegate_method!(self.try_multi_object_exists_by_key(object_keys))
    }

    fn try_get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        delegate_method!(self.try_get_latest_object_ref_or_tombstone(object_id))
    }

    fn try_get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
        delegate_method!(self.try_get_latest_object_or_tombstone(object_id))
    }

    fn try_find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        delegate_method!(self.try_find_object_lt_or_eq_version(object_id, version))
    }

    fn try_get_lock(
        &self,
        obj_ref: ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaLockResult {
        delegate_method!(self.try_get_lock(obj_ref, epoch_store))
    }

    fn _try_get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
        delegate_method!(self._try_get_live_objref(object_id))
    }

    fn try_check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        delegate_method!(self.try_check_owned_objects_are_live(owned_object_refs))
    }

    fn try_get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
        delegate_method!(self.try_get_iota_system_state_object_unsafe())
    }

    fn try_get_marker_value(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> IotaResult<Option<MarkerValue>> {
        delegate_method!(self.try_get_marker_value(object_id, version, epoch_id))
    }

    fn try_get_latest_marker(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
        delegate_method!(self.try_get_latest_marker(object_id, epoch_id))
    }

    fn try_get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        delegate_method!(self.try_get_highest_pruned_checkpoint())
    }
}
217
// `TransactionCacheRead` for the proxy: each method forwards its arguments unchanged to
// the same-named method on the backend chosen by `delegate_method!`. Every method is
// wrapped in a trace-level `tracing` span that records none of its arguments (`skip_all`).
impl TransactionCacheRead for ProxyCache {
    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
        delegate_method!(self.try_multi_get_transaction_blocks(digests))
    }

    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
        delegate_method!(self.try_multi_get_executed_effects_digests(digests))
    }

    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
        delegate_method!(self.try_multi_get_effects(digests))
    }

    // Not an `async fn`: the backend is selected when this method is called, and the
    // returned future belongs to that backend even if the mode changes before it resolves.
    #[instrument(level = "trace", skip_all)]
    fn try_notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
        delegate_method!(self.try_notify_read_executed_effects_digests(digests))
    }

    #[instrument(level = "trace", skip_all)]
    fn try_multi_get_events(
        &self,
        event_digests: &[TransactionEventsDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        delegate_method!(self.try_multi_get_events(event_digests))
    }
}
259
// `ExecutionCacheWrite` for the proxy: both methods forward their arguments unchanged to
// the same-named method on the backend chosen by `delegate_method!`.
impl ExecutionCacheWrite for ProxyCache {
    fn try_write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> IotaResult {
        delegate_method!(self.try_write_transaction_outputs(epoch_id, tx_outputs))
    }

    // Runs inside a trace-level `tracing` span that records no arguments (`skip_all`).
    #[instrument(level = "trace", skip_all)]
    fn try_acquire_transaction_locks<'a>(
        &'a self,
        epoch_store: &'a AuthorityPerEpochStore,
        owned_input_objects: &'a [ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> IotaResult {
        delegate_method!(self.try_acquire_transaction_locks(
            epoch_store,
            owned_input_objects,
            transaction
        ))
    }
}
283
// `AccumulatorStore` for the proxy: each method forwards its arguments unchanged to the
// same-named method on the backend chosen by `delegate_method!`.
impl AccumulatorStore for ProxyCache {
    fn get_root_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        delegate_method!(self.get_root_state_accumulator_for_epoch(epoch))
    }

    fn get_root_state_accumulator_for_highest_epoch(
        &self,
    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
        delegate_method!(self.get_root_state_accumulator_for_highest_epoch())
    }

    fn insert_state_accumulator_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &Accumulator,
    ) -> IotaResult {
        delegate_method!(self.insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc))
    }

    // The iterator comes from the backend that was active when this method was called.
    fn iter_live_object_set(
        &self,
    ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
        delegate_method!(self.iter_live_object_set())
    }

    // The iterator comes from the backend that was active when this method was called.
    fn iter_cached_live_object_set_for_testing(
        &self,
    ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
        delegate_method!(self.iter_cached_live_object_set_for_testing())
    }
}
319
// `ExecutionCacheCommit` for the proxy: each method forwards its arguments unchanged to
// the same-named method on the backend chosen by `delegate_method!`.
impl ExecutionCacheCommit for ProxyCache {
    fn try_commit_transaction_outputs(
        &self,
        epoch: EpochId,
        digests: &[TransactionDigest],
    ) -> IotaResult {
        delegate_method!(self.try_commit_transaction_outputs(epoch, digests))
    }

    fn try_persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
        delegate_method!(self.try_persist_transaction(tx))
    }

    // Reports the count of the currently active backend only.
    fn approximate_pending_transaction_count(&self) -> u64 {
        delegate_method!(self.approximate_pending_transaction_count())
    }
}
337
// `CheckpointCache` for the proxy: each method forwards its arguments unchanged to the
// same-named method on the backend chosen by `delegate_method!`.
impl CheckpointCache for ProxyCache {
    fn try_get_transaction_perpetual_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        delegate_method!(self.try_get_transaction_perpetual_checkpoint(digest))
    }

    fn try_multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
        delegate_method!(self.try_multi_get_transactions_perpetual_checkpoints(digests))
    }

    fn try_insert_finalized_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
        epoch: EpochId,
        sequence: CheckpointSequenceNumber,
    ) -> IotaResult {
        delegate_method!(
            self.try_insert_finalized_transactions_perpetual_checkpoints(digests, epoch, sequence)
        )
    }
}
364
// `ExecutionCacheReconfigAPI` for the proxy. All methods except `reconfigure_cache`
// forward their arguments unchanged to the same-named method on the backend chosen by
// `delegate_method!`; `reconfigure_cache` acts on the proxy itself.
impl ExecutionCacheReconfigAPI for ProxyCache {
    fn try_insert_genesis_object(&self, object: Object) -> IotaResult {
        delegate_method!(self.try_insert_genesis_object(object))
    }

    fn try_bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult {
        delegate_method!(self.try_bulk_insert_genesis_objects(objects))
    }

    fn try_revert_state_update(&self, digest: &TransactionDigest) -> IotaResult {
        delegate_method!(self.try_revert_state_update(digest))
    }

    fn try_set_epoch_start_configuration(
        &self,
        epoch_start_config: &EpochStartConfiguration,
    ) -> IotaResult {
        delegate_method!(self.try_set_epoch_start_configuration(epoch_start_config))
    }

    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
        delegate_method!(self.update_epoch_flags_metrics(old, new))
    }

    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        delegate_method!(self.clear_state_end_of_epoch(execution_guard))
    }

    fn try_expensive_check_iota_conservation(
        &self,
        old_epoch_store: &AuthorityPerEpochStore,
        epoch_supply_change: Option<i64>,
    ) -> IotaResult {
        delegate_method!(
            self.try_expensive_check_iota_conservation(old_epoch_store, epoch_supply_change)
        )
    }

    fn try_checkpoint_db(&self, path: &std::path::Path) -> IotaResult {
        delegate_method!(self.try_checkpoint_db(path))
    }

    // Not delegated: boxes the future of the inherent `reconfigure_cache_impl`, which
    // updates this proxy's `mode` (and may clear the writeback cache first).
    fn reconfigure_cache<'a>(
        &'a self,
        epoch_start_config: &'a EpochStartConfiguration,
    ) -> BoxFuture<'a, ()> {
        self.reconfigure_cache_impl(epoch_start_config).boxed()
    }
}
414
// `StateSyncAPI` for the proxy: both methods forward their arguments unchanged to the
// same-named method on the backend chosen by `delegate_method!`.
impl StateSyncAPI for ProxyCache {
    fn try_insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) -> IotaResult {
        delegate_method!(self.try_insert_transaction_and_effects(transaction, transaction_effects))
    }

    fn try_multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) -> IotaResult {
        delegate_method!(self.try_multi_insert_transaction_and_effects(transactions_and_effects))
    }
}
431
// `TestingAPI` for the proxy: forwards to the backend chosen by `delegate_method!`.
impl TestingAPI for ProxyCache {
    // Returns whatever store handle the currently active backend exposes.
    fn database_for_testing(&self) -> Arc<AuthorityStore> {
        delegate_method!(self.database_for_testing())
    }
}