1use std::{sync::Arc, time::Duration};
6
7use futures::{FutureExt, future::BoxFuture};
8use iota_types::{
9 accumulator::Accumulator,
10 base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData},
11 bridge::Bridge,
12 digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest},
13 effects::{TransactionEffects, TransactionEvents},
14 error::{IotaError, IotaResult},
15 iota_system_state::IotaSystemState,
16 messages_checkpoint::CheckpointSequenceNumber,
17 object::Object,
18 storage::{MarkerValue, ObjectKey, ObjectOrTombstone, PackageObject},
19 transaction::{VerifiedSignedTransaction, VerifiedTransaction},
20};
21use parking_lot::RwLock;
22
23use super::{
24 CheckpointCache, ExecutionCacheCommit, ExecutionCacheConfigType, ExecutionCacheMetrics,
25 ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, PassthroughCache,
26 StateSyncAPI, TestingAPI, TransactionCacheRead, WritebackCache,
27};
28use crate::{
29 authority::{
30 AuthorityStore,
31 authority_per_epoch_store::AuthorityPerEpochStore,
32 authority_store::{ExecutionLockWriteGuard, IotaLockResult},
33 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
34 },
35 state_accumulator::AccumulatorStore,
36 transaction_outputs::TransactionOutputs,
37};
38
39macro_rules! delegate_method {
40 ($self:ident.$method:ident($($args:ident),*)) => {
41 match *$self.mode.read() {
42 ExecutionCacheConfigType::PassthroughCache => $self.passthrough_cache.$method($($args),*),
43 ExecutionCacheConfigType::WritebackCache => $self.writeback_cache.$method($($args),*),
44 }
45 };
46}
47
48pub struct ProxyCache {
49 passthrough_cache: PassthroughCache,
56 writeback_cache: WritebackCache,
57 mode: RwLock<ExecutionCacheConfigType>,
58}
59
60impl ProxyCache {
61 pub fn new(
62 epoch_start_config: &EpochStartConfiguration,
63 store: Arc<AuthorityStore>,
64 metrics: Arc<ExecutionCacheMetrics>,
65 ) -> Self {
66 let cache_type = epoch_start_config.execution_cache_type();
67 tracing::info!("using cache impl {:?}", cache_type);
68 let passthrough_cache = PassthroughCache::new(store.clone(), metrics.clone());
69 let writeback_cache = WritebackCache::new(store.clone(), metrics.clone());
70
71 Self {
72 passthrough_cache,
73 writeback_cache,
74 mode: RwLock::new(cache_type),
75 }
76 }
77
78 async fn reconfigure_cache_impl(&self, epoch_start_config: &EpochStartConfiguration) {
79 let cache_type = epoch_start_config.execution_cache_type();
80 tracing::info!("switching to cache impl {:?}", cache_type);
81 if matches!(cache_type, ExecutionCacheConfigType::PassthroughCache) {
82 tokio::time::sleep(Duration::from_nanos(100)).await;
97 self.writeback_cache.clear_caches_and_assert_empty();
98 }
99 *self.mode.write() = cache_type;
100 }
101}
102
103impl ObjectCacheRead for ProxyCache {
104 fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
105 delegate_method!(self.get_package_object(package_id))
106 }
107
108 fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]) {
109 delegate_method!(self.force_reload_system_packages(system_package_ids))
110 }
111
112 fn get_object(&self, id: &ObjectID) -> IotaResult<Option<Object>> {
113 delegate_method!(self.get_object(id))
114 }
115
116 fn get_object_by_key(
117 &self,
118 object_id: &ObjectID,
119 version: SequenceNumber,
120 ) -> IotaResult<Option<Object>> {
121 delegate_method!(self.get_object_by_key(object_id, version))
122 }
123
124 fn multi_get_objects_by_key(
125 &self,
126 object_keys: &[ObjectKey],
127 ) -> Result<Vec<Option<Object>>, IotaError> {
128 delegate_method!(self.multi_get_objects_by_key(object_keys))
129 }
130
131 fn object_exists_by_key(
132 &self,
133 object_id: &ObjectID,
134 version: SequenceNumber,
135 ) -> IotaResult<bool> {
136 delegate_method!(self.object_exists_by_key(object_id, version))
137 }
138
139 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
140 delegate_method!(self.multi_object_exists_by_key(object_keys))
141 }
142
143 fn get_latest_object_ref_or_tombstone(
144 &self,
145 object_id: ObjectID,
146 ) -> IotaResult<Option<ObjectRef>> {
147 delegate_method!(self.get_latest_object_ref_or_tombstone(object_id))
148 }
149
150 fn get_latest_object_or_tombstone(
151 &self,
152 object_id: ObjectID,
153 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
154 delegate_method!(self.get_latest_object_or_tombstone(object_id))
155 }
156
157 fn find_object_lt_or_eq_version(
158 &self,
159 object_id: ObjectID,
160 version: SequenceNumber,
161 ) -> IotaResult<Option<Object>> {
162 delegate_method!(self.find_object_lt_or_eq_version(object_id, version))
163 }
164
165 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> IotaLockResult {
166 delegate_method!(self.get_lock(obj_ref, epoch_store))
167 }
168
169 fn _get_live_objref(&self, object_id: ObjectID) -> IotaResult<ObjectRef> {
170 delegate_method!(self._get_live_objref(object_id))
171 }
172
173 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
174 delegate_method!(self.check_owned_objects_are_live(owned_object_refs))
175 }
176
177 fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
178 delegate_method!(self.get_iota_system_state_object_unsafe())
179 }
180
181 fn get_bridge_object_unsafe(&self) -> IotaResult<Bridge> {
182 delegate_method!(self.get_bridge_object_unsafe())
183 }
184
185 fn get_marker_value(
186 &self,
187 object_id: &ObjectID,
188 version: SequenceNumber,
189 epoch_id: EpochId,
190 ) -> IotaResult<Option<MarkerValue>> {
191 delegate_method!(self.get_marker_value(object_id, version, epoch_id))
192 }
193
194 fn get_latest_marker(
195 &self,
196 object_id: &ObjectID,
197 epoch_id: EpochId,
198 ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
199 delegate_method!(self.get_latest_marker(object_id, epoch_id))
200 }
201
202 fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
203 delegate_method!(self.get_highest_pruned_checkpoint())
204 }
205}
206
207impl TransactionCacheRead for ProxyCache {
208 fn multi_get_transaction_blocks(
209 &self,
210 digests: &[TransactionDigest],
211 ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
212 delegate_method!(self.multi_get_transaction_blocks(digests))
213 }
214
215 fn multi_get_executed_effects_digests(
216 &self,
217 digests: &[TransactionDigest],
218 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
219 delegate_method!(self.multi_get_executed_effects_digests(digests))
220 }
221
222 fn multi_get_effects(
223 &self,
224 digests: &[TransactionEffectsDigest],
225 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
226 delegate_method!(self.multi_get_effects(digests))
227 }
228
229 fn notify_read_executed_effects_digests<'a>(
230 &'a self,
231 digests: &'a [TransactionDigest],
232 ) -> BoxFuture<'a, IotaResult<Vec<TransactionEffectsDigest>>> {
233 delegate_method!(self.notify_read_executed_effects_digests(digests))
234 }
235
236 fn multi_get_events(
237 &self,
238 event_digests: &[TransactionEventsDigest],
239 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
240 delegate_method!(self.multi_get_events(event_digests))
241 }
242}
243
244impl ExecutionCacheWrite for ProxyCache {
245 fn write_transaction_outputs(
246 &self,
247 epoch_id: EpochId,
248 tx_outputs: Arc<TransactionOutputs>,
249 ) -> BoxFuture<'_, IotaResult> {
250 delegate_method!(self.write_transaction_outputs(epoch_id, tx_outputs))
251 }
252
253 fn acquire_transaction_locks<'a>(
254 &'a self,
255 epoch_store: &'a AuthorityPerEpochStore,
256 owned_input_objects: &'a [ObjectRef],
257 transaction: VerifiedSignedTransaction,
258 ) -> BoxFuture<'a, IotaResult> {
259 delegate_method!(self.acquire_transaction_locks(
260 epoch_store,
261 owned_input_objects,
262 transaction
263 ))
264 }
265}
266
267impl AccumulatorStore for ProxyCache {
268 fn get_root_state_accumulator_for_epoch(
269 &self,
270 epoch: EpochId,
271 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
272 delegate_method!(self.get_root_state_accumulator_for_epoch(epoch))
273 }
274
275 fn get_root_state_accumulator_for_highest_epoch(
276 &self,
277 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
278 delegate_method!(self.get_root_state_accumulator_for_highest_epoch())
279 }
280
281 fn insert_state_accumulator_for_epoch(
282 &self,
283 epoch: EpochId,
284 checkpoint_seq_num: &CheckpointSequenceNumber,
285 acc: &Accumulator,
286 ) -> IotaResult {
287 delegate_method!(self.insert_state_accumulator_for_epoch(epoch, checkpoint_seq_num, acc))
288 }
289
290 fn iter_live_object_set(
291 &self,
292 ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
293 delegate_method!(self.iter_live_object_set())
294 }
295
296 fn iter_cached_live_object_set_for_testing(
297 &self,
298 ) -> Box<dyn Iterator<Item = crate::authority::authority_store_tables::LiveObject> + '_> {
299 delegate_method!(self.iter_cached_live_object_set_for_testing())
300 }
301}
302
303impl ExecutionCacheCommit for ProxyCache {
304 fn commit_transaction_outputs<'a>(
305 &'a self,
306 epoch: EpochId,
307 digests: &'a [TransactionDigest],
308 ) -> BoxFuture<'a, IotaResult> {
309 delegate_method!(self.commit_transaction_outputs(epoch, digests))
310 }
311
312 fn persist_transactions<'a>(
313 &'a self,
314 digests: &'a [TransactionDigest],
315 ) -> BoxFuture<'a, IotaResult> {
316 delegate_method!(self.persist_transactions(digests))
317 }
318}
319
320impl CheckpointCache for ProxyCache {
321 fn get_transaction_perpetual_checkpoint(
322 &self,
323 digest: &TransactionDigest,
324 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
325 delegate_method!(self.get_transaction_perpetual_checkpoint(digest))
326 }
327
328 fn multi_get_transactions_perpetual_checkpoints(
329 &self,
330 digests: &[TransactionDigest],
331 ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
332 delegate_method!(self.multi_get_transactions_perpetual_checkpoints(digests))
333 }
334
335 fn insert_finalized_transactions_perpetual_checkpoints(
336 &self,
337 digests: &[TransactionDigest],
338 epoch: EpochId,
339 sequence: CheckpointSequenceNumber,
340 ) -> IotaResult {
341 delegate_method!(
342 self.insert_finalized_transactions_perpetual_checkpoints(digests, epoch, sequence)
343 )
344 }
345}
346
347impl ExecutionCacheReconfigAPI for ProxyCache {
348 fn insert_genesis_object(&self, object: Object) -> IotaResult {
349 delegate_method!(self.insert_genesis_object(object))
350 }
351
352 fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult {
353 delegate_method!(self.bulk_insert_genesis_objects(objects))
354 }
355
356 fn revert_state_update(&self, digest: &TransactionDigest) -> IotaResult {
357 delegate_method!(self.revert_state_update(digest))
358 }
359
360 fn set_epoch_start_configuration(
361 &self,
362 epoch_start_config: &EpochStartConfiguration,
363 ) -> IotaResult {
364 delegate_method!(self.set_epoch_start_configuration(epoch_start_config))
365 }
366
367 fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
368 delegate_method!(self.update_epoch_flags_metrics(old, new))
369 }
370
371 fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
372 delegate_method!(self.clear_state_end_of_epoch(execution_guard))
373 }
374
375 fn expensive_check_iota_conservation(
376 &self,
377 old_epoch_store: &AuthorityPerEpochStore,
378 epoch_supply_change: Option<i64>,
379 ) -> IotaResult {
380 delegate_method!(
381 self.expensive_check_iota_conservation(old_epoch_store, epoch_supply_change)
382 )
383 }
384
385 fn checkpoint_db(&self, path: &std::path::Path) -> IotaResult {
386 delegate_method!(self.checkpoint_db(path))
387 }
388
389 fn reconfigure_cache<'a>(
390 &'a self,
391 epoch_start_config: &'a EpochStartConfiguration,
392 ) -> BoxFuture<'a, ()> {
393 self.reconfigure_cache_impl(epoch_start_config).boxed()
394 }
395}
396
397impl StateSyncAPI for ProxyCache {
398 fn insert_transaction_and_effects(
399 &self,
400 transaction: &VerifiedTransaction,
401 transaction_effects: &TransactionEffects,
402 ) -> IotaResult {
403 delegate_method!(self.insert_transaction_and_effects(transaction, transaction_effects))
404 }
405
406 fn multi_insert_transaction_and_effects(
407 &self,
408 transactions_and_effects: &[VerifiedExecutionData],
409 ) -> IotaResult {
410 delegate_method!(self.multi_insert_transaction_and_effects(transactions_and_effects))
411 }
412}
413
414impl TestingAPI for ProxyCache {
415 fn database_for_testing(&self) -> Arc<AuthorityStore> {
416 delegate_method!(self.database_for_testing())
417 }
418}