1use std::{iter, mem, ops::Not, sync::Arc, thread};
6
7use either::Either;
8use fastcrypto::hash::{HashFunction, Sha3_256};
9use futures::stream::FuturesUnordered;
10use iota_common::sync::notify_read::NotifyRead;
11use iota_config::{migration_tx_data::MigrationTxData, node::AuthorityStorePruningConfig};
12use iota_macros::fail_point_arg;
13use iota_storage::mutex_table::{MutexGuard, MutexTable};
14use iota_types::{
15 accumulator::Accumulator,
16 base_types::SequenceNumber,
17 digests::TransactionEventsDigest,
18 effects::{TransactionEffects, TransactionEvents},
19 error::UserInputError,
20 execution::TypeLayoutStore,
21 fp_bail, fp_ensure,
22 iota_system_state::{
23 get_iota_system_state, iota_system_state_summary::IotaSystemStateSummaryV2,
24 },
25 message_envelope::Message,
26 storage::{
27 BackingPackageStore, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, get_module,
28 },
29};
30use itertools::izip;
31use move_core_types::resolver::ModuleResolver;
32use tokio::time::Instant;
33use tracing::{debug, info, trace};
34use typed_store::{
35 TypedStoreError,
36 rocks::{DBBatch, DBMap},
37 traits::Map,
38};
39
40use super::{
41 authority_store_tables::{AuthorityPerpetualTables, LiveObject},
42 *,
43};
44use crate::{
45 authority::{
46 authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails},
47 authority_store_pruner::{
48 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
49 },
50 authority_store_tables::TotalIotaSupplyCheck,
51 authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object},
52 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
53 },
54 rest_index::RestIndexStore,
55 state_accumulator::AccumulatorStore,
56 transaction_outputs::TransactionOutputs,
57};
58
/// Shard count handed to `MutexTable::new` whenever an `AuthorityStore` is
/// constructed (2^12 = 4096).
const NUM_SHARDS: usize = 1 << 12;
60
61struct AuthorityStoreMetrics {
62 iota_conservation_check_latency: IntGauge,
63 iota_conservation_live_object_count: IntGauge,
64 iota_conservation_live_object_size: IntGauge,
65 iota_conservation_imbalance: IntGauge,
66 iota_conservation_storage_fund: IntGauge,
67 iota_conservation_storage_fund_imbalance: IntGauge,
68 epoch_flags: IntGaugeVec,
69}
70
71impl AuthorityStoreMetrics {
72 pub fn new(registry: &Registry) -> Self {
73 Self {
74 iota_conservation_check_latency: register_int_gauge_with_registry!(
75 "iota_conservation_check_latency",
76 "Number of seconds took to scan all live objects in the store for IOTA conservation check",
77 registry,
78 ).unwrap(),
79 iota_conservation_live_object_count: register_int_gauge_with_registry!(
80 "iota_conservation_live_object_count",
81 "Number of live objects in the store",
82 registry,
83 ).unwrap(),
84 iota_conservation_live_object_size: register_int_gauge_with_registry!(
85 "iota_conservation_live_object_size",
86 "Size in bytes of live objects in the store",
87 registry,
88 ).unwrap(),
89 iota_conservation_imbalance: register_int_gauge_with_registry!(
90 "iota_conservation_imbalance",
91 "Total amount of IOTA in the network - 10B * 10^9. This delta shows the amount of imbalance",
92 registry,
93 ).unwrap(),
94 iota_conservation_storage_fund: register_int_gauge_with_registry!(
95 "iota_conservation_storage_fund",
96 "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
97 registry,
98 ).unwrap(),
99 iota_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
100 "iota_conservation_storage_fund_imbalance",
101 "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
102 registry,
103 ).unwrap(),
104 epoch_flags: register_int_gauge_vec_with_registry!(
105 "epoch_flags",
106 "Local flags of the currently running epoch",
107 &["flag"],
108 registry,
109 ).unwrap(),
110 }
111 }
112}
113
114pub struct AuthorityStore {
123 mutex_table: MutexTable<ObjectDigest>,
125
126 pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
127
128 pub(crate) root_state_notify_read: NotifyRead<EpochId, (CheckpointSequenceNumber, Accumulator)>,
129
130 enable_epoch_iota_conservation_check: bool,
132
133 metrics: AuthorityStoreMetrics,
134}
135
136pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
137pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
138
139impl AuthorityStore {
140 pub async fn open(
143 perpetual_tables: Arc<AuthorityPerpetualTables>,
144 genesis: &Genesis,
145 config: &NodeConfig,
146 registry: &Registry,
147 migration_tx_data: Option<&MigrationTxData>,
148 ) -> IotaResult<Arc<Self>> {
149 let enable_epoch_iota_conservation_check = config
150 .expensive_safety_check_config
151 .enable_epoch_iota_conservation_check();
152
153 let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
154 info!("Creating new epoch start config from genesis");
155
156 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
157 let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
158 fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
159 info!("Setting initial epoch flags to {:?}", flags);
160 initial_epoch_flags = flags;
161 });
162
163 let epoch_start_configuration = EpochStartConfiguration::new(
164 genesis.iota_system_object().into_epoch_start_state(),
165 *genesis.checkpoint().digest(),
166 &genesis.objects(),
167 initial_epoch_flags,
168 )?;
169 perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
170 epoch_start_configuration
171 } else {
172 info!("Loading epoch start config from DB");
173 perpetual_tables
174 .epoch_start_configuration
175 .get(&())?
176 .expect("Epoch start configuration must be set in non-empty DB")
177 };
178 let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
179 info!("Epoch start config: {:?}", epoch_start_configuration);
180 info!("Cur epoch: {:?}", cur_epoch);
181 let this = Self::open_inner(
182 genesis,
183 perpetual_tables,
184 enable_epoch_iota_conservation_check,
185 registry,
186 migration_tx_data,
187 )
188 .await?;
189 this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
190 Ok(this)
191 }
192
193 pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
194 for flag in old {
195 self.metrics
196 .epoch_flags
197 .with_label_values(&[&flag.to_string()])
198 .set(0);
199 }
200 for flag in new {
201 self.metrics
202 .epoch_flags
203 .with_label_values(&[&flag.to_string()])
204 .set(1);
205 }
206 }
207
208 pub fn clear_object_per_epoch_marker_table(
212 &self,
213 _execution_guard: &ExecutionLockWriteGuard<'_>,
214 ) -> IotaResult<()> {
215 Ok(self
220 .perpetual_tables
221 .object_per_epoch_marker_table
222 .schedule_delete_all()?)
223 }
224
225 pub async fn open_with_committee_for_testing(
226 perpetual_tables: Arc<AuthorityPerpetualTables>,
227 committee: &Committee,
228 genesis: &Genesis,
229 ) -> IotaResult<Arc<Self>> {
230 assert_eq!(committee.epoch, 0);
233 Self::open_inner(genesis, perpetual_tables, true, &Registry::new(), None).await
234 }
235
236 async fn open_inner(
237 genesis: &Genesis,
238 perpetual_tables: Arc<AuthorityPerpetualTables>,
239 enable_epoch_iota_conservation_check: bool,
240 registry: &Registry,
241 migration_tx_data: Option<&MigrationTxData>,
242 ) -> IotaResult<Arc<Self>> {
243 let store = Arc::new(Self {
244 mutex_table: MutexTable::new(NUM_SHARDS),
245 perpetual_tables,
246 root_state_notify_read:
247 NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
248 enable_epoch_iota_conservation_check,
249 metrics: AuthorityStoreMetrics::new(registry),
250 });
251 if store
253 .database_is_empty()
254 .expect("database read should not fail at init.")
255 {
256 store
259 .bulk_insert_genesis_objects(genesis.objects())
260 .expect("cannot bulk insert genesis objects");
261
262 let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
264 store
265 .perpetual_tables
266 .transactions
267 .insert(transaction.digest(), transaction.serializable_ref())
268 .expect("cannot insert genesis transaction");
269 store
270 .perpetual_tables
271 .effects
272 .insert(&genesis.effects().digest(), genesis.effects())
273 .expect("cannot insert genesis effects");
274
275 let event_digests = genesis.events().digest();
280 let events = genesis
281 .events()
282 .data
283 .iter()
284 .enumerate()
285 .map(|(i, e)| ((event_digests, i), e));
286 store.perpetual_tables.events.multi_insert(events).unwrap();
287
288 if let Some(migration_transactions) = migration_tx_data {
290 let txs_data = migration_transactions.txs_data();
293
294 for (_, execution_digest) in genesis
298 .checkpoint_contents()
299 .enumerate_transactions(&genesis.checkpoint())
300 {
301 let tx_digest = &execution_digest.transaction;
302 if tx_digest == genesis.transaction().digest() {
305 continue;
306 }
307 let Some((tx, effects, events)) = txs_data.get(tx_digest) else {
310 panic!("tx digest not found in migrated objects blob");
311 };
312 let transaction = VerifiedTransaction::new_unchecked(tx.clone());
313 let objects = migration_transactions
314 .objects_by_tx_digest(*tx_digest)
315 .expect("the migration data is corrupted");
316 store
317 .bulk_insert_genesis_objects(&objects)
318 .expect("cannot bulk insert migrated objects");
319 store
320 .perpetual_tables
321 .transactions
322 .insert(transaction.digest(), transaction.serializable_ref())
323 .expect("cannot insert migration transaction");
324 store
325 .perpetual_tables
326 .effects
327 .insert(&effects.digest(), effects)
328 .expect("cannot insert migration effects");
329 let events = events
330 .data
331 .iter()
332 .enumerate()
333 .map(|(i, e)| ((events.digest(), i), e));
334 store.perpetual_tables.events.multi_insert(events).unwrap();
335 }
336 }
337 }
338
339 Ok(store)
340 }
341
342 pub fn open_no_genesis(
346 perpetual_tables: Arc<AuthorityPerpetualTables>,
347 enable_epoch_iota_conservation_check: bool,
348 registry: &Registry,
349 ) -> IotaResult<Arc<Self>> {
350 let store = Arc::new(Self {
351 mutex_table: MutexTable::new(NUM_SHARDS),
352 perpetual_tables,
353 root_state_notify_read:
354 NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
355 enable_epoch_iota_conservation_check,
356 metrics: AuthorityStoreMetrics::new(registry),
357 });
358 Ok(store)
359 }
360
361 pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
362 self.perpetual_tables.get_recovery_epoch_at_restart()
363 }
364
365 pub fn get_effects(
366 &self,
367 effects_digest: &TransactionEffectsDigest,
368 ) -> IotaResult<Option<TransactionEffects>> {
369 Ok(self.perpetual_tables.effects.get(effects_digest)?)
370 }
371
372 pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> IotaResult<bool> {
374 self.perpetual_tables
375 .effects
376 .contains_key(effects_digest)
377 .map_err(|e| e.into())
378 }
379
380 pub fn get_events(
381 &self,
382 event_digest: &TransactionEventsDigest,
383 ) -> Result<Option<TransactionEvents>, TypedStoreError> {
384 let data = self
385 .perpetual_tables
386 .events
387 .safe_range_iter((*event_digest, 0)..=(*event_digest, usize::MAX))
388 .map_ok(|(_, event)| event)
389 .collect::<Result<Vec<_>, TypedStoreError>>()?;
390 Ok(data.is_empty().not().then_some(TransactionEvents { data }))
391 }
392
393 pub fn multi_get_events(
394 &self,
395 event_digests: &[TransactionEventsDigest],
396 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
397 Ok(event_digests
398 .iter()
399 .map(|digest| self.get_events(digest))
400 .collect::<Result<Vec<_>, _>>()?)
401 }
402
403 pub fn multi_get_effects<'a>(
404 &self,
405 effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
406 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
407 Ok(self.perpetual_tables.effects.multi_get(effects_digests)?)
408 }
409
410 pub fn get_executed_effects(
411 &self,
412 tx_digest: &TransactionDigest,
413 ) -> IotaResult<Option<TransactionEffects>> {
414 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
415 match effects_digest {
416 Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
417 None => Ok(None),
418 }
419 }
420
421 pub fn multi_get_executed_effects_digests(
425 &self,
426 digests: &[TransactionDigest],
427 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
428 Ok(self.perpetual_tables.executed_effects.multi_get(digests)?)
429 }
430
431 pub fn multi_get_executed_effects(
435 &self,
436 digests: &[TransactionDigest],
437 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
438 let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
439 let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
440 let mut tx_to_effects_map = effects
441 .into_iter()
442 .flatten()
443 .map(|effects| (*effects.transaction_digest(), effects))
444 .collect::<HashMap<_, _>>();
445 Ok(digests
446 .iter()
447 .map(|digest| tx_to_effects_map.remove(digest))
448 .collect())
449 }
450
451 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
452 Ok(self
453 .perpetual_tables
454 .executed_effects
455 .contains_key(digest)?)
456 }
457
458 pub fn get_marker_value(
459 &self,
460 object_id: &ObjectID,
461 version: &SequenceNumber,
462 epoch_id: EpochId,
463 ) -> IotaResult<Option<MarkerValue>> {
464 let object_key = (epoch_id, ObjectKey(*object_id, *version));
465 Ok(self
466 .perpetual_tables
467 .object_per_epoch_marker_table
468 .get(&object_key)?)
469 }
470
471 pub fn get_latest_marker(
472 &self,
473 object_id: &ObjectID,
474 epoch_id: EpochId,
475 ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
476 let min_key = (epoch_id, ObjectKey::min_for_id(object_id));
477 let max_key = (epoch_id, ObjectKey::max_for_id(object_id));
478
479 let marker_entry = self
480 .perpetual_tables
481 .object_per_epoch_marker_table
482 .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
483 .next();
484 match marker_entry {
485 Some(Ok(((epoch, key), marker))) => {
486 assert_eq!(epoch, epoch_id);
488 assert_eq!(key.0, *object_id);
489 Ok(Some((key.1, marker)))
490 }
491 Some(Err(e)) => Err(e.into()),
492 None => Ok(None),
493 }
494 }
495
496 pub async fn notify_read_root_state_hash(
499 &self,
500 epoch: EpochId,
501 ) -> IotaResult<(CheckpointSequenceNumber, Accumulator)> {
502 let registration = self.root_state_notify_read.register_one(&epoch);
505 let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
506
507 let result = match hash {
508 Some(ready) => Either::Left(futures::future::ready(ready)),
510 None => Either::Right(registration),
511 }
512 .await;
513
514 Ok(result)
515 }
516
517 pub(crate) fn insert_finalized_transactions_perpetual_checkpoints(
519 &self,
520 digests: &[TransactionDigest],
521 epoch: EpochId,
522 sequence: CheckpointSequenceNumber,
523 ) -> IotaResult {
524 let mut batch = self
525 .perpetual_tables
526 .executed_transactions_to_checkpoint
527 .batch();
528 batch.insert_batch(
529 &self.perpetual_tables.executed_transactions_to_checkpoint,
530 digests.iter().map(|d| (*d, (epoch, sequence))),
531 )?;
532 batch.write()?;
533 trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
534 Ok(())
535 }
536
537 pub(crate) fn get_transaction_perpetual_checkpoint(
539 &self,
540 digest: &TransactionDigest,
541 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
542 Ok(self
543 .perpetual_tables
544 .executed_transactions_to_checkpoint
545 .get(digest)?)
546 }
547
548 pub(crate) fn multi_get_transactions_perpetual_checkpoints(
550 &self,
551 digests: &[TransactionDigest],
552 ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
553 Ok(self
554 .perpetual_tables
555 .executed_transactions_to_checkpoint
556 .multi_get(digests)?)
557 }
558
559 pub fn database_is_empty(&self) -> IotaResult<bool> {
561 self.perpetual_tables.database_is_empty()
562 }
563
564 fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
567 self.mutex_table
568 .acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
569 }
570
571 pub fn object_exists_by_key(
572 &self,
573 object_id: &ObjectID,
574 version: VersionNumber,
575 ) -> IotaResult<bool> {
576 Ok(self
577 .perpetual_tables
578 .objects
579 .contains_key(&ObjectKey(*object_id, version))?)
580 }
581
582 pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
583 Ok(self
584 .perpetual_tables
585 .objects
586 .multi_contains_keys(object_keys.to_vec())?
587 .into_iter()
588 .collect())
589 }
590
591 pub fn multi_get_objects_by_key(
592 &self,
593 object_keys: &[ObjectKey],
594 ) -> Result<Vec<Option<Object>>, IotaError> {
595 let wrappers = self
596 .perpetual_tables
597 .objects
598 .multi_get(object_keys.to_vec())?;
599 let mut ret = vec![];
600
601 for (idx, w) in wrappers.into_iter().enumerate() {
602 ret.push(
603 w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
604 .transpose()?
605 .flatten(),
606 );
607 }
608 Ok(ret)
609 }
610
611 pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, IotaError> {
613 let mut result = Vec::new();
614 for id in objects {
615 result.push(self.try_get_object(id)?);
616 }
617 Ok(result)
618 }
619
620 pub fn have_deleted_owned_object_at_version_or_after(
621 &self,
622 object_id: &ObjectID,
623 version: VersionNumber,
624 epoch_id: EpochId,
625 ) -> Result<bool, IotaError> {
626 let object_key = ObjectKey::max_for_id(object_id);
627 let marker_key = (epoch_id, object_key);
628
629 let marker_entry = self
632 .perpetual_tables
633 .object_per_epoch_marker_table
634 .reversed_safe_iter_with_bounds(None, Some(marker_key))?
635 .next();
636 match marker_entry.transpose()? {
637 Some(((epoch, key), marker)) => {
638 let object_data_ok = key.0 == *object_id && key.1 >= version;
640 let epoch_data_ok = epoch == epoch_id;
642 let mark_data_ok = marker == MarkerValue::OwnedDeleted;
644 Ok(object_data_ok && epoch_data_ok && mark_data_ok)
645 }
646 None => Ok(false),
647 }
648 }
649
650 pub(crate) fn insert_genesis_object(&self, object: Object) -> IotaResult {
655 debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
657 let object_ref = object.compute_object_reference();
658 self.insert_object_direct(object_ref, &object)
659 }
660
661 fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> IotaResult {
665 let mut write_batch = self.perpetual_tables.objects.batch();
666
667 let store_object = get_store_object(object.clone());
669 write_batch.insert_batch(
670 &self.perpetual_tables.objects,
671 std::iter::once((ObjectKey::from(object_ref), store_object)),
672 )?;
673
674 if object.get_single_owner().is_some() {
676 if !object.is_child_object() {
678 self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref])?;
679 }
680 }
681
682 write_batch.write()?;
683
684 Ok(())
685 }
686
687 #[instrument(level = "debug", skip_all)]
690 pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult<()> {
691 let mut batch = self.perpetual_tables.objects.batch();
692 let ref_and_objects: Vec<_> = objects
693 .iter()
694 .map(|o| (o.compute_object_reference(), o))
695 .collect();
696
697 batch.insert_batch(
698 &self.perpetual_tables.objects,
699 ref_and_objects
700 .iter()
701 .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
702 )?;
703
704 let non_child_object_refs: Vec<_> = ref_and_objects
705 .iter()
706 .filter(|(_, object)| !object.is_child_object())
707 .map(|(oref, _)| *oref)
708 .collect();
709
710 self.initialize_live_object_markers_impl(&mut batch, &non_child_object_refs)?;
711
712 batch.write()?;
713
714 Ok(())
715 }
716
717 pub fn bulk_insert_live_objects(
718 perpetual_db: &AuthorityPerpetualTables,
719 live_objects: impl Iterator<Item = LiveObject>,
720 expected_sha3_digest: &[u8; 32],
721 ) -> IotaResult<()> {
722 let mut hasher = Sha3_256::default();
723 let mut batch = perpetual_db.objects.batch();
724 for object in live_objects {
725 hasher.update(object.object_reference().2.inner());
726 match object {
727 LiveObject::Normal(object) => {
728 let store_object_wrapper = get_store_object(object.clone());
729 batch.insert_batch(
730 &perpetual_db.objects,
731 std::iter::once((
732 ObjectKey::from(object.compute_object_reference()),
733 store_object_wrapper,
734 )),
735 )?;
736 if !object.is_child_object() {
737 Self::initialize_live_object_markers(
738 &perpetual_db.live_owned_object_markers,
739 &mut batch,
740 &[object.compute_object_reference()],
741 )?;
742 }
743 }
744 LiveObject::Wrapped(object_key) => {
745 batch.insert_batch(
746 &perpetual_db.objects,
747 std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
748 object_key,
749 StoreObject::Wrapped.into(),
750 )),
751 )?;
752 }
753 }
754 }
755 let sha3_digest = hasher.finalize().digest;
756 if *expected_sha3_digest != sha3_digest {
757 error!(
758 "Sha does not match! expected: {:?}, actual: {:?}",
759 expected_sha3_digest, sha3_digest
760 );
761 return Err(IotaError::from("Sha does not match"));
762 }
763 batch.write()?;
764 Ok(())
765 }
766
767 pub fn set_epoch_start_configuration(
768 &self,
769 epoch_start_configuration: &EpochStartConfiguration,
770 ) -> IotaResult {
771 self.perpetual_tables
772 .set_epoch_start_configuration(epoch_start_configuration)?;
773 Ok(())
774 }
775
776 pub fn get_epoch_start_configuration(&self) -> IotaResult<Option<EpochStartConfiguration>> {
777 Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
778 }
779
780 #[instrument(level = "debug", skip_all)]
786 pub fn write_transaction_outputs(
787 &self,
788 epoch_id: EpochId,
789 tx_outputs: &[Arc<TransactionOutputs>],
790 ) -> IotaResult {
791 let mut written = Vec::with_capacity(tx_outputs.len());
792 for outputs in tx_outputs {
793 written.extend(outputs.written.values().cloned());
794 }
795
796 let mut write_batch = self.perpetual_tables.transactions.batch();
797 for outputs in tx_outputs {
798 self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
799 }
800 fail_point!("crash");
802
803 write_batch.write()?;
804 trace!(
805 "committed transactions: {:?}",
806 tx_outputs
807 .iter()
808 .map(|tx| tx.transaction.digest())
809 .collect::<Vec<_>>()
810 );
811
812 fail_point!("crash");
814
815 Ok(())
816 }
817
818 fn write_one_transaction_outputs(
819 &self,
820 write_batch: &mut DBBatch,
821 epoch_id: EpochId,
822 tx_outputs: &TransactionOutputs,
823 ) -> IotaResult {
824 let TransactionOutputs {
825 transaction,
826 effects,
827 markers,
828 wrapped,
829 deleted,
830 written,
831 events,
832 live_object_markers_to_delete,
833 new_live_object_markers_to_init,
834 ..
835 } = tx_outputs;
836
837 let transaction_digest = transaction.digest();
839 write_batch.insert_batch(
840 &self.perpetual_tables.transactions,
841 iter::once((transaction_digest, transaction.serializable_ref())),
842 )?;
843
844 let effects_digest = effects.digest();
846
847 write_batch.insert_batch(
848 &self.perpetual_tables.object_per_epoch_marker_table,
849 markers
850 .iter()
851 .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
852 )?;
853
854 write_batch.insert_batch(
855 &self.perpetual_tables.objects,
856 deleted
857 .iter()
858 .map(|key| (key, StoreObject::Deleted))
859 .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
860 .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
861 )?;
862
863 let new_objects = written.iter().map(|(id, new_object)| {
865 let version = new_object.version();
866 trace!(?id, ?version, "writing object");
867 let store_object = get_store_object(new_object.clone());
868 (ObjectKey(*id, version), store_object)
869 });
870
871 write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
872
873 let event_digest = events.digest();
874 let events = events
875 .data
876 .iter()
877 .enumerate()
878 .map(|(i, e)| ((event_digest, i), e));
879
880 write_batch.insert_batch(&self.perpetual_tables.events, events)?;
881
882 self.initialize_live_object_markers_impl(write_batch, new_live_object_markers_to_init)?;
883
884 self.delete_live_object_markers(write_batch, live_object_markers_to_delete)?;
887
888 write_batch
889 .insert_batch(
890 &self.perpetual_tables.effects,
891 [(effects_digest, effects.clone())],
892 )?
893 .insert_batch(
894 &self.perpetual_tables.executed_effects,
895 [(transaction_digest, effects_digest)],
896 )?;
897
898 debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
899
900 Ok(())
901 }
902
903 pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
906 let mut batch = self.perpetual_tables.transactions.batch();
907 batch.insert_batch(
908 &self.perpetual_tables.transactions,
909 [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
910 )?;
911 batch.write()?;
912 Ok(())
913 }
914
915 pub fn acquire_transaction_locks(
916 &self,
917 epoch_store: &AuthorityPerEpochStore,
918 owned_input_objects: &[ObjectRef],
919 transaction: VerifiedSignedTransaction,
920 ) -> IotaResult {
921 let tx_digest = *transaction.digest();
922 let _mutexes = self.acquire_locks(owned_input_objects);
926
927 trace!(?owned_input_objects, "acquire_transaction_locks");
928 let mut locks_to_write = Vec::new();
929
930 let live_object_markers = self
931 .perpetual_tables
932 .live_owned_object_markers
933 .multi_get(owned_input_objects)?;
934
935 let epoch_tables = epoch_store.tables()?;
936
937 let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
938
939 assert_eq!(locks.len(), live_object_markers.len());
940
941 for (live_marker, lock, obj_ref) in izip!(
942 live_object_markers.into_iter(),
943 locks.into_iter(),
944 owned_input_objects
945 ) {
946 if live_marker.is_none() {
947 let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
949 fp_bail!(
950 UserInputError::ObjectVersionUnavailableForConsumption {
951 provided_obj_ref: *obj_ref,
952 current_version: latest_live_version.1
953 }
954 .into()
955 );
956 };
957
958 if let Some(previous_tx_digest) = &lock {
959 if previous_tx_digest == &tx_digest {
960 continue;
962 } else {
963 info!(prev_tx_digest = ?previous_tx_digest,
965 cur_tx_digest = ?tx_digest,
966 "Cannot acquire lock: conflicting transaction!");
967 return Err(IotaError::ObjectLockConflict {
968 obj_ref: *obj_ref,
969 pending_transaction: *previous_tx_digest,
970 });
971 }
972 }
973
974 locks_to_write.push((*obj_ref, tx_digest));
975 }
976
977 if !locks_to_write.is_empty() {
978 trace!(?locks_to_write, "Writing locks");
979 epoch_tables.write_transaction_locks(transaction, locks_to_write.into_iter())?;
980 }
981
982 Ok(())
983 }
984
985 pub(crate) fn get_lock(
989 &self,
990 obj_ref: ObjectRef,
991 epoch_store: &AuthorityPerEpochStore,
992 ) -> IotaLockResult {
993 if self
994 .perpetual_tables
995 .live_owned_object_markers
996 .get(&obj_ref)?
997 .is_none()
998 {
999 return Ok(ObjectLockStatus::LockedAtDifferentVersion {
1001 locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
1002 });
1003 }
1004
1005 let tables = epoch_store.tables()?;
1006 if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
1007 Ok(ObjectLockStatus::LockedToTx {
1008 locked_by_tx: tx_digest,
1009 })
1010 } else {
1011 Ok(ObjectLockStatus::Initialized)
1012 }
1013 }
1014
1015 pub(crate) fn get_latest_live_version_for_object_id(
1018 &self,
1019 object_id: ObjectID,
1020 ) -> IotaResult<ObjectRef> {
1021 let mut iterator = self
1022 .perpetual_tables
1023 .live_owned_object_markers
1024 .reversed_safe_iter_with_bounds(
1025 None,
1026 Some((object_id, SequenceNumber::MAX_VALID_EXCL, ObjectDigest::MAX)),
1027 )?;
1028 Ok(iterator
1029 .next()
1030 .transpose()?
1031 .and_then(|value| {
1032 if value.0.0 == object_id {
1033 Some(value)
1034 } else {
1035 None
1036 }
1037 })
1038 .ok_or_else(|| {
1039 IotaError::from(UserInputError::ObjectNotFound {
1040 object_id,
1041 version: None,
1042 })
1043 })?
1044 .0)
1045 }
1046
1047 pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> IotaResult {
1053 let live_markers = self
1054 .perpetual_tables
1055 .live_owned_object_markers
1056 .multi_get(objects)?;
1057 for (live_marker, obj_ref) in live_markers.into_iter().zip(objects) {
1058 if live_marker.is_none() {
1059 let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1061 fp_bail!(
1062 UserInputError::ObjectVersionUnavailableForConsumption {
1063 provided_obj_ref: *obj_ref,
1064 current_version: latest_live_version.1
1065 }
1066 .into()
1067 );
1068 }
1069 }
1070 Ok(())
1071 }
1072
1073 fn initialize_live_object_markers_impl(
1075 &self,
1076 write_batch: &mut DBBatch,
1077 objects: &[ObjectRef],
1078 ) -> IotaResult {
1079 AuthorityStore::initialize_live_object_markers(
1080 &self.perpetual_tables.live_owned_object_markers,
1081 write_batch,
1082 objects,
1083 )
1084 }
1085
1086 pub fn initialize_live_object_markers(
1087 live_object_marker_table: &DBMap<ObjectRef, ()>,
1088 write_batch: &mut DBBatch,
1089 objects: &[ObjectRef],
1090 ) -> IotaResult {
1091 trace!(?objects, "initialize_live_object_markers");
1092
1093 write_batch.insert_batch(
1094 live_object_marker_table,
1095 objects.iter().map(|obj_ref| (obj_ref, ())),
1096 )?;
1097 Ok(())
1098 }
1099
1100 fn delete_live_object_markers(
1102 &self,
1103 write_batch: &mut DBBatch,
1104 objects: &[ObjectRef],
1105 ) -> IotaResult {
1106 trace!(?objects, "delete_live_object_markers");
1107 write_batch.delete_batch(
1108 &self.perpetual_tables.live_owned_object_markers,
1109 objects.iter(),
1110 )?;
1111 Ok(())
1112 }
1113
/// Test-only reset: for each transaction, deletes its signed transaction
/// and the locks on `objects` in the epoch store, then deletes and
/// re-creates the live markers of `objects` in two separate batch writes.
///
/// The object locks are cleared inside the per-transaction loop, so they
/// are left untouched when `transactions` is empty.
#[cfg(test)]
pub(crate) fn reset_locks_and_live_markers_for_test(
    &self,
    transactions: &[TransactionDigest],
    objects: &[ObjectRef],
    epoch_store: &AuthorityPerEpochStore,
) {
    for tx in transactions {
        epoch_store.delete_signed_transaction_for_test(tx);
        epoch_store.delete_object_locks_for_test(objects);
    }

    let marker_table = &self.perpetual_tables.live_owned_object_markers;

    // First write: remove the existing markers.
    let mut removal = marker_table.batch();
    removal.delete_batch(marker_table, objects.iter()).unwrap();
    removal.write().unwrap();

    // Second write: create them afresh.
    let mut reinit = marker_table.batch();
    self.initialize_live_object_markers_impl(&mut reinit, objects)
        .unwrap();
    reinit.write().unwrap();
}
1140
1141 pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> IotaResult {
1155 let Some(effects) = self.get_executed_effects(tx_digest)? else {
1156 info!("Not reverting {:?} as it was not executed", tx_digest);
1157 return Ok(());
1158 };
1159
1160 info!(?tx_digest, ?effects, "reverting transaction");
1161
1162 assert!(effects.input_shared_objects().is_empty());
1164
1165 let mut write_batch = self.perpetual_tables.transactions.batch();
1166 write_batch.delete_batch(
1167 &self.perpetual_tables.executed_effects,
1168 iter::once(tx_digest),
1169 )?;
1170 if let Some(events_digest) = effects.events_digest() {
1171 write_batch.schedule_delete_range(
1172 &self.perpetual_tables.events,
1173 &(*events_digest, usize::MIN),
1174 &(*events_digest, usize::MAX),
1175 )?;
1176 }
1177
1178 let tombstones = effects
1179 .all_tombstones()
1180 .into_iter()
1181 .map(|(id, version)| ObjectKey(id, version));
1182 write_batch.delete_batch(&self.perpetual_tables.objects, tombstones)?;
1183
1184 let all_new_object_keys = effects
1185 .all_changed_objects()
1186 .into_iter()
1187 .map(|((id, version, _), _, _)| ObjectKey(id, version));
1188 write_batch.delete_batch(&self.perpetual_tables.objects, all_new_object_keys.clone())?;
1189
1190 let modified_object_keys = effects
1191 .modified_at_versions()
1192 .into_iter()
1193 .map(|(id, version)| ObjectKey(id, version));
1194
1195 macro_rules! get_objects_and_locks {
1196 ($object_keys: expr) => {
1197 self.perpetual_tables
1198 .objects
1199 .multi_get($object_keys.clone())?
1200 .into_iter()
1201 .zip($object_keys)
1202 .filter_map(|(obj_opt, key)| {
1203 let obj = self
1204 .perpetual_tables
1205 .object(
1206 &key,
1207 obj_opt.unwrap_or_else(|| {
1208 panic!("Older object version not found: {:?}", key)
1209 }),
1210 )
1211 .expect("Matching indirect object not found")?;
1212
1213 if obj.is_immutable() {
1214 return None;
1215 }
1216
1217 let obj_ref = obj.compute_object_reference();
1218 Some(obj.is_address_owned().then_some(obj_ref))
1219 })
1220 };
1221 }
1222
1223 let old_locks = get_objects_and_locks!(modified_object_keys);
1224 let new_locks = get_objects_and_locks!(all_new_object_keys);
1225
1226 let old_locks: Vec<_> = old_locks.flatten().collect();
1227
1228 self.initialize_live_object_markers_impl(&mut write_batch, &old_locks)?;
1230
1231 write_batch.delete_batch(
1233 &self.perpetual_tables.live_owned_object_markers,
1234 new_locks.flatten(),
1235 )?;
1236
1237 write_batch.write()?;
1238
1239 Ok(())
1240 }
1241
1242 pub fn find_object_lt_or_eq_version(
1248 &self,
1249 object_id: ObjectID,
1250 version: SequenceNumber,
1251 ) -> IotaResult<Option<Object>> {
1252 self.perpetual_tables
1253 .find_object_lt_or_eq_version(object_id, version)
1254 }
1255
1256 pub fn get_latest_object_ref_or_tombstone(
1267 &self,
1268 object_id: ObjectID,
1269 ) -> Result<Option<ObjectRef>, IotaError> {
1270 self.perpetual_tables
1271 .get_latest_object_ref_or_tombstone(object_id)
1272 }
1273
1274 pub fn get_latest_object_ref_if_alive(
1277 &self,
1278 object_id: ObjectID,
1279 ) -> Result<Option<ObjectRef>, IotaError> {
1280 match self.get_latest_object_ref_or_tombstone(object_id)? {
1281 Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1282 _ => Ok(None),
1283 }
1284 }
1285
1286 pub fn get_latest_object_or_tombstone(
1291 &self,
1292 object_id: ObjectID,
1293 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1294 let Some((object_key, store_object)) = self
1295 .perpetual_tables
1296 .get_latest_object_or_tombstone(object_id)?
1297 else {
1298 return Ok(None);
1299 };
1300
1301 if let Some(object_ref) = self
1302 .perpetual_tables
1303 .tombstone_reference(&object_key, &store_object)?
1304 {
1305 return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1306 }
1307
1308 let object = self
1309 .perpetual_tables
1310 .object(&object_key, store_object)?
1311 .expect("Non tombstone store object could not be converted to object");
1312
1313 Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1314 }
1315
1316 pub fn insert_transaction_and_effects(
1317 &self,
1318 transaction: &VerifiedTransaction,
1319 transaction_effects: &TransactionEffects,
1320 ) -> Result<(), TypedStoreError> {
1321 let mut write_batch = self.perpetual_tables.transactions.batch();
1322 write_batch
1323 .insert_batch(
1324 &self.perpetual_tables.transactions,
1325 [(transaction.digest(), transaction.serializable_ref())],
1326 )?
1327 .insert_batch(
1328 &self.perpetual_tables.effects,
1329 [(transaction_effects.digest(), transaction_effects)],
1330 )?;
1331
1332 write_batch.write()?;
1333 Ok(())
1334 }
1335
1336 pub fn multi_insert_transaction_and_effects<'a>(
1337 &self,
1338 transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1339 ) -> Result<(), TypedStoreError> {
1340 let mut write_batch = self.perpetual_tables.transactions.batch();
1341 for tx in transactions {
1342 write_batch
1343 .insert_batch(
1344 &self.perpetual_tables.transactions,
1345 [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1346 )?
1347 .insert_batch(
1348 &self.perpetual_tables.effects,
1349 [(tx.effects.digest(), &tx.effects)],
1350 )?;
1351 }
1352
1353 write_batch.write()?;
1354 Ok(())
1355 }
1356
1357 pub fn multi_get_transaction_blocks(
1358 &self,
1359 tx_digests: &[TransactionDigest],
1360 ) -> IotaResult<Vec<Option<VerifiedTransaction>>> {
1361 Ok(self
1362 .perpetual_tables
1363 .transactions
1364 .multi_get(tx_digests)
1365 .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())?)
1366 }
1367
1368 pub fn get_transaction_block(
1369 &self,
1370 tx_digest: &TransactionDigest,
1371 ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1372 self.perpetual_tables
1373 .transactions
1374 .get(tx_digest)
1375 .map(|v| v.map(|v| v.into()))
1376 }
1377
1378 pub fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1386 get_iota_system_state(self.perpetual_tables.as_ref())
1387 }
1388
1389 pub fn expensive_check_iota_conservation<T>(
1390 self: &Arc<Self>,
1391 type_layout_store: T,
1392 old_epoch_store: &AuthorityPerEpochStore,
1393 epoch_supply_change: Option<i64>,
1394 ) -> IotaResult
1395 where
1396 T: TypeLayoutStore + Send + Copy,
1397 {
1398 if !self.enable_epoch_iota_conservation_check {
1399 return Ok(());
1400 }
1401
1402 let executor = old_epoch_store.executor();
1403 info!("Starting IOTA conservation check. This may take a while..");
1404 let cur_time = Instant::now();
1405 let mut pending_objects = vec![];
1406 let mut count = 0;
1407 let mut size = 0;
1408 let (mut total_iota, mut total_storage_rebate) = thread::scope(|s| {
1409 let pending_tasks = FuturesUnordered::new();
1410 for o in self.iter_live_object_set() {
1411 match o {
1412 LiveObject::Normal(object) => {
1413 size += object.object_size_for_gas_metering();
1414 count += 1;
1415 pending_objects.push(object);
1416 if count % 1_000_000 == 0 {
1417 let mut task_objects = vec![];
1418 mem::swap(&mut pending_objects, &mut task_objects);
1419 pending_tasks.push(s.spawn(move || {
1420 let mut layout_resolver =
1421 executor.type_layout_resolver(Box::new(type_layout_store));
1422 let mut total_storage_rebate = 0;
1423 let mut total_iota = 0;
1424 for object in task_objects {
1425 total_storage_rebate += object.storage_rebate;
1426 total_iota +=
1430 object.get_total_iota(layout_resolver.as_mut()).unwrap()
1431 - object.storage_rebate;
1432 }
1433 if count % 50_000_000 == 0 {
1434 info!("Processed {} objects", count);
1435 }
1436 (total_iota, total_storage_rebate)
1437 }));
1438 }
1439 }
1440 LiveObject::Wrapped(_) => {
1441 unreachable!("Explicitly asked to not include wrapped tombstones")
1442 }
1443 }
1444 }
1445 pending_tasks.into_iter().fold((0, 0), |init, result| {
1446 let result = result.join().unwrap();
1447 (init.0 + result.0, init.1 + result.1)
1448 })
1449 });
1450 let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1451 for object in pending_objects {
1452 total_storage_rebate += object.storage_rebate;
1453 total_iota +=
1454 object.get_total_iota(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1455 }
1456 info!(
1457 "Scanned {} live objects, took {:?}",
1458 count,
1459 cur_time.elapsed()
1460 );
1461 self.metrics
1462 .iota_conservation_live_object_count
1463 .set(count as i64);
1464 self.metrics
1465 .iota_conservation_live_object_size
1466 .set(size as i64);
1467 self.metrics
1468 .iota_conservation_check_latency
1469 .set(cur_time.elapsed().as_secs() as i64);
1470
1471 let system_state: IotaSystemStateSummaryV2 = self
1474 .get_iota_system_state_object_unsafe()
1475 .expect("Reading iota system state object cannot fail")
1476 .into_iota_system_state_summary()
1477 .try_into()?;
1478 let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1479 info!(
1480 "Total IOTA amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1481 total_iota, storage_fund_balance, total_storage_rebate, system_state.epoch
1482 );
1483
1484 let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1485 self.metrics
1486 .iota_conservation_storage_fund
1487 .set(storage_fund_balance as i64);
1488 self.metrics
1489 .iota_conservation_storage_fund_imbalance
1490 .set(imbalance);
1491 self.metrics
1492 .iota_conservation_imbalance
1493 .set((total_iota as i128 - system_state.iota_total_supply as i128) as i64);
1494
1495 if let Some(expected_imbalance) = self
1496 .perpetual_tables
1497 .expected_storage_fund_imbalance
1498 .get(&())
1499 .map_err(|err| {
1500 IotaError::from(
1501 format!("failed to read expected storage fund imbalance: {err}").as_str(),
1502 )
1503 })?
1504 {
1505 fp_ensure!(
1506 imbalance == expected_imbalance,
1507 IotaError::from(
1508 format!(
1509 "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1510 system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1511 ).as_str()
1512 )
1513 );
1514 } else {
1515 self.perpetual_tables
1516 .expected_storage_fund_imbalance
1517 .insert(&(), &imbalance)
1518 .map_err(|err| {
1519 IotaError::from(
1520 format!("failed to write expected storage fund imbalance: {err}").as_str(),
1521 )
1522 })?;
1523 }
1524
1525 let total_supply = self
1526 .perpetual_tables
1527 .total_iota_supply
1528 .get(&())
1529 .map_err(|err| {
1530 IotaError::from(format!("failed to read total iota supply: {err}").as_str())
1531 })?;
1532
1533 match total_supply.zip(epoch_supply_change) {
1534 Some((old_supply, epoch_supply_change))
1539 if old_supply.last_check_epoch + 1 == old_epoch_store.epoch() =>
1540 {
1541 let expected_new_supply = if epoch_supply_change >= 0 {
1542 old_supply
1543 .total_supply
1544 .checked_add(epoch_supply_change.unsigned_abs())
1545 .ok_or_else(|| {
1546 IotaError::from(
1547 format!(
1548 "Inconsistent state detected at epoch {}: old supply {} + supply change {} overflowed",
1549 system_state.epoch, old_supply.total_supply, epoch_supply_change
1550 ).as_str())
1551 })?
1552 } else {
1553 old_supply.total_supply.checked_sub(epoch_supply_change.unsigned_abs()).ok_or_else(|| {
1554 IotaError::from(
1555 format!(
1556 "Inconsistent state detected at epoch {}: old supply {} - supply change {} underflowed",
1557 system_state.epoch, old_supply.total_supply, epoch_supply_change
1558 ).as_str())
1559 })?
1560 };
1561
1562 fp_ensure!(
1563 total_iota == expected_new_supply,
1564 IotaError::from(
1565 format!(
1566 "Inconsistent state detected at epoch {}: total iota: {}, expecting {}",
1567 system_state.epoch, total_iota, expected_new_supply
1568 )
1569 .as_str()
1570 )
1571 );
1572
1573 let new_supply = TotalIotaSupplyCheck {
1574 total_supply: expected_new_supply,
1575 last_check_epoch: old_epoch_store.epoch(),
1576 };
1577
1578 self.perpetual_tables
1579 .total_iota_supply
1580 .insert(&(), &new_supply)
1581 .map_err(|err| {
1582 IotaError::from(
1583 format!("failed to write total iota supply: {err}").as_str(),
1584 )
1585 })?;
1586 }
1587 _ => {
1590 info!("Skipping total supply check");
1591
1592 let supply = TotalIotaSupplyCheck {
1593 total_supply: total_iota,
1594 last_check_epoch: old_epoch_store.epoch(),
1595 };
1596
1597 self.perpetual_tables
1598 .total_iota_supply
1599 .insert(&(), &supply)
1600 .map_err(|err| {
1601 IotaError::from(
1602 format!("failed to write total iota supply: {err}").as_str(),
1603 )
1604 })?;
1605
1606 return Ok(());
1607 }
1608 };
1609
1610 Ok(())
1611 }
1612
1613 pub async fn prune_objects_and_compact_for_testing(
1614 &self,
1615 checkpoint_store: &Arc<CheckpointStore>,
1616 rest_index: Option<&RestIndexStore>,
1617 ) {
1618 let pruning_config = AuthorityStorePruningConfig {
1619 num_epochs_to_retain: 0,
1620 ..Default::default()
1621 };
1622 let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1623 &self.perpetual_tables,
1624 checkpoint_store,
1625 rest_index,
1626 None,
1627 pruning_config,
1628 AuthorityStorePruningMetrics::new_for_test(),
1629 EPOCH_DURATION_MS_FOR_TESTING,
1630 )
1631 .await;
1632 let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1633 }
1634
1635 #[cfg(test)]
1636 pub async fn prune_objects_immediately_for_testing(
1637 &self,
1638 transaction_effects: Vec<TransactionEffects>,
1639 ) -> anyhow::Result<()> {
1640 let mut wb = self.perpetual_tables.objects.batch();
1641
1642 let mut object_keys_to_prune = vec![];
1643 for effects in &transaction_effects {
1644 for (object_id, seq_number) in effects.modified_at_versions() {
1645 info!("Pruning object {:?} version {:?}", object_id, seq_number);
1646 object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1647 }
1648 }
1649
1650 wb.delete_batch(
1651 &self.perpetual_tables.objects,
1652 object_keys_to_prune.into_iter(),
1653 )?;
1654 wb.write()?;
1655 Ok(())
1656 }
1657
1658 #[cfg(msim)]
1659 pub fn remove_all_versions_of_object(&self, object_id: ObjectID) {
1660 let entries: Vec<_> = self
1661 .perpetual_tables
1662 .objects
1663 .unbounded_iter()
1664 .filter_map(|(key, _)| if key.0 == object_id { Some(key) } else { None })
1665 .collect();
1666 info!("Removing all versions of object: {:?}", entries);
1667 self.perpetual_tables.objects.multi_remove(entries).unwrap();
1668 }
1669
1670 #[cfg(msim)]
1673 pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1674 self.perpetual_tables
1675 .objects
1676 .safe_iter_with_bounds(
1677 Some(ObjectKey(object_id, VersionNumber::MIN_VALID_INCL)),
1678 Some(ObjectKey(object_id, VersionNumber::MAX_VALID_EXCL)),
1679 )
1680 .collect::<Result<Vec<_>, _>>()
1681 .unwrap()
1682 .len()
1683 }
1684}
1685
1686impl AccumulatorStore for AuthorityStore {
1687 fn get_root_state_accumulator_for_epoch(
1688 &self,
1689 epoch: EpochId,
1690 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
1691 self.perpetual_tables
1692 .root_state_hash_by_epoch
1693 .get(&epoch)
1694 .map_err(Into::into)
1695 }
1696
1697 fn get_root_state_accumulator_for_highest_epoch(
1698 &self,
1699 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
1700 Ok(self
1701 .perpetual_tables
1702 .root_state_hash_by_epoch
1703 .reversed_safe_iter_with_bounds(None, None)?
1704 .next()
1705 .transpose()?)
1706 }
1707
1708 fn insert_state_accumulator_for_epoch(
1709 &self,
1710 epoch: EpochId,
1711 last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1712 acc: &Accumulator,
1713 ) -> IotaResult {
1714 self.perpetual_tables
1715 .root_state_hash_by_epoch
1716 .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1717 self.root_state_notify_read
1718 .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1719
1720 Ok(())
1721 }
1722
1723 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1724 Box::new(self.perpetual_tables.iter_live_object_set())
1725 }
1726}
1727
1728impl ObjectStore for AuthorityStore {
1729 fn try_get_object(
1731 &self,
1732 object_id: &ObjectID,
1733 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1734 self.perpetual_tables.as_ref().try_get_object(object_id)
1735 }
1736
1737 fn try_get_object_by_key(
1738 &self,
1739 object_id: &ObjectID,
1740 version: VersionNumber,
1741 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1742 self.perpetual_tables
1743 .try_get_object_by_key(object_id, version)
1744 }
1745}
1746
1747pub struct ResolverWrapper {
1749 pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1750 pub metrics: Arc<ResolverMetrics>,
1751}
1752
1753impl ResolverWrapper {
1754 pub fn new(
1755 resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1756 metrics: Arc<ResolverMetrics>,
1757 ) -> Self {
1758 metrics.module_cache_size.set(0);
1759 ResolverWrapper { resolver, metrics }
1760 }
1761
1762 fn inc_cache_size_gauge(&self) {
1763 let current = self.metrics.module_cache_size.get();
1765 self.metrics.module_cache_size.set(current + 1);
1766 }
1767}
1768
1769impl ModuleResolver for ResolverWrapper {
1770 type Error = IotaError;
1771 fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1772 self.inc_cache_size_gauge();
1773 get_module(&*self.resolver, module_id)
1774 }
1775}
1776
1777pub enum UpdateType {
1778 Transaction(TransactionEffectsDigest),
1779 Genesis,
1780}
1781
1782pub type IotaLockResult = IotaResult<ObjectLockStatus>;
1783
1784#[derive(Debug, PartialEq, Eq)]
1785pub enum ObjectLockStatus {
1786 Initialized,
1787 LockedToTx { locked_by_tx: LockDetails }, LockedAtDifferentVersion { locked_ref: ObjectRef },
1789}