1use std::{iter, mem, ops::Not, sync::Arc, thread};
6
7use either::Either;
8use fastcrypto::hash::{HashFunction, Sha3_256};
9use futures::stream::FuturesUnordered;
10use iota_common::sync::notify_read::NotifyRead;
11use iota_config::{migration_tx_data::MigrationTxData, node::AuthorityStorePruningConfig};
12use iota_macros::fail_point_arg;
13use iota_storage::mutex_table::{MutexGuard, MutexTable, RwLockGuard, RwLockTable};
14use iota_types::{
15 accumulator::Accumulator,
16 base_types::SequenceNumber,
17 digests::TransactionEventsDigest,
18 effects::{TransactionEffects, TransactionEvents},
19 error::UserInputError,
20 execution::TypeLayoutStore,
21 fp_bail, fp_ensure,
22 iota_system_state::{
23 get_iota_system_state, iota_system_state_summary::IotaSystemStateSummaryV2,
24 },
25 message_envelope::Message,
26 storage::{
27 BackingPackageStore, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, get_module,
28 },
29};
30use itertools::izip;
31use move_core_types::resolver::ModuleResolver;
32use tokio::{
33 sync::{RwLockReadGuard, RwLockWriteGuard},
34 time::Instant,
35};
36use tracing::{debug, info, trace};
37use typed_store::{
38 TypedStoreError,
39 rocks::{DBBatch, DBMap, util::is_ref_count_value},
40 traits::Map,
41};
42
43use super::{
44 authority_store_tables::{AuthorityPerpetualTables, LiveObject},
45 *,
46};
47use crate::{
48 authority::{
49 authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails},
50 authority_store_pruner::{
51 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
52 },
53 authority_store_tables::TotalIotaSupplyCheck,
54 authority_store_types::{
55 ObjectContentDigest, StoreObject, StoreObjectPair, StoreObjectWrapper,
56 get_store_object_pair,
57 },
58 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
59 },
60 rest_index::RestIndexStore,
61 state_accumulator::AccumulatorStore,
62 transaction_outputs::TransactionOutputs,
63};
64
65const NUM_SHARDS: usize = 4096;
66
67struct AuthorityStoreMetrics {
68 iota_conservation_check_latency: IntGauge,
69 iota_conservation_live_object_count: IntGauge,
70 iota_conservation_live_object_size: IntGauge,
71 iota_conservation_imbalance: IntGauge,
72 iota_conservation_storage_fund: IntGauge,
73 iota_conservation_storage_fund_imbalance: IntGauge,
74 epoch_flags: IntGaugeVec,
75}
76
77impl AuthorityStoreMetrics {
78 pub fn new(registry: &Registry) -> Self {
79 Self {
80 iota_conservation_check_latency: register_int_gauge_with_registry!(
81 "iota_conservation_check_latency",
82 "Number of seconds took to scan all live objects in the store for IOTA conservation check",
83 registry,
84 ).unwrap(),
85 iota_conservation_live_object_count: register_int_gauge_with_registry!(
86 "iota_conservation_live_object_count",
87 "Number of live objects in the store",
88 registry,
89 ).unwrap(),
90 iota_conservation_live_object_size: register_int_gauge_with_registry!(
91 "iota_conservation_live_object_size",
92 "Size in bytes of live objects in the store",
93 registry,
94 ).unwrap(),
95 iota_conservation_imbalance: register_int_gauge_with_registry!(
96 "iota_conservation_imbalance",
97 "Total amount of IOTA in the network - 10B * 10^9. This delta shows the amount of imbalance",
98 registry,
99 ).unwrap(),
100 iota_conservation_storage_fund: register_int_gauge_with_registry!(
101 "iota_conservation_storage_fund",
102 "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
103 registry,
104 ).unwrap(),
105 iota_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
106 "iota_conservation_storage_fund_imbalance",
107 "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
108 registry,
109 ).unwrap(),
110 epoch_flags: register_int_gauge_vec_with_registry!(
111 "epoch_flags",
112 "Local flags of the currently running epoch",
113 &["flag"],
114 registry,
115 ).unwrap(),
116 }
117 }
118}
119
120pub struct AuthorityStore {
129 mutex_table: MutexTable<ObjectDigest>,
131
132 pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
133
134 pub(crate) root_state_notify_read: NotifyRead<EpochId, (CheckpointSequenceNumber, Accumulator)>,
135
136 pub(crate) objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
138
139 indirect_objects_threshold: usize,
140
141 enable_epoch_iota_conservation_check: bool,
143
144 metrics: AuthorityStoreMetrics,
145}
146
147pub type ExecutionLockReadGuard<'a> = RwLockReadGuard<'a, EpochId>;
148pub type ExecutionLockWriteGuard<'a> = RwLockWriteGuard<'a, EpochId>;
149
150impl AuthorityStore {
151 pub async fn open(
154 perpetual_tables: Arc<AuthorityPerpetualTables>,
155 genesis: &Genesis,
156 config: &NodeConfig,
157 registry: &Registry,
158 migration_tx_data: Option<&MigrationTxData>,
159 ) -> IotaResult<Arc<Self>> {
160 let indirect_objects_threshold = config.indirect_objects_threshold;
161 let enable_epoch_iota_conservation_check = config
162 .expensive_safety_check_config
163 .enable_epoch_iota_conservation_check();
164
165 let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
166 info!("Creating new epoch start config from genesis");
167
168 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
169 let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
170 fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
171 info!("Setting initial epoch flags to {:?}", flags);
172 initial_epoch_flags = flags;
173 });
174
175 let epoch_start_configuration = EpochStartConfiguration::new(
176 genesis.iota_system_object().into_epoch_start_state(),
177 *genesis.checkpoint().digest(),
178 &genesis.objects(),
179 initial_epoch_flags,
180 )?;
181 perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
182 epoch_start_configuration
183 } else {
184 info!("Loading epoch start config from DB");
185 perpetual_tables
186 .epoch_start_configuration
187 .get(&())?
188 .expect("Epoch start configuration must be set in non-empty DB")
189 };
190 let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
191 info!("Epoch start config: {:?}", epoch_start_configuration);
192 info!("Cur epoch: {:?}", cur_epoch);
193 let this = Self::open_inner(
194 genesis,
195 perpetual_tables,
196 indirect_objects_threshold,
197 enable_epoch_iota_conservation_check,
198 registry,
199 migration_tx_data,
200 )
201 .await?;
202 this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
203 Ok(this)
204 }
205
206 pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
207 for flag in old {
208 self.metrics
209 .epoch_flags
210 .with_label_values(&[&flag.to_string()])
211 .set(0);
212 }
213 for flag in new {
214 self.metrics
215 .epoch_flags
216 .with_label_values(&[&flag.to_string()])
217 .set(1);
218 }
219 }
220
221 pub fn clear_object_per_epoch_marker_table(
225 &self,
226 _execution_guard: &ExecutionLockWriteGuard<'_>,
227 ) -> IotaResult<()> {
228 Ok(self
233 .perpetual_tables
234 .object_per_epoch_marker_table
235 .schedule_delete_all()?)
236 }
237
238 pub async fn open_with_committee_for_testing(
239 perpetual_tables: Arc<AuthorityPerpetualTables>,
240 committee: &Committee,
241 genesis: &Genesis,
242 indirect_objects_threshold: usize,
243 ) -> IotaResult<Arc<Self>> {
244 assert_eq!(committee.epoch, 0);
247 Self::open_inner(
248 genesis,
249 perpetual_tables,
250 indirect_objects_threshold,
251 true,
252 &Registry::new(),
253 None,
254 )
255 .await
256 }
257
258 async fn open_inner(
259 genesis: &Genesis,
260 perpetual_tables: Arc<AuthorityPerpetualTables>,
261 indirect_objects_threshold: usize,
262 enable_epoch_iota_conservation_check: bool,
263 registry: &Registry,
264 migration_tx_data: Option<&MigrationTxData>,
265 ) -> IotaResult<Arc<Self>> {
266 let store = Arc::new(Self {
267 mutex_table: MutexTable::new(NUM_SHARDS),
268 perpetual_tables,
269 root_state_notify_read:
270 NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
271 objects_lock_table: Arc::new(RwLockTable::new(NUM_SHARDS)),
272 indirect_objects_threshold,
273 enable_epoch_iota_conservation_check,
274 metrics: AuthorityStoreMetrics::new(registry),
275 });
276 if store
278 .database_is_empty()
279 .expect("database read should not fail at init.")
280 {
281 store
284 .bulk_insert_genesis_objects(genesis.objects())
285 .expect("cannot bulk insert genesis objects");
286
287 let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
289 store
290 .perpetual_tables
291 .transactions
292 .insert(transaction.digest(), transaction.serializable_ref())
293 .expect("cannot insert genesis transaction");
294 store
295 .perpetual_tables
296 .effects
297 .insert(&genesis.effects().digest(), genesis.effects())
298 .expect("cannot insert genesis effects");
299
300 let event_digests = genesis.events().digest();
305 let events = genesis
306 .events()
307 .data
308 .iter()
309 .enumerate()
310 .map(|(i, e)| ((event_digests, i), e));
311 store.perpetual_tables.events.multi_insert(events).unwrap();
312
313 if let Some(migration_transactions) = migration_tx_data {
315 let txs_data = migration_transactions.txs_data();
318
319 for (_, execution_digest) in genesis
323 .checkpoint_contents()
324 .enumerate_transactions(&genesis.checkpoint())
325 {
326 let tx_digest = &execution_digest.transaction;
327 if tx_digest == genesis.transaction().digest() {
330 continue;
331 }
332 let Some((tx, effects, events)) = txs_data.get(tx_digest) else {
335 panic!("tx digest not found in migrated objects blob");
336 };
337 let transaction = VerifiedTransaction::new_unchecked(tx.clone());
338 let objects = migration_transactions
339 .objects_by_tx_digest(*tx_digest)
340 .expect("the migration data is corrupted");
341 store
342 .bulk_insert_genesis_objects(&objects)
343 .expect("cannot bulk insert migrated objects");
344 store
345 .perpetual_tables
346 .transactions
347 .insert(transaction.digest(), transaction.serializable_ref())
348 .expect("cannot insert migration transaction");
349 store
350 .perpetual_tables
351 .effects
352 .insert(&effects.digest(), effects)
353 .expect("cannot insert migration effects");
354 let events = events
355 .data
356 .iter()
357 .enumerate()
358 .map(|(i, e)| ((events.digest(), i), e));
359 store.perpetual_tables.events.multi_insert(events).unwrap();
360 }
361 }
362 }
363
364 Ok(store)
365 }
366
367 pub fn open_no_genesis(
371 perpetual_tables: Arc<AuthorityPerpetualTables>,
372 indirect_objects_threshold: usize,
373 enable_epoch_iota_conservation_check: bool,
374 registry: &Registry,
375 ) -> IotaResult<Arc<Self>> {
376 let store = Arc::new(Self {
377 mutex_table: MutexTable::new(NUM_SHARDS),
378 perpetual_tables,
379 root_state_notify_read:
380 NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
381 objects_lock_table: Arc::new(RwLockTable::new(NUM_SHARDS)),
382 indirect_objects_threshold,
383 enable_epoch_iota_conservation_check,
384 metrics: AuthorityStoreMetrics::new(registry),
385 });
386 Ok(store)
387 }
388
389 pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
390 self.perpetual_tables.get_recovery_epoch_at_restart()
391 }
392
393 pub fn get_effects(
394 &self,
395 effects_digest: &TransactionEffectsDigest,
396 ) -> IotaResult<Option<TransactionEffects>> {
397 Ok(self.perpetual_tables.effects.get(effects_digest)?)
398 }
399
400 pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> IotaResult<bool> {
402 self.perpetual_tables
403 .effects
404 .contains_key(effects_digest)
405 .map_err(|e| e.into())
406 }
407
408 pub fn get_events(
409 &self,
410 event_digest: &TransactionEventsDigest,
411 ) -> Result<Option<TransactionEvents>, TypedStoreError> {
412 let data = self
413 .perpetual_tables
414 .events
415 .safe_range_iter((*event_digest, 0)..=(*event_digest, usize::MAX))
416 .map_ok(|(_, event)| event)
417 .collect::<Result<Vec<_>, TypedStoreError>>()?;
418 Ok(data.is_empty().not().then_some(TransactionEvents { data }))
419 }
420
421 pub fn multi_get_events(
422 &self,
423 event_digests: &[TransactionEventsDigest],
424 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
425 Ok(event_digests
426 .iter()
427 .map(|digest| self.get_events(digest))
428 .collect::<Result<Vec<_>, _>>()?)
429 }
430
431 pub fn multi_get_effects<'a>(
432 &self,
433 effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
434 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
435 Ok(self.perpetual_tables.effects.multi_get(effects_digests)?)
436 }
437
438 pub fn get_executed_effects(
439 &self,
440 tx_digest: &TransactionDigest,
441 ) -> IotaResult<Option<TransactionEffects>> {
442 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
443 match effects_digest {
444 Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
445 None => Ok(None),
446 }
447 }
448
449 pub fn multi_get_executed_effects_digests(
453 &self,
454 digests: &[TransactionDigest],
455 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
456 Ok(self.perpetual_tables.executed_effects.multi_get(digests)?)
457 }
458
459 pub fn multi_get_executed_effects(
463 &self,
464 digests: &[TransactionDigest],
465 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
466 let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
467 let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
468 let mut tx_to_effects_map = effects
469 .into_iter()
470 .flatten()
471 .map(|effects| (*effects.transaction_digest(), effects))
472 .collect::<HashMap<_, _>>();
473 Ok(digests
474 .iter()
475 .map(|digest| tx_to_effects_map.remove(digest))
476 .collect())
477 }
478
479 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
480 Ok(self
481 .perpetual_tables
482 .executed_effects
483 .contains_key(digest)?)
484 }
485
486 pub fn get_marker_value(
487 &self,
488 object_id: &ObjectID,
489 version: &SequenceNumber,
490 epoch_id: EpochId,
491 ) -> IotaResult<Option<MarkerValue>> {
492 let object_key = (epoch_id, ObjectKey(*object_id, *version));
493 Ok(self
494 .perpetual_tables
495 .object_per_epoch_marker_table
496 .get(&object_key)?)
497 }
498
499 pub fn get_latest_marker(
500 &self,
501 object_id: &ObjectID,
502 epoch_id: EpochId,
503 ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
504 let min_key = (epoch_id, ObjectKey::min_for_id(object_id));
505 let max_key = (epoch_id, ObjectKey::max_for_id(object_id));
506
507 let marker_entry = self
508 .perpetual_tables
509 .object_per_epoch_marker_table
510 .safe_iter_with_bounds(Some(min_key), Some(max_key))
511 .skip_prior_to(&max_key)?
512 .next();
513 match marker_entry {
514 Some(Ok(((epoch, key), marker))) => {
515 assert_eq!(epoch, epoch_id);
517 assert_eq!(key.0, *object_id);
518 Ok(Some((key.1, marker)))
519 }
520 Some(Err(e)) => Err(e.into()),
521 None => Ok(None),
522 }
523 }
524
525 pub async fn notify_read_root_state_hash(
528 &self,
529 epoch: EpochId,
530 ) -> IotaResult<(CheckpointSequenceNumber, Accumulator)> {
531 let registration = self.root_state_notify_read.register_one(&epoch);
534 let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
535
536 let result = match hash {
537 Some(ready) => Either::Left(futures::future::ready(ready)),
539 None => Either::Right(registration),
540 }
541 .await;
542
543 Ok(result)
544 }
545
546 pub(crate) fn insert_finalized_transactions_perpetual_checkpoints(
548 &self,
549 digests: &[TransactionDigest],
550 epoch: EpochId,
551 sequence: CheckpointSequenceNumber,
552 ) -> IotaResult {
553 let mut batch = self
554 .perpetual_tables
555 .executed_transactions_to_checkpoint
556 .batch();
557 batch.insert_batch(
558 &self.perpetual_tables.executed_transactions_to_checkpoint,
559 digests.iter().map(|d| (*d, (epoch, sequence))),
560 )?;
561 batch.write()?;
562 trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
563 Ok(())
564 }
565
566 pub(crate) fn get_transaction_perpetual_checkpoint(
568 &self,
569 digest: &TransactionDigest,
570 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
571 Ok(self
572 .perpetual_tables
573 .executed_transactions_to_checkpoint
574 .get(digest)?)
575 }
576
577 pub(crate) fn multi_get_transactions_perpetual_checkpoints(
579 &self,
580 digests: &[TransactionDigest],
581 ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
582 Ok(self
583 .perpetual_tables
584 .executed_transactions_to_checkpoint
585 .multi_get(digests)?)
586 }
587
588 pub fn database_is_empty(&self) -> IotaResult<bool> {
590 self.perpetual_tables.database_is_empty()
591 }
592
593 async fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
596 self.mutex_table
597 .acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
598 .await
599 }
600
601 pub fn object_exists_by_key(
602 &self,
603 object_id: &ObjectID,
604 version: VersionNumber,
605 ) -> IotaResult<bool> {
606 Ok(self
607 .perpetual_tables
608 .objects
609 .contains_key(&ObjectKey(*object_id, version))?)
610 }
611
612 pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
613 Ok(self
614 .perpetual_tables
615 .objects
616 .multi_contains_keys(object_keys.to_vec())?
617 .into_iter()
618 .collect())
619 }
620
621 pub fn multi_get_objects_by_key(
622 &self,
623 object_keys: &[ObjectKey],
624 ) -> Result<Vec<Option<Object>>, IotaError> {
625 let wrappers = self
626 .perpetual_tables
627 .objects
628 .multi_get(object_keys.to_vec())?;
629 let mut ret = vec![];
630
631 for (idx, w) in wrappers.into_iter().enumerate() {
632 ret.push(
633 w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
634 .transpose()?
635 .flatten(),
636 );
637 }
638 Ok(ret)
639 }
640
641 pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, IotaError> {
643 let mut result = Vec::new();
644 for id in objects {
645 result.push(self.get_object(id)?);
646 }
647 Ok(result)
648 }
649
650 pub fn have_deleted_owned_object_at_version_or_after(
651 &self,
652 object_id: &ObjectID,
653 version: VersionNumber,
654 epoch_id: EpochId,
655 ) -> Result<bool, IotaError> {
656 let object_key = ObjectKey::max_for_id(object_id);
657 let marker_key = (epoch_id, object_key);
658
659 let marker_entry = self
662 .perpetual_tables
663 .object_per_epoch_marker_table
664 .unbounded_iter()
665 .skip_prior_to(&marker_key)?
666 .next();
667 match marker_entry {
668 Some(((epoch, key), marker)) => {
669 let object_data_ok = key.0 == *object_id && key.1 >= version;
671 let epoch_data_ok = epoch == epoch_id;
673 let mark_data_ok = marker == MarkerValue::OwnedDeleted;
675 Ok(object_data_ok && epoch_data_ok && mark_data_ok)
676 }
677 None => Ok(false),
678 }
679 }
680
681 pub(crate) fn insert_genesis_object(&self, object: Object) -> IotaResult {
686 debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
688 let object_ref = object.compute_object_reference();
689 self.insert_object_direct(object_ref, &object)
690 }
691
692 fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> IotaResult {
696 let mut write_batch = self.perpetual_tables.objects.batch();
697
698 let StoreObjectPair(store_object, indirect_object) =
700 get_store_object_pair(object.clone(), self.indirect_objects_threshold);
701 write_batch.insert_batch(
702 &self.perpetual_tables.objects,
703 std::iter::once((ObjectKey::from(object_ref), store_object)),
704 )?;
705 if let Some(indirect_obj) = indirect_object {
706 write_batch.insert_batch(
707 &self.perpetual_tables.indirect_move_objects,
708 std::iter::once((indirect_obj.inner().digest(), indirect_obj)),
709 )?;
710 }
711
712 if object.get_single_owner().is_some() {
714 if !object.is_child_object() {
716 self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref])?;
717 }
718 }
719
720 write_batch.write()?;
721
722 Ok(())
723 }
724
725 #[instrument(level = "debug", skip_all)]
728 pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult<()> {
729 let mut batch = self.perpetual_tables.objects.batch();
730 let ref_and_objects: Vec<_> = objects
731 .iter()
732 .map(|o| (o.compute_object_reference(), o))
733 .collect();
734
735 batch
736 .insert_batch(
737 &self.perpetual_tables.objects,
738 ref_and_objects.iter().map(|(oref, o)| {
739 (
740 ObjectKey::from(oref),
741 get_store_object_pair((*o).clone(), self.indirect_objects_threshold).0,
742 )
743 }),
744 )?
745 .insert_batch(
746 &self.perpetual_tables.indirect_move_objects,
747 ref_and_objects.iter().filter_map(|(_, o)| {
748 let StoreObjectPair(_, indirect_object) =
749 get_store_object_pair((*o).clone(), self.indirect_objects_threshold);
750 indirect_object.map(|obj| (obj.inner().digest(), obj))
751 }),
752 )?;
753
754 let non_child_object_refs: Vec<_> = ref_and_objects
755 .iter()
756 .filter(|(_, object)| !object.is_child_object())
757 .map(|(oref, _)| *oref)
758 .collect();
759
760 self.initialize_live_object_markers_impl(&mut batch, &non_child_object_refs)?;
761
762 batch.write()?;
763
764 Ok(())
765 }
766
767 pub fn bulk_insert_live_objects(
768 perpetual_db: &AuthorityPerpetualTables,
769 live_objects: impl Iterator<Item = LiveObject>,
770 indirect_objects_threshold: usize,
771 expected_sha3_digest: &[u8; 32],
772 ) -> IotaResult<()> {
773 let mut hasher = Sha3_256::default();
774 let mut batch = perpetual_db.objects.batch();
775 for object in live_objects {
776 hasher.update(object.object_reference().2.inner());
777 match object {
778 LiveObject::Normal(object) => {
779 let StoreObjectPair(store_object_wrapper, indirect_object) =
780 get_store_object_pair(object.clone(), indirect_objects_threshold);
781 batch.insert_batch(
782 &perpetual_db.objects,
783 std::iter::once((
784 ObjectKey::from(object.compute_object_reference()),
785 store_object_wrapper,
786 )),
787 )?;
788 if let Some(indirect_object) = indirect_object {
789 batch.merge_batch(
790 &perpetual_db.indirect_move_objects,
791 iter::once((indirect_object.inner().digest(), indirect_object)),
792 )?;
793 }
794 if !object.is_child_object() {
795 Self::initialize_live_object_markers(
796 &perpetual_db.live_owned_object_markers,
797 &mut batch,
798 &[object.compute_object_reference()],
799 )?;
800 }
801 }
802 LiveObject::Wrapped(object_key) => {
803 batch.insert_batch(
804 &perpetual_db.objects,
805 std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
806 object_key,
807 StoreObject::Wrapped.into(),
808 )),
809 )?;
810 }
811 }
812 }
813 let sha3_digest = hasher.finalize().digest;
814 if *expected_sha3_digest != sha3_digest {
815 error!(
816 "Sha does not match! expected: {:?}, actual: {:?}",
817 expected_sha3_digest, sha3_digest
818 );
819 return Err(IotaError::from("Sha does not match"));
820 }
821 batch.write()?;
822 Ok(())
823 }
824
825 pub fn set_epoch_start_configuration(
826 &self,
827 epoch_start_configuration: &EpochStartConfiguration,
828 ) -> IotaResult {
829 self.perpetual_tables
830 .set_epoch_start_configuration(epoch_start_configuration)?;
831 Ok(())
832 }
833
834 pub fn get_epoch_start_configuration(&self) -> IotaResult<Option<EpochStartConfiguration>> {
835 Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
836 }
837
838 #[instrument(level = "trace", skip_all)]
840 async fn acquire_read_locks_for_indirect_objects(
841 &self,
842 written: &[Object],
843 ) -> Vec<RwLockGuard> {
844 let digests = written
855 .iter()
856 .filter_map(|object| {
857 let StoreObjectPair(_, indirect_object) =
858 get_store_object_pair(object.clone(), self.indirect_objects_threshold);
859 indirect_object.map(|obj| obj.inner().digest())
860 })
861 .collect();
862 self.objects_lock_table.acquire_read_locks(digests).await
863 }
864
865 #[instrument(level = "debug", skip_all)]
871 pub async fn write_transaction_outputs(
872 &self,
873 epoch_id: EpochId,
874 tx_outputs: &[Arc<TransactionOutputs>],
875 ) -> IotaResult {
876 let mut written = Vec::with_capacity(tx_outputs.len());
877 for outputs in tx_outputs {
878 written.extend(outputs.written.values().cloned());
879 }
880
881 let _locks = self.acquire_read_locks_for_indirect_objects(&written).await;
882
883 let mut write_batch = self.perpetual_tables.transactions.batch();
884 for outputs in tx_outputs {
885 self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
886 }
887 fail_point_async!("crash");
889
890 write_batch.write()?;
891 trace!(
892 "committed transactions: {:?}",
893 tx_outputs
894 .iter()
895 .map(|tx| tx.transaction.digest())
896 .collect::<Vec<_>>()
897 );
898
899 fail_point_async!("crash");
901
902 Ok(())
903 }
904
905 fn write_one_transaction_outputs(
906 &self,
907 write_batch: &mut DBBatch,
908 epoch_id: EpochId,
909 tx_outputs: &TransactionOutputs,
910 ) -> IotaResult {
911 let TransactionOutputs {
912 transaction,
913 effects,
914 markers,
915 wrapped,
916 deleted,
917 written,
918 events,
919 live_object_markers_to_delete,
920 new_live_object_markers_to_init,
921 ..
922 } = tx_outputs;
923
924 let transaction_digest = transaction.digest();
926 write_batch.insert_batch(
927 &self.perpetual_tables.transactions,
928 iter::once((transaction_digest, transaction.serializable_ref())),
929 )?;
930
931 let effects_digest = effects.digest();
933
934 write_batch.insert_batch(
935 &self.perpetual_tables.object_per_epoch_marker_table,
936 markers
937 .iter()
938 .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
939 )?;
940
941 write_batch.insert_batch(
942 &self.perpetual_tables.objects,
943 deleted
944 .iter()
945 .map(|key| (key, StoreObject::Deleted))
946 .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
947 .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
948 )?;
949
950 let (new_objects, new_indirect_move_objects): (Vec<_>, Vec<_>) = written
952 .iter()
953 .map(|(id, new_object)| {
954 let version = new_object.version();
955 debug!(?id, ?version, "writing object");
956 let StoreObjectPair(store_object, indirect_object) =
957 get_store_object_pair(new_object.clone(), self.indirect_objects_threshold);
958 (
959 (ObjectKey(*id, version), store_object),
960 indirect_object.map(|obj| (obj.inner().digest(), obj)),
961 )
962 })
963 .unzip();
964
965 let indirect_objects: Vec<_> = new_indirect_move_objects.into_iter().flatten().collect();
966 let existing_digests = self
967 .perpetual_tables
968 .indirect_move_objects
969 .multi_get_raw_bytes(indirect_objects.iter().map(|(digest, _)| digest))?;
970 let (existing_indirect_objects, new_indirect_objects): (Vec<_>, Vec<_>) = indirect_objects
974 .into_iter()
975 .enumerate()
976 .partition(|(idx, _)| matches!(&existing_digests[*idx], Some(value) if !is_ref_count_value(value)));
977
978 write_batch.insert_batch(&self.perpetual_tables.objects, new_objects.into_iter())?;
979 if !new_indirect_objects.is_empty() {
980 write_batch.merge_batch(
981 &self.perpetual_tables.indirect_move_objects,
982 new_indirect_objects.into_iter().map(|(_, pair)| pair),
983 )?;
984 }
985 if !existing_indirect_objects.is_empty() {
986 write_batch.partial_merge_batch(
987 &self.perpetual_tables.indirect_move_objects,
988 existing_indirect_objects
989 .into_iter()
990 .map(|(_, (digest, _))| (digest, 1_u64.to_le_bytes())),
991 )?;
992 }
993
994 let event_digest = events.digest();
995 let events = events
996 .data
997 .iter()
998 .enumerate()
999 .map(|(i, e)| ((event_digest, i), e));
1000
1001 write_batch.insert_batch(&self.perpetual_tables.events, events)?;
1002
1003 self.initialize_live_object_markers_impl(write_batch, new_live_object_markers_to_init)?;
1004
1005 self.delete_live_object_markers(write_batch, live_object_markers_to_delete)?;
1008
1009 write_batch
1010 .insert_batch(
1011 &self.perpetual_tables.effects,
1012 [(effects_digest, effects.clone())],
1013 )?
1014 .insert_batch(
1015 &self.perpetual_tables.executed_effects,
1016 [(transaction_digest, effects_digest)],
1017 )?;
1018
1019 debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
1020
1021 Ok(())
1022 }
1023
1024 pub(crate) fn commit_transactions(
1027 &self,
1028 transactions: &[(TransactionDigest, VerifiedTransaction)],
1029 ) -> IotaResult {
1030 let mut batch = self.perpetual_tables.transactions.batch();
1031 batch.insert_batch(
1032 &self.perpetual_tables.transactions,
1033 transactions
1034 .iter()
1035 .map(|(digest, tx)| (*digest, tx.serializable_ref())),
1036 )?;
1037 batch.write()?;
1038 Ok(())
1039 }
1040
1041 pub async fn acquire_transaction_locks(
1042 &self,
1043 epoch_store: &AuthorityPerEpochStore,
1044 owned_input_objects: &[ObjectRef],
1045 transaction: VerifiedSignedTransaction,
1046 ) -> IotaResult {
1047 let tx_digest = *transaction.digest();
1048 let _mutexes = self.acquire_locks(owned_input_objects).await;
1052
1053 trace!(?owned_input_objects, "acquire_transaction_locks");
1054 let mut locks_to_write = Vec::new();
1055
1056 let live_object_markers = self
1057 .perpetual_tables
1058 .live_owned_object_markers
1059 .multi_get(owned_input_objects)?;
1060
1061 let epoch_tables = epoch_store.tables()?;
1062
1063 let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
1064
1065 assert_eq!(locks.len(), live_object_markers.len());
1066
1067 for (live_marker, lock, obj_ref) in izip!(
1068 live_object_markers.into_iter(),
1069 locks.into_iter(),
1070 owned_input_objects
1071 ) {
1072 if live_marker.is_none() {
1073 let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1075 fp_bail!(
1076 UserInputError::ObjectVersionUnavailableForConsumption {
1077 provided_obj_ref: *obj_ref,
1078 current_version: latest_live_version.1
1079 }
1080 .into()
1081 );
1082 };
1083
1084 if let Some(previous_tx_digest) = &lock {
1085 if previous_tx_digest == &tx_digest {
1086 continue;
1088 } else {
1089 info!(prev_tx_digest = ?previous_tx_digest,
1091 cur_tx_digest = ?tx_digest,
1092 "Cannot acquire lock: conflicting transaction!");
1093 return Err(IotaError::ObjectLockConflict {
1094 obj_ref: *obj_ref,
1095 pending_transaction: *previous_tx_digest,
1096 });
1097 }
1098 }
1099
1100 locks_to_write.push((*obj_ref, tx_digest));
1101 }
1102
1103 if !locks_to_write.is_empty() {
1104 trace!(?locks_to_write, "Writing locks");
1105 epoch_tables.write_transaction_locks(transaction, locks_to_write.into_iter())?;
1106 }
1107
1108 Ok(())
1109 }
1110
1111 pub(crate) fn get_lock(
1115 &self,
1116 obj_ref: ObjectRef,
1117 epoch_store: &AuthorityPerEpochStore,
1118 ) -> IotaLockResult {
1119 if self
1120 .perpetual_tables
1121 .live_owned_object_markers
1122 .get(&obj_ref)?
1123 .is_none()
1124 {
1125 return Ok(ObjectLockStatus::LockedAtDifferentVersion {
1127 locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
1128 });
1129 }
1130
1131 let tables = epoch_store.tables()?;
1132 if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
1133 Ok(ObjectLockStatus::LockedToTx {
1134 locked_by_tx: tx_digest,
1135 })
1136 } else {
1137 Ok(ObjectLockStatus::Initialized)
1138 }
1139 }
1140
1141 pub(crate) fn get_latest_live_version_for_object_id(
1144 &self,
1145 object_id: ObjectID,
1146 ) -> IotaResult<ObjectRef> {
1147 let mut iterator = self
1148 .perpetual_tables
1149 .live_owned_object_markers
1150 .unbounded_iter()
1151 .skip_prior_to(&(object_id, SequenceNumber::MAX, ObjectDigest::MAX))?;
1153 Ok(iterator
1154 .next()
1155 .and_then(|value| {
1156 if value.0.0 == object_id {
1157 Some(value)
1158 } else {
1159 None
1160 }
1161 })
1162 .ok_or_else(|| {
1163 IotaError::from(UserInputError::ObjectNotFound {
1164 object_id,
1165 version: None,
1166 })
1167 })?
1168 .0)
1169 }
1170
1171 pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> IotaResult {
1177 let live_markers = self
1178 .perpetual_tables
1179 .live_owned_object_markers
1180 .multi_get(objects)?;
1181 for (live_marker, obj_ref) in live_markers.into_iter().zip(objects) {
1182 if live_marker.is_none() {
1183 let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1185 fp_bail!(
1186 UserInputError::ObjectVersionUnavailableForConsumption {
1187 provided_obj_ref: *obj_ref,
1188 current_version: latest_live_version.1
1189 }
1190 .into()
1191 );
1192 }
1193 }
1194 Ok(())
1195 }
1196
1197 fn initialize_live_object_markers_impl(
1199 &self,
1200 write_batch: &mut DBBatch,
1201 objects: &[ObjectRef],
1202 ) -> IotaResult {
1203 AuthorityStore::initialize_live_object_markers(
1204 &self.perpetual_tables.live_owned_object_markers,
1205 write_batch,
1206 objects,
1207 )
1208 }
1209
1210 pub fn initialize_live_object_markers(
1211 live_object_marker_table: &DBMap<ObjectRef, ()>,
1212 write_batch: &mut DBBatch,
1213 objects: &[ObjectRef],
1214 ) -> IotaResult {
1215 trace!(?objects, "initialize_live_object_markers");
1216
1217 write_batch.insert_batch(
1218 live_object_marker_table,
1219 objects.iter().map(|obj_ref| (obj_ref, ())),
1220 )?;
1221 Ok(())
1222 }
1223
1224 fn delete_live_object_markers(
1226 &self,
1227 write_batch: &mut DBBatch,
1228 objects: &[ObjectRef],
1229 ) -> IotaResult {
1230 trace!(?objects, "delete_live_object_markers");
1231 write_batch.delete_batch(
1232 &self.perpetual_tables.live_owned_object_markers,
1233 objects.iter(),
1234 )?;
1235 Ok(())
1236 }
1237
1238 #[cfg(test)]
1239 pub(crate) fn reset_locks_and_live_markers_for_test(
1240 &self,
1241 transactions: &[TransactionDigest],
1242 objects: &[ObjectRef],
1243 epoch_store: &AuthorityPerEpochStore,
1244 ) {
1245 for tx in transactions {
1246 epoch_store.delete_signed_transaction_for_test(tx);
1247 epoch_store.delete_object_locks_for_test(objects);
1248 }
1249
1250 let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1251 batch
1252 .delete_batch(
1253 &self.perpetual_tables.live_owned_object_markers,
1254 objects.iter(),
1255 )
1256 .unwrap();
1257 batch.write().unwrap();
1258
1259 let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1260 self.initialize_live_object_markers_impl(&mut batch, objects)
1261 .unwrap();
1262 batch.write().unwrap();
1263 }
1264
1265 pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> IotaResult {
1279 let Some(effects) = self.get_executed_effects(tx_digest)? else {
1280 info!("Not reverting {:?} as it was not executed", tx_digest);
1281 return Ok(());
1282 };
1283
1284 info!(?tx_digest, ?effects, "reverting transaction");
1285
1286 assert!(effects.input_shared_objects().is_empty());
1288
1289 let mut write_batch = self.perpetual_tables.transactions.batch();
1290 write_batch.delete_batch(
1291 &self.perpetual_tables.executed_effects,
1292 iter::once(tx_digest),
1293 )?;
1294 if let Some(events_digest) = effects.events_digest() {
1295 write_batch.schedule_delete_range(
1296 &self.perpetual_tables.events,
1297 &(*events_digest, usize::MIN),
1298 &(*events_digest, usize::MAX),
1299 )?;
1300 }
1301
1302 let tombstones = effects
1303 .all_tombstones()
1304 .into_iter()
1305 .map(|(id, version)| ObjectKey(id, version));
1306 write_batch.delete_batch(&self.perpetual_tables.objects, tombstones)?;
1307
1308 let all_new_object_keys = effects
1309 .all_changed_objects()
1310 .into_iter()
1311 .map(|((id, version, _), _, _)| ObjectKey(id, version));
1312 write_batch.delete_batch(&self.perpetual_tables.objects, all_new_object_keys.clone())?;
1313
1314 let modified_object_keys = effects
1315 .modified_at_versions()
1316 .into_iter()
1317 .map(|(id, version)| ObjectKey(id, version));
1318
1319 macro_rules! get_objects_and_locks {
1320 ($object_keys: expr) => {
1321 self.perpetual_tables
1322 .objects
1323 .multi_get($object_keys.clone())?
1324 .into_iter()
1325 .zip($object_keys)
1326 .filter_map(|(obj_opt, key)| {
1327 let obj = self
1328 .perpetual_tables
1329 .object(
1330 &key,
1331 obj_opt.unwrap_or_else(|| {
1332 panic!("Older object version not found: {:?}", key)
1333 }),
1334 )
1335 .expect("Matching indirect object not found")?;
1336
1337 if obj.is_immutable() {
1338 return None;
1339 }
1340
1341 let obj_ref = obj.compute_object_reference();
1342 Some(obj.is_address_owned().then_some(obj_ref))
1343 })
1344 };
1345 }
1346
1347 let old_locks = get_objects_and_locks!(modified_object_keys);
1348 let new_locks = get_objects_and_locks!(all_new_object_keys);
1349
1350 let old_locks: Vec<_> = old_locks.flatten().collect();
1351
1352 self.initialize_live_object_markers_impl(&mut write_batch, &old_locks)?;
1354
1355 write_batch.delete_batch(
1357 &self.perpetual_tables.live_owned_object_markers,
1358 new_locks.flatten(),
1359 )?;
1360
1361 write_batch.write()?;
1362
1363 Ok(())
1364 }
1365
1366 pub fn find_object_lt_or_eq_version(
1372 &self,
1373 object_id: ObjectID,
1374 version: SequenceNumber,
1375 ) -> IotaResult<Option<Object>> {
1376 self.perpetual_tables
1377 .find_object_lt_or_eq_version(object_id, version)
1378 }
1379
1380 pub fn get_latest_object_ref_or_tombstone(
1391 &self,
1392 object_id: ObjectID,
1393 ) -> Result<Option<ObjectRef>, IotaError> {
1394 self.perpetual_tables
1395 .get_latest_object_ref_or_tombstone(object_id)
1396 }
1397
1398 pub fn get_latest_object_ref_if_alive(
1401 &self,
1402 object_id: ObjectID,
1403 ) -> Result<Option<ObjectRef>, IotaError> {
1404 match self.get_latest_object_ref_or_tombstone(object_id)? {
1405 Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1406 _ => Ok(None),
1407 }
1408 }
1409
1410 pub fn get_latest_object_or_tombstone(
1415 &self,
1416 object_id: ObjectID,
1417 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1418 let Some((object_key, store_object)) = self
1419 .perpetual_tables
1420 .get_latest_object_or_tombstone(object_id)?
1421 else {
1422 return Ok(None);
1423 };
1424
1425 if let Some(object_ref) = self
1426 .perpetual_tables
1427 .tombstone_reference(&object_key, &store_object)?
1428 {
1429 return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1430 }
1431
1432 let object = self
1433 .perpetual_tables
1434 .object(&object_key, store_object)?
1435 .expect("Non tombstone store object could not be converted to object");
1436
1437 Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1438 }
1439
1440 pub fn insert_transaction_and_effects(
1441 &self,
1442 transaction: &VerifiedTransaction,
1443 transaction_effects: &TransactionEffects,
1444 ) -> Result<(), TypedStoreError> {
1445 let mut write_batch = self.perpetual_tables.transactions.batch();
1446 write_batch
1447 .insert_batch(
1448 &self.perpetual_tables.transactions,
1449 [(transaction.digest(), transaction.serializable_ref())],
1450 )?
1451 .insert_batch(
1452 &self.perpetual_tables.effects,
1453 [(transaction_effects.digest(), transaction_effects)],
1454 )?;
1455
1456 write_batch.write()?;
1457 Ok(())
1458 }
1459
1460 pub fn multi_insert_transaction_and_effects<'a>(
1461 &self,
1462 transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1463 ) -> Result<(), TypedStoreError> {
1464 let mut write_batch = self.perpetual_tables.transactions.batch();
1465 for tx in transactions {
1466 write_batch
1467 .insert_batch(
1468 &self.perpetual_tables.transactions,
1469 [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1470 )?
1471 .insert_batch(
1472 &self.perpetual_tables.effects,
1473 [(tx.effects.digest(), &tx.effects)],
1474 )?;
1475 }
1476
1477 write_batch.write()?;
1478 Ok(())
1479 }
1480
1481 pub fn multi_get_transaction_blocks(
1482 &self,
1483 tx_digests: &[TransactionDigest],
1484 ) -> IotaResult<Vec<Option<VerifiedTransaction>>> {
1485 Ok(self
1486 .perpetual_tables
1487 .transactions
1488 .multi_get(tx_digests)
1489 .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())?)
1490 }
1491
1492 pub fn get_transaction_block(
1493 &self,
1494 tx_digest: &TransactionDigest,
1495 ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1496 self.perpetual_tables
1497 .transactions
1498 .get(tx_digest)
1499 .map(|v| v.map(|v| v.into()))
1500 }
1501
1502 pub fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1510 get_iota_system_state(self.perpetual_tables.as_ref())
1511 }
1512
1513 pub fn expensive_check_iota_conservation<T>(
1514 self: &Arc<Self>,
1515 type_layout_store: T,
1516 old_epoch_store: &AuthorityPerEpochStore,
1517 epoch_supply_change: Option<i64>,
1518 ) -> IotaResult
1519 where
1520 T: TypeLayoutStore + Send + Copy,
1521 {
1522 if !self.enable_epoch_iota_conservation_check {
1523 return Ok(());
1524 }
1525
1526 let executor = old_epoch_store.executor();
1527 info!("Starting IOTA conservation check. This may take a while..");
1528 let cur_time = Instant::now();
1529 let mut pending_objects = vec![];
1530 let mut count = 0;
1531 let mut size = 0;
1532 let (mut total_iota, mut total_storage_rebate) = thread::scope(|s| {
1533 let pending_tasks = FuturesUnordered::new();
1534 for o in self.iter_live_object_set() {
1535 match o {
1536 LiveObject::Normal(object) => {
1537 size += object.object_size_for_gas_metering();
1538 count += 1;
1539 pending_objects.push(object);
1540 if count % 1_000_000 == 0 {
1541 let mut task_objects = vec![];
1542 mem::swap(&mut pending_objects, &mut task_objects);
1543 pending_tasks.push(s.spawn(move || {
1544 let mut layout_resolver =
1545 executor.type_layout_resolver(Box::new(type_layout_store));
1546 let mut total_storage_rebate = 0;
1547 let mut total_iota = 0;
1548 for object in task_objects {
1549 total_storage_rebate += object.storage_rebate;
1550 total_iota +=
1554 object.get_total_iota(layout_resolver.as_mut()).unwrap()
1555 - object.storage_rebate;
1556 }
1557 if count % 50_000_000 == 0 {
1558 info!("Processed {} objects", count);
1559 }
1560 (total_iota, total_storage_rebate)
1561 }));
1562 }
1563 }
1564 LiveObject::Wrapped(_) => {
1565 unreachable!("Explicitly asked to not include wrapped tombstones")
1566 }
1567 }
1568 }
1569 pending_tasks.into_iter().fold((0, 0), |init, result| {
1570 let result = result.join().unwrap();
1571 (init.0 + result.0, init.1 + result.1)
1572 })
1573 });
1574 let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1575 for object in pending_objects {
1576 total_storage_rebate += object.storage_rebate;
1577 total_iota +=
1578 object.get_total_iota(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1579 }
1580 info!(
1581 "Scanned {} live objects, took {:?}",
1582 count,
1583 cur_time.elapsed()
1584 );
1585 self.metrics
1586 .iota_conservation_live_object_count
1587 .set(count as i64);
1588 self.metrics
1589 .iota_conservation_live_object_size
1590 .set(size as i64);
1591 self.metrics
1592 .iota_conservation_check_latency
1593 .set(cur_time.elapsed().as_secs() as i64);
1594
1595 let system_state: IotaSystemStateSummaryV2 = self
1598 .get_iota_system_state_object_unsafe()
1599 .expect("Reading iota system state object cannot fail")
1600 .into_iota_system_state_summary()
1601 .try_into()?;
1602 let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1603 info!(
1604 "Total IOTA amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1605 total_iota, storage_fund_balance, total_storage_rebate, system_state.epoch
1606 );
1607
1608 let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1609 self.metrics
1610 .iota_conservation_storage_fund
1611 .set(storage_fund_balance as i64);
1612 self.metrics
1613 .iota_conservation_storage_fund_imbalance
1614 .set(imbalance);
1615 self.metrics
1616 .iota_conservation_imbalance
1617 .set((total_iota as i128 - system_state.iota_total_supply as i128) as i64);
1618
1619 if let Some(expected_imbalance) = self
1620 .perpetual_tables
1621 .expected_storage_fund_imbalance
1622 .get(&())
1623 .map_err(|err| {
1624 IotaError::from(
1625 format!("failed to read expected storage fund imbalance: {err}").as_str(),
1626 )
1627 })?
1628 {
1629 fp_ensure!(
1630 imbalance == expected_imbalance,
1631 IotaError::from(
1632 format!(
1633 "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1634 system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1635 ).as_str()
1636 )
1637 );
1638 } else {
1639 self.perpetual_tables
1640 .expected_storage_fund_imbalance
1641 .insert(&(), &imbalance)
1642 .map_err(|err| {
1643 IotaError::from(
1644 format!("failed to write expected storage fund imbalance: {err}").as_str(),
1645 )
1646 })?;
1647 }
1648
1649 let total_supply = self
1650 .perpetual_tables
1651 .total_iota_supply
1652 .get(&())
1653 .map_err(|err| {
1654 IotaError::from(format!("failed to read total iota supply: {err}").as_str())
1655 })?;
1656
1657 match total_supply.zip(epoch_supply_change) {
1658 Some((old_supply, epoch_supply_change))
1663 if old_supply.last_check_epoch + 1 == old_epoch_store.epoch() =>
1664 {
1665 let expected_new_supply = if epoch_supply_change >= 0 {
1666 old_supply
1667 .total_supply
1668 .checked_add(epoch_supply_change.unsigned_abs())
1669 .ok_or_else(|| {
1670 IotaError::from(
1671 format!(
1672 "Inconsistent state detected at epoch {}: old supply {} + supply change {} overflowed",
1673 system_state.epoch, old_supply.total_supply, epoch_supply_change
1674 ).as_str())
1675 })?
1676 } else {
1677 old_supply.total_supply.checked_sub(epoch_supply_change.unsigned_abs()).ok_or_else(|| {
1678 IotaError::from(
1679 format!(
1680 "Inconsistent state detected at epoch {}: old supply {} - supply change {} underflowed",
1681 system_state.epoch, old_supply.total_supply, epoch_supply_change
1682 ).as_str())
1683 })?
1684 };
1685
1686 fp_ensure!(
1687 total_iota == expected_new_supply,
1688 IotaError::from(
1689 format!(
1690 "Inconsistent state detected at epoch {}: total iota: {}, expecting {}",
1691 system_state.epoch, total_iota, expected_new_supply
1692 )
1693 .as_str()
1694 )
1695 );
1696
1697 let new_supply = TotalIotaSupplyCheck {
1698 total_supply: expected_new_supply,
1699 last_check_epoch: old_epoch_store.epoch(),
1700 };
1701
1702 self.perpetual_tables
1703 .total_iota_supply
1704 .insert(&(), &new_supply)
1705 .map_err(|err| {
1706 IotaError::from(
1707 format!("failed to write total iota supply: {err}").as_str(),
1708 )
1709 })?;
1710 }
1711 _ => {
1714 info!("Skipping total supply check");
1715
1716 let supply = TotalIotaSupplyCheck {
1717 total_supply: total_iota,
1718 last_check_epoch: old_epoch_store.epoch(),
1719 };
1720
1721 self.perpetual_tables
1722 .total_iota_supply
1723 .insert(&(), &supply)
1724 .map_err(|err| {
1725 IotaError::from(
1726 format!("failed to write total iota supply: {err}").as_str(),
1727 )
1728 })?;
1729
1730 return Ok(());
1731 }
1732 };
1733
1734 Ok(())
1735 }
1736
1737 pub async fn prune_objects_and_compact_for_testing(
1738 &self,
1739 checkpoint_store: &Arc<CheckpointStore>,
1740 rest_index: Option<&RestIndexStore>,
1741 ) {
1742 let pruning_config = AuthorityStorePruningConfig {
1743 num_epochs_to_retain: 0,
1744 ..Default::default()
1745 };
1746 let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1747 &self.perpetual_tables,
1748 checkpoint_store,
1749 rest_index,
1750 &self.objects_lock_table,
1751 pruning_config,
1752 AuthorityStorePruningMetrics::new_for_test(),
1753 usize::MAX,
1754 EPOCH_DURATION_MS_FOR_TESTING,
1755 )
1756 .await;
1757 let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1758 }
1759
1760 #[cfg(test)]
1761 pub async fn prune_objects_immediately_for_testing(
1762 &self,
1763 transaction_effects: Vec<TransactionEffects>,
1764 ) -> anyhow::Result<()> {
1765 let mut wb = self.perpetual_tables.objects.batch();
1766
1767 let mut object_keys_to_prune = vec![];
1768 for effects in &transaction_effects {
1769 for (object_id, seq_number) in effects.modified_at_versions() {
1770 info!("Pruning object {:?} version {:?}", object_id, seq_number);
1771 object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1772 }
1773 }
1774
1775 wb.delete_batch(
1776 &self.perpetual_tables.objects,
1777 object_keys_to_prune.into_iter(),
1778 )?;
1779 wb.write()?;
1780 Ok(())
1781 }
1782
1783 #[cfg(msim)]
1784 pub fn remove_all_versions_of_object(&self, object_id: ObjectID) {
1785 let entries: Vec<_> = self
1786 .perpetual_tables
1787 .objects
1788 .unbounded_iter()
1789 .filter_map(|(key, _)| if key.0 == object_id { Some(key) } else { None })
1790 .collect();
1791 info!("Removing all versions of object: {:?}", entries);
1792 self.perpetual_tables.objects.multi_remove(entries).unwrap();
1793 }
1794
1795 #[cfg(msim)]
1798 pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1799 self.perpetual_tables
1800 .objects
1801 .safe_iter_with_bounds(
1802 Some(ObjectKey(object_id, VersionNumber::MIN)),
1803 Some(ObjectKey(object_id, VersionNumber::MAX)),
1804 )
1805 .collect::<Result<Vec<_>, _>>()
1806 .unwrap()
1807 .len()
1808 }
1809}
1810
1811impl AccumulatorStore for AuthorityStore {
1812 fn get_root_state_accumulator_for_epoch(
1813 &self,
1814 epoch: EpochId,
1815 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
1816 self.perpetual_tables
1817 .root_state_hash_by_epoch
1818 .get(&epoch)
1819 .map_err(Into::into)
1820 }
1821
1822 fn get_root_state_accumulator_for_highest_epoch(
1823 &self,
1824 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
1825 Ok(self
1826 .perpetual_tables
1827 .root_state_hash_by_epoch
1828 .safe_iter()
1829 .skip_to_last()
1830 .next()
1831 .transpose()?)
1832 }
1833
1834 fn insert_state_accumulator_for_epoch(
1835 &self,
1836 epoch: EpochId,
1837 last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1838 acc: &Accumulator,
1839 ) -> IotaResult {
1840 self.perpetual_tables
1841 .root_state_hash_by_epoch
1842 .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1843 self.root_state_notify_read
1844 .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1845
1846 Ok(())
1847 }
1848
1849 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1850 Box::new(self.perpetual_tables.iter_live_object_set())
1851 }
1852}
1853
1854impl ObjectStore for AuthorityStore {
1855 fn get_object(
1857 &self,
1858 object_id: &ObjectID,
1859 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1860 self.perpetual_tables.as_ref().get_object(object_id)
1861 }
1862
1863 fn get_object_by_key(
1864 &self,
1865 object_id: &ObjectID,
1866 version: VersionNumber,
1867 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1868 self.perpetual_tables.get_object_by_key(object_id, version)
1869 }
1870}
1871
1872pub struct ResolverWrapper {
1874 pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1875 pub metrics: Arc<ResolverMetrics>,
1876}
1877
1878impl ResolverWrapper {
1879 pub fn new(
1880 resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1881 metrics: Arc<ResolverMetrics>,
1882 ) -> Self {
1883 metrics.module_cache_size.set(0);
1884 ResolverWrapper { resolver, metrics }
1885 }
1886
1887 fn inc_cache_size_gauge(&self) {
1888 let current = self.metrics.module_cache_size.get();
1890 self.metrics.module_cache_size.set(current + 1);
1891 }
1892}
1893
1894impl ModuleResolver for ResolverWrapper {
1895 type Error = IotaError;
1896 fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1897 self.inc_cache_size_gauge();
1898 get_module(&*self.resolver, module_id)
1899 }
1900}
1901
1902pub enum UpdateType {
1903 Transaction(TransactionEffectsDigest),
1904 Genesis,
1905}
1906
1907pub type IotaLockResult = IotaResult<ObjectLockStatus>;
1908
1909#[derive(Debug, PartialEq, Eq)]
1910pub enum ObjectLockStatus {
1911 Initialized,
1912 LockedToTx { locked_by_tx: LockDetails }, LockedAtDifferentVersion { locked_ref: ObjectRef },
1914}