iota_core/authority/authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{iter, mem, ops::Not, sync::Arc, thread};
6
7use either::Either;
8use fastcrypto::hash::{HashFunction, Sha3_256};
9use futures::stream::FuturesUnordered;
10use iota_common::sync::notify_read::NotifyRead;
11use iota_config::{migration_tx_data::MigrationTxData, node::AuthorityStorePruningConfig};
12use iota_macros::fail_point_arg;
13use iota_storage::mutex_table::{MutexGuard, MutexTable, RwLockGuard, RwLockTable};
14use iota_types::{
15    accumulator::Accumulator,
16    base_types::SequenceNumber,
17    digests::TransactionEventsDigest,
18    effects::{TransactionEffects, TransactionEvents},
19    error::UserInputError,
20    execution::TypeLayoutStore,
21    fp_bail, fp_ensure,
22    iota_system_state::{
23        get_iota_system_state, iota_system_state_summary::IotaSystemStateSummaryV2,
24    },
25    message_envelope::Message,
26    storage::{
27        BackingPackageStore, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, get_module,
28    },
29};
30use itertools::izip;
31use move_core_types::resolver::ModuleResolver;
32use tokio::{
33    sync::{RwLockReadGuard, RwLockWriteGuard},
34    time::Instant,
35};
36use tracing::{debug, info, trace};
37use typed_store::{
38    TypedStoreError,
39    rocks::{DBBatch, DBMap, util::is_ref_count_value},
40    traits::Map,
41};
42
43use super::{
44    authority_store_tables::{AuthorityPerpetualTables, LiveObject},
45    *,
46};
47use crate::{
48    authority::{
49        authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails},
50        authority_store_pruner::{
51            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
52        },
53        authority_store_tables::TotalIotaSupplyCheck,
54        authority_store_types::{
55            ObjectContentDigest, StoreObject, StoreObjectPair, StoreObjectWrapper,
56            get_store_object_pair,
57        },
58        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
59    },
60    rest_index::RestIndexStore,
61    state_accumulator::AccumulatorStore,
62    transaction_outputs::TransactionOutputs,
63};
64
65const NUM_SHARDS: usize = 4096;
66
67struct AuthorityStoreMetrics {
68    iota_conservation_check_latency: IntGauge,
69    iota_conservation_live_object_count: IntGauge,
70    iota_conservation_live_object_size: IntGauge,
71    iota_conservation_imbalance: IntGauge,
72    iota_conservation_storage_fund: IntGauge,
73    iota_conservation_storage_fund_imbalance: IntGauge,
74    epoch_flags: IntGaugeVec,
75}
76
77impl AuthorityStoreMetrics {
78    pub fn new(registry: &Registry) -> Self {
79        Self {
80            iota_conservation_check_latency: register_int_gauge_with_registry!(
81                "iota_conservation_check_latency",
82                "Number of seconds took to scan all live objects in the store for IOTA conservation check",
83                registry,
84            ).unwrap(),
85            iota_conservation_live_object_count: register_int_gauge_with_registry!(
86                "iota_conservation_live_object_count",
87                "Number of live objects in the store",
88                registry,
89            ).unwrap(),
90            iota_conservation_live_object_size: register_int_gauge_with_registry!(
91                "iota_conservation_live_object_size",
92                "Size in bytes of live objects in the store",
93                registry,
94            ).unwrap(),
95            iota_conservation_imbalance: register_int_gauge_with_registry!(
96                "iota_conservation_imbalance",
97                "Total amount of IOTA in the network - 10B * 10^9. This delta shows the amount of imbalance",
98                registry,
99            ).unwrap(),
100            iota_conservation_storage_fund: register_int_gauge_with_registry!(
101                "iota_conservation_storage_fund",
102                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
103                registry,
104            ).unwrap(),
105            iota_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
106                "iota_conservation_storage_fund_imbalance",
107                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
108                registry,
109            ).unwrap(),
110            epoch_flags: register_int_gauge_vec_with_registry!(
111                "epoch_flags",
112                "Local flags of the currently running epoch",
113                &["flag"],
114                registry,
115            ).unwrap(),
116        }
117    }
118}
119
120/// The `AuthorityStore` manages the state and operations of an authority's
121/// store. It includes a `mutex_table` to handle concurrent writes to the
122/// database and references to various tables stored in
123/// `AuthorityPerpetualTables`. The struct provides mechanisms for initializing
124/// and accessing locks, managing objects and transactions, and performing
125/// epoch-specific operations. It also includes methods for recovering from
126/// crashes, checking IOTA conservation, and handling object markers and states
127/// during epoch transitions.
128pub struct AuthorityStore {
129    /// Internal vector of locks to manage concurrent writes to the database
130    mutex_table: MutexTable<ObjectDigest>,
131
132    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
133
134    pub(crate) root_state_notify_read: NotifyRead<EpochId, (CheckpointSequenceNumber, Accumulator)>,
135
136    /// Guards reference count updates to `indirect_move_objects` table
137    pub(crate) objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
138
139    indirect_objects_threshold: usize,
140
141    /// Whether to enable the expensive IOTA conservation check at epoch boundaries.
142    enable_epoch_iota_conservation_check: bool,
143
144    metrics: AuthorityStoreMetrics,
145}
146
147pub type ExecutionLockReadGuard<'a> = RwLockReadGuard<'a, EpochId>;
148pub type ExecutionLockWriteGuard<'a> = RwLockWriteGuard<'a, EpochId>;
149
150impl AuthorityStore {
151    /// Open an authority store with the given perpetual tables.
152    /// If the store is empty, initialize it using genesis.
153    pub async fn open(
154        perpetual_tables: Arc<AuthorityPerpetualTables>,
155        genesis: &Genesis,
156        config: &NodeConfig,
157        registry: &Registry,
158        migration_tx_data: Option<&MigrationTxData>,
159    ) -> IotaResult<Arc<Self>> {
160        let indirect_objects_threshold = config.indirect_objects_threshold;
161        let enable_epoch_iota_conservation_check = config
162            .expensive_safety_check_config
163            .enable_epoch_iota_conservation_check();
164
165        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
166            info!("Creating new epoch start config from genesis");
167
168            #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
169            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
170            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
171                info!("Setting initial epoch flags to {:?}", flags);
172                initial_epoch_flags = flags;
173            });
174
175            let epoch_start_configuration = EpochStartConfiguration::new(
176                genesis.iota_system_object().into_epoch_start_state(),
177                *genesis.checkpoint().digest(),
178                &genesis.objects(),
179                initial_epoch_flags,
180            )?;
181            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
182            epoch_start_configuration
183        } else {
184            info!("Loading epoch start config from DB");
185            perpetual_tables
186                .epoch_start_configuration
187                .get(&())?
188                .expect("Epoch start configuration must be set in non-empty DB")
189        };
190        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
191        info!("Epoch start config: {:?}", epoch_start_configuration);
192        info!("Cur epoch: {:?}", cur_epoch);
193        let this = Self::open_inner(
194            genesis,
195            perpetual_tables,
196            indirect_objects_threshold,
197            enable_epoch_iota_conservation_check,
198            registry,
199            migration_tx_data,
200        )
201        .await?;
202        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
203        Ok(this)
204    }
205
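    // Illustrative usage sketch (not part of the original source): a node would
    // typically open the store once at startup, before building the rest of the
    // authority state. `genesis`, `node_config` and `prometheus_registry` below
    // are hypothetical caller-side values.
    //
    //     let perpetual_tables: Arc<AuthorityPerpetualTables> =
    //         /* opened from the node's database path */;
    //     let store = AuthorityStore::open(
    //         perpetual_tables,
    //         &genesis,
    //         &node_config,
    //         &prometheus_registry,
    //         None, // no migration transaction data
    //     )
    //     .await?;
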
206    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
207        for flag in old {
208            self.metrics
209                .epoch_flags
210                .with_label_values(&[&flag.to_string()])
211                .set(0);
212        }
213        for flag in new {
214            self.metrics
215                .epoch_flags
216                .with_label_values(&[&flag.to_string()])
217                .set(1);
218        }
219    }
220
221    // NB: This must only be called at time of reconfiguration. We take the
222    // execution lock write guard as an argument to ensure that this is the
223    // case.
224    pub fn clear_object_per_epoch_marker_table(
225        &self,
226        _execution_guard: &ExecutionLockWriteGuard<'_>,
227    ) -> IotaResult<()> {
228        // We can safely delete all entries in the per epoch marker table since this is
229        // only called at epoch boundaries (during reconfiguration). Therefore
230        // any entries that currently exist can be removed. Because of this we
231        // can use the `schedule_delete_all` method.
232        Ok(self
233            .perpetual_tables
234            .object_per_epoch_marker_table
235            .schedule_delete_all()?)
236    }
237
238    pub async fn open_with_committee_for_testing(
239        perpetual_tables: Arc<AuthorityPerpetualTables>,
240        committee: &Committee,
241        genesis: &Genesis,
242        indirect_objects_threshold: usize,
243    ) -> IotaResult<Arc<Self>> {
244        // TODO: Since we always start at genesis, the committee should be technically
245        // the same as the genesis committee.
246        assert_eq!(committee.epoch, 0);
247        Self::open_inner(
248            genesis,
249            perpetual_tables,
250            indirect_objects_threshold,
251            true,
252            &Registry::new(),
253            None,
254        )
255        .await
256    }
257
258    async fn open_inner(
259        genesis: &Genesis,
260        perpetual_tables: Arc<AuthorityPerpetualTables>,
261        indirect_objects_threshold: usize,
262        enable_epoch_iota_conservation_check: bool,
263        registry: &Registry,
264        migration_tx_data: Option<&MigrationTxData>,
265    ) -> IotaResult<Arc<Self>> {
266        let store = Arc::new(Self {
267            mutex_table: MutexTable::new(NUM_SHARDS),
268            perpetual_tables,
269            root_state_notify_read:
270                NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
271            objects_lock_table: Arc::new(RwLockTable::new(NUM_SHARDS)),
272            indirect_objects_threshold,
273            enable_epoch_iota_conservation_check,
274            metrics: AuthorityStoreMetrics::new(registry),
275        });
276        // Only initialize the database if it is empty.
277        if store
278            .database_is_empty()
279            .expect("database read should not fail at init.")
280        {
281            // Initialize with genesis data
282            // First insert genesis objects
283            store
284                .bulk_insert_genesis_objects(genesis.objects())
285                .expect("cannot bulk insert genesis objects");
286
287            // Then insert txn and effects of genesis
288            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
289            store
290                .perpetual_tables
291                .transactions
292                .insert(transaction.digest(), transaction.serializable_ref())
293                .expect("cannot insert genesis transaction");
294            store
295                .perpetual_tables
296                .effects
297                .insert(&genesis.effects().digest(), genesis.effects())
298                .expect("cannot insert genesis effects");
299
300            // Note that we deliberately do not insert the effects into executed_effects
301            // here: the genesis transaction has not been executed yet (it will be), and
302            // keeping executed_effects empty until then is important for fullnodes to be
303            // able to generate indexing data correctly.
304            let event_digests = genesis.events().digest();
305            let events = genesis
306                .events()
307                .data
308                .iter()
309                .enumerate()
310                .map(|(i, e)| ((event_digests, i), e));
311            store.perpetual_tables.events.multi_insert(events).unwrap();
312
313            // Initialize with migration data if genesis contained migration transactions
314            if let Some(migration_transactions) = migration_tx_data {
315                // This migration data was validated when it was loaded into the node (by
316                // the caller of this function).
317                let txs_data = migration_transactions.txs_data();
318
319                // We iterate over the contents of the genesis checkpoint, which include the
320                // execution digests of all migration transactions. Thus we cover all
321                // transactions that were considered during the creation of the genesis blob.
322                for (_, execution_digest) in genesis
323                    .checkpoint_contents()
324                    .enumerate_transactions(&genesis.checkpoint())
325                {
326                    let tx_digest = &execution_digest.transaction;
327                    // We can skip the genesis transaction and its data because they were
328                    // already stored in the perpetual_tables above.
329                    if tx_digest == genesis.transaction().digest() {
330                        continue;
331                    }
332                    // Now we can store this migration transaction in the perpetual_tables,
333                    // together with its effects, events, and created objects.
334                    let Some((tx, effects, events)) = txs_data.get(tx_digest) else {
335                        panic!("tx digest not found in migrated objects blob");
336                    };
337                    let transaction = VerifiedTransaction::new_unchecked(tx.clone());
338                    let objects = migration_transactions
339                        .objects_by_tx_digest(*tx_digest)
340                        .expect("the migration data is corrupted");
341                    store
342                        .bulk_insert_genesis_objects(&objects)
343                        .expect("cannot bulk insert migrated objects");
344                    store
345                        .perpetual_tables
346                        .transactions
347                        .insert(transaction.digest(), transaction.serializable_ref())
348                        .expect("cannot insert migration transaction");
349                    store
350                        .perpetual_tables
351                        .effects
352                        .insert(&effects.digest(), effects)
353                        .expect("cannot insert migration effects");
354                    let events = events
355                        .data
356                        .iter()
357                        .enumerate()
358                        .map(|(i, e)| ((events.digest(), i), e));
359                    store.perpetual_tables.events.multi_insert(events).unwrap();
360                }
361            }
362        }
363
364        Ok(store)
365    }
366
367    /// Open authority store without any operations that require
368    /// genesis, such as constructing EpochStartConfiguration
369    /// or inserting genesis objects.
370    pub fn open_no_genesis(
371        perpetual_tables: Arc<AuthorityPerpetualTables>,
372        indirect_objects_threshold: usize,
373        enable_epoch_iota_conservation_check: bool,
374        registry: &Registry,
375    ) -> IotaResult<Arc<Self>> {
376        let store = Arc::new(Self {
377            mutex_table: MutexTable::new(NUM_SHARDS),
378            perpetual_tables,
379            root_state_notify_read:
380                NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
381            objects_lock_table: Arc::new(RwLockTable::new(NUM_SHARDS)),
382            indirect_objects_threshold,
383            enable_epoch_iota_conservation_check,
384            metrics: AuthorityStoreMetrics::new(registry),
385        });
386        Ok(store)
387    }
388
389    pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
390        self.perpetual_tables.get_recovery_epoch_at_restart()
391    }
392
393    pub fn get_effects(
394        &self,
395        effects_digest: &TransactionEffectsDigest,
396    ) -> IotaResult<Option<TransactionEffects>> {
397        Ok(self.perpetual_tables.effects.get(effects_digest)?)
398    }
399
400    /// Returns true if we have an effects structure for this effects digest.
401    pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> IotaResult<bool> {
402        self.perpetual_tables
403            .effects
404            .contains_key(effects_digest)
405            .map_err(|e| e.into())
406    }
407
408    pub fn get_events(
409        &self,
410        event_digest: &TransactionEventsDigest,
411    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
412        let data = self
413            .perpetual_tables
414            .events
415            .safe_range_iter((*event_digest, 0)..=(*event_digest, usize::MAX))
416            .map_ok(|(_, event)| event)
417            .collect::<Result<Vec<_>, TypedStoreError>>()?;
418        Ok(data.is_empty().not().then_some(TransactionEvents { data }))
419    }
420
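    // Sketch of the event layout relied on above (assumed caller-side usage):
    // events are stored under composite keys `(TransactionEventsDigest, index)`,
    // so a full `TransactionEvents` value is reassembled by range-scanning every
    // index for a single digest.
    //
    //     if let Some(events) = store.get_events(&events_digest)? {
    //         for (i, event) in events.data.iter().enumerate() {
    //             trace!(index = i, ?event, "transaction event");
    //         }
    //     }
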
421    pub fn multi_get_events(
422        &self,
423        event_digests: &[TransactionEventsDigest],
424    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
425        Ok(event_digests
426            .iter()
427            .map(|digest| self.get_events(digest))
428            .collect::<Result<Vec<_>, _>>()?)
429    }
430
431    pub fn multi_get_effects<'a>(
432        &self,
433        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
434    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
435        Ok(self.perpetual_tables.effects.multi_get(effects_digests)?)
436    }
437
438    pub fn get_executed_effects(
439        &self,
440        tx_digest: &TransactionDigest,
441    ) -> IotaResult<Option<TransactionEffects>> {
442        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
443        match effects_digest {
444            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
445            None => Ok(None),
446        }
447    }
448
449    /// Given a list of transaction digests, returns a list of the corresponding
450    /// effects digests only if the transactions have been executed. For
451    /// transactions that have not been executed, None is returned.
452    pub fn multi_get_executed_effects_digests(
453        &self,
454        digests: &[TransactionDigest],
455    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
456        Ok(self.perpetual_tables.executed_effects.multi_get(digests)?)
457    }
458
459    /// Given a list of transaction digests, returns a list of the corresponding
460    /// effects only if they have been executed. For transactions that have
461    /// not been executed, None is returned.
462    pub fn multi_get_executed_effects(
463        &self,
464        digests: &[TransactionDigest],
465    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
466        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
467        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
468        let mut tx_to_effects_map = effects
469            .into_iter()
470            .flatten()
471            .map(|effects| (*effects.transaction_digest(), effects))
472            .collect::<HashMap<_, _>>();
473        Ok(digests
474            .iter()
475            .map(|digest| tx_to_effects_map.remove(digest))
476            .collect())
477    }
478
479    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
480        Ok(self
481            .perpetual_tables
482            .executed_effects
483            .contains_key(digest)?)
484    }
485
486    pub fn get_marker_value(
487        &self,
488        object_id: &ObjectID,
489        version: &SequenceNumber,
490        epoch_id: EpochId,
491    ) -> IotaResult<Option<MarkerValue>> {
492        let object_key = (epoch_id, ObjectKey(*object_id, *version));
493        Ok(self
494            .perpetual_tables
495            .object_per_epoch_marker_table
496            .get(&object_key)?)
497    }
498
499    pub fn get_latest_marker(
500        &self,
501        object_id: &ObjectID,
502        epoch_id: EpochId,
503    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
504        let min_key = (epoch_id, ObjectKey::min_for_id(object_id));
505        let max_key = (epoch_id, ObjectKey::max_for_id(object_id));
506
507        let marker_entry = self
508            .perpetual_tables
509            .object_per_epoch_marker_table
510            .safe_iter_with_bounds(Some(min_key), Some(max_key))
511            .skip_prior_to(&max_key)?
512            .next();
513        match marker_entry {
514            Some(Ok(((epoch, key), marker))) => {
515                // because of the iterator bounds these cannot fail
516                assert_eq!(epoch, epoch_id);
517                assert_eq!(key.0, *object_id);
518                Ok(Some((key.1, marker)))
519            }
520            Some(Err(e)) => Err(e.into()),
521            None => Ok(None),
522        }
523    }
524
525    /// Returns the root state hash for the given epoch once it becomes
526    /// available.
527    pub async fn notify_read_root_state_hash(
528        &self,
529        epoch: EpochId,
530    ) -> IotaResult<(CheckpointSequenceNumber, Accumulator)> {
531        // We need to register waiters _before_ reading from the database to avoid race
532        // conditions
533        let registration = self.root_state_notify_read.register_one(&epoch);
534        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
535
536        let result = match hash {
537            // Note that the Some() arm also drops the registration, which is no longer needed.
538            Some(ready) => Either::Left(futures::future::ready(ready)),
539            None => Either::Right(registration),
540        }
541        .await;
542
543        Ok(result)
544    }
545
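    // Hedged usage sketch: because the waiter is registered before the database
    // read above, a caller can await the root state hash without racing the
    // writer that later inserts it. `store` and `epoch` are assumed to come from
    // the surrounding task.
    //
    //     let (checkpoint_seq, accumulator) =
    //         store.notify_read_root_state_hash(epoch).await?;
    //     debug!(?checkpoint_seq, "root state hash available");
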
546    // Implementation of the corresponding method of `CheckpointCache` trait.
547    pub(crate) fn insert_finalized_transactions_perpetual_checkpoints(
548        &self,
549        digests: &[TransactionDigest],
550        epoch: EpochId,
551        sequence: CheckpointSequenceNumber,
552    ) -> IotaResult {
553        let mut batch = self
554            .perpetual_tables
555            .executed_transactions_to_checkpoint
556            .batch();
557        batch.insert_batch(
558            &self.perpetual_tables.executed_transactions_to_checkpoint,
559            digests.iter().map(|d| (*d, (epoch, sequence))),
560        )?;
561        batch.write()?;
562        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
563        Ok(())
564    }
565
566    // Implementation of the corresponding method of `CheckpointCache` trait.
567    pub(crate) fn get_transaction_perpetual_checkpoint(
568        &self,
569        digest: &TransactionDigest,
570    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
571        Ok(self
572            .perpetual_tables
573            .executed_transactions_to_checkpoint
574            .get(digest)?)
575    }
576
577    // Implementation of the corresponding method of `CheckpointCache` trait.
578    pub(crate) fn multi_get_transactions_perpetual_checkpoints(
579        &self,
580        digests: &[TransactionDigest],
581    ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
582        Ok(self
583            .perpetual_tables
584            .executed_transactions_to_checkpoint
585            .multi_get(digests)?)
586    }
587
588    /// Returns true if there are no objects in the database
589    pub fn database_is_empty(&self) -> IotaResult<bool> {
590        self.perpetual_tables.database_is_empty()
591    }
592
593    /// Acquires all locks associated with the objects, in a consistent order
594    /// to avoid deadlocks.
595    async fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
596        self.mutex_table
597            .acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
598            .await
599    }
600
601    pub fn object_exists_by_key(
602        &self,
603        object_id: &ObjectID,
604        version: VersionNumber,
605    ) -> IotaResult<bool> {
606        Ok(self
607            .perpetual_tables
608            .objects
609            .contains_key(&ObjectKey(*object_id, version))?)
610    }
611
612    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
613        Ok(self
614            .perpetual_tables
615            .objects
616            .multi_contains_keys(object_keys.to_vec())?
617            .into_iter()
618            .collect())
619    }
620
621    pub fn multi_get_objects_by_key(
622        &self,
623        object_keys: &[ObjectKey],
624    ) -> Result<Vec<Option<Object>>, IotaError> {
625        let wrappers = self
626            .perpetual_tables
627            .objects
628            .multi_get(object_keys.to_vec())?;
629        let mut ret = vec![];
630
631        for (idx, w) in wrappers.into_iter().enumerate() {
632            ret.push(
633                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
634                    .transpose()?
635                    .flatten(),
636            );
637        }
638        Ok(ret)
639    }
640
641    /// Get many objects
642    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, IotaError> {
643        let mut result = Vec::new();
644        for id in objects {
645            result.push(self.get_object(id)?);
646        }
647        Ok(result)
648    }
649
650    pub fn have_deleted_owned_object_at_version_or_after(
651        &self,
652        object_id: &ObjectID,
653        version: VersionNumber,
654        epoch_id: EpochId,
655    ) -> Result<bool, IotaError> {
656        let object_key = ObjectKey::max_for_id(object_id);
657        let marker_key = (epoch_id, object_key);
658
659        // Find the most recent version of the object that was deleted or wrapped.
660        // Return true if the version is >= `version`. Otherwise return false.
661        let marker_entry = self
662            .perpetual_tables
663            .object_per_epoch_marker_table
664            .unbounded_iter()
665            .skip_prior_to(&marker_key)?
666            .next();
667        match marker_entry {
668            Some(((epoch, key), marker)) => {
669                // Make sure object id matches and version is >= `version`
670                let object_data_ok = key.0 == *object_id && key.1 >= version;
671                // Make sure we don't have a stale epoch for some reason (e.g., a revert)
672                let epoch_data_ok = epoch == epoch_id;
673                // Make sure the object was deleted or wrapped.
674                let mark_data_ok = marker == MarkerValue::OwnedDeleted;
675                Ok(object_data_ok && epoch_data_ok && mark_data_ok)
676            }
677            None => Ok(false),
678        }
679    }
680
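    // Illustrative sketch (hypothetical caller): the per-epoch marker table lets
    // a caller distinguish "this version never existed" from "this owned object
    // was deleted at or after this version" within the current epoch, e.g. when
    // rejecting stale owned-object inputs.
    //
    //     if store.have_deleted_owned_object_at_version_or_after(&id, version, epoch)? {
    //         // The owned object is gone at or after `version`; reject the input.
    //     }
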
681    // Methods to mutate the store
682
683    /// Insert a genesis object.
684    /// TODO: delete this method entirely (still used by authority_tests.rs)
685    pub(crate) fn insert_genesis_object(&self, object: Object) -> IotaResult {
686        // We only side load objects with a genesis parent transaction.
687        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
688        let object_ref = object.compute_object_reference();
689        self.insert_object_direct(object_ref, &object)
690    }
691
692    /// Insert an object directly into the store, and also update relevant
693    /// tables. NOTE: does not handle transaction locks.
694    /// This is used to insert genesis objects.
695    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> IotaResult {
696        let mut write_batch = self.perpetual_tables.objects.batch();
697
698        // Insert object
699        let StoreObjectPair(store_object, indirect_object) =
700            get_store_object_pair(object.clone(), self.indirect_objects_threshold);
701        write_batch.insert_batch(
702            &self.perpetual_tables.objects,
703            std::iter::once((ObjectKey::from(object_ref), store_object)),
704        )?;
705        if let Some(indirect_obj) = indirect_object {
706            write_batch.insert_batch(
707                &self.perpetual_tables.indirect_move_objects,
708                std::iter::once((indirect_obj.inner().digest(), indirect_obj)),
709            )?;
710        }
711
712        // Update the index
713        if object.get_single_owner().is_some() {
714            // Only initialize live object markers for address owned objects.
715            if !object.is_child_object() {
716                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref])?;
717            }
718        }
719
720        write_batch.write()?;
721
722        Ok(())
723    }
724
725    /// This function should only be used for initializing genesis and should
726    /// remain private.
727    #[instrument(level = "debug", skip_all)]
728    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult<()> {
729        let mut batch = self.perpetual_tables.objects.batch();
730        let ref_and_objects: Vec<_> = objects
731            .iter()
732            .map(|o| (o.compute_object_reference(), o))
733            .collect();
734
735        batch
736            .insert_batch(
737                &self.perpetual_tables.objects,
738                ref_and_objects.iter().map(|(oref, o)| {
739                    (
740                        ObjectKey::from(oref),
741                        get_store_object_pair((*o).clone(), self.indirect_objects_threshold).0,
742                    )
743                }),
744            )?
745            .insert_batch(
746                &self.perpetual_tables.indirect_move_objects,
747                ref_and_objects.iter().filter_map(|(_, o)| {
748                    let StoreObjectPair(_, indirect_object) =
749                        get_store_object_pair((*o).clone(), self.indirect_objects_threshold);
750                    indirect_object.map(|obj| (obj.inner().digest(), obj))
751                }),
752            )?;
753
754        let non_child_object_refs: Vec<_> = ref_and_objects
755            .iter()
756            .filter(|(_, object)| !object.is_child_object())
757            .map(|(oref, _)| *oref)
758            .collect();
759
760        self.initialize_live_object_markers_impl(&mut batch, &non_child_object_refs)?;
761
762        batch.write()?;
763
764        Ok(())
765    }
766
767    pub fn bulk_insert_live_objects(
768        perpetual_db: &AuthorityPerpetualTables,
769        live_objects: impl Iterator<Item = LiveObject>,
770        indirect_objects_threshold: usize,
771        expected_sha3_digest: &[u8; 32],
772    ) -> IotaResult<()> {
773        let mut hasher = Sha3_256::default();
774        let mut batch = perpetual_db.objects.batch();
775        for object in live_objects {
776            hasher.update(object.object_reference().2.inner());
777            match object {
778                LiveObject::Normal(object) => {
779                    let StoreObjectPair(store_object_wrapper, indirect_object) =
780                        get_store_object_pair(object.clone(), indirect_objects_threshold);
781                    batch.insert_batch(
782                        &perpetual_db.objects,
783                        std::iter::once((
784                            ObjectKey::from(object.compute_object_reference()),
785                            store_object_wrapper,
786                        )),
787                    )?;
788                    if let Some(indirect_object) = indirect_object {
789                        batch.merge_batch(
790                            &perpetual_db.indirect_move_objects,
791                            iter::once((indirect_object.inner().digest(), indirect_object)),
792                        )?;
793                    }
794                    if !object.is_child_object() {
795                        Self::initialize_live_object_markers(
796                            &perpetual_db.live_owned_object_markers,
797                            &mut batch,
798                            &[object.compute_object_reference()],
799                        )?;
800                    }
801                }
802                LiveObject::Wrapped(object_key) => {
803                    batch.insert_batch(
804                        &perpetual_db.objects,
805                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
806                            object_key,
807                            StoreObject::Wrapped.into(),
808                        )),
809                    )?;
810                }
811            }
812        }
813        let sha3_digest = hasher.finalize().digest;
814        if *expected_sha3_digest != sha3_digest {
815            error!(
816                "Sha does not match! expected: {:?}, actual: {:?}",
817                expected_sha3_digest, sha3_digest
818            );
819            return Err(IotaError::from("Sha does not match"));
820        }
821        batch.write()?;
822        Ok(())
823    }
824
825    pub fn set_epoch_start_configuration(
826        &self,
827        epoch_start_configuration: &EpochStartConfiguration,
828    ) -> IotaResult {
829        self.perpetual_tables
830            .set_epoch_start_configuration(epoch_start_configuration)?;
831        Ok(())
832    }
833
834    pub fn get_epoch_start_configuration(&self) -> IotaResult<Option<EpochStartConfiguration>> {
835        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
836    }
837
838    /// Acquires read locks for affected indirect objects
839    #[instrument(level = "trace", skip_all)]
840    async fn acquire_read_locks_for_indirect_objects(
841        &self,
842        written: &[Object],
843    ) -> Vec<RwLockGuard> {
844        // locking is required to avoid potential race conditions with the pruner
845        // potential race:
846        //   - transaction execution branches to reference count increment
847        //   - pruner decrements ref count to 0
848        //   - compaction job compresses existing merge values to an empty vector
849        //   - tx executor commits ref count increment instead of the full value making
850        //     object inaccessible
851        // read locks are sufficient because ref count increments are safe,
852        // concurrent transaction executions produce independent ref count increments
853        // and don't corrupt the state
854        let digests = written
855            .iter()
856            .filter_map(|object| {
857                let StoreObjectPair(_, indirect_object) =
858                    get_store_object_pair(object.clone(), self.indirect_objects_threshold);
859                indirect_object.map(|obj| obj.inner().digest())
860            })
861            .collect();
862        self.objects_lock_table.acquire_read_locks(digests).await
863    }
864
865    /// Updates the state resulting from the execution of a certificate.
866    ///
867    /// Internally it checks that all locks for active inputs are at the correct
868    /// version, and then writes objects, certificates, and parents, and cleans up
869    /// locks atomically.
870    #[instrument(level = "debug", skip_all)]
871    pub async fn write_transaction_outputs(
872        &self,
873        epoch_id: EpochId,
874        tx_outputs: &[Arc<TransactionOutputs>],
875    ) -> IotaResult {
876        let mut written = Vec::with_capacity(tx_outputs.len());
877        for outputs in tx_outputs {
878            written.extend(outputs.written.values().cloned());
879        }
880
881        let _locks = self.acquire_read_locks_for_indirect_objects(&written).await;
882
883        let mut write_batch = self.perpetual_tables.transactions.batch();
884        for outputs in tx_outputs {
885            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
886        }
887        // test crashing before writing the batch
888        fail_point_async!("crash");
889
890        write_batch.write()?;
891        trace!(
892            "committed transactions: {:?}",
893            tx_outputs
894                .iter()
895                .map(|tx| tx.transaction.digest())
896                .collect::<Vec<_>>()
897        );
898
899        // test crashing before notifying
900        fail_point_async!("crash");
901
902        Ok(())
903    }
904
905    fn write_one_transaction_outputs(
906        &self,
907        write_batch: &mut DBBatch,
908        epoch_id: EpochId,
909        tx_outputs: &TransactionOutputs,
910    ) -> IotaResult {
911        let TransactionOutputs {
912            transaction,
913            effects,
914            markers,
915            wrapped,
916            deleted,
917            written,
918            events,
919            live_object_markers_to_delete,
920            new_live_object_markers_to_init,
921            ..
922        } = tx_outputs;
923
924        // Store the certificate indexed by transaction digest
925        let transaction_digest = transaction.digest();
926        write_batch.insert_batch(
927            &self.perpetual_tables.transactions,
928            iter::once((transaction_digest, transaction.serializable_ref())),
929        )?;
930
931        // Add batched writes for objects and locks.
932        let effects_digest = effects.digest();
933
934        write_batch.insert_batch(
935            &self.perpetual_tables.object_per_epoch_marker_table,
936            markers
937                .iter()
938                .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
939        )?;
940
941        write_batch.insert_batch(
942            &self.perpetual_tables.objects,
943            deleted
944                .iter()
945                .map(|key| (key, StoreObject::Deleted))
946                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
947                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
948        )?;
949
950        // Insert each output object into the stores
951        let (new_objects, new_indirect_move_objects): (Vec<_>, Vec<_>) = written
952            .iter()
953            .map(|(id, new_object)| {
954                let version = new_object.version();
955                debug!(?id, ?version, "writing object");
956                let StoreObjectPair(store_object, indirect_object) =
957                    get_store_object_pair(new_object.clone(), self.indirect_objects_threshold);
958                (
959                    (ObjectKey(*id, version), store_object),
960                    indirect_object.map(|obj| (obj.inner().digest(), obj)),
961                )
962            })
963            .unzip();
964
965        let indirect_objects: Vec<_> = new_indirect_move_objects.into_iter().flatten().collect();
966        let existing_digests = self
967            .perpetual_tables
968            .indirect_move_objects
969            .multi_get_raw_bytes(indirect_objects.iter().map(|(digest, _)| digest))?;
970        // Split updates into existing and new indirect objects: for new objects a
971        // full merge needs to be triggered, while for existing objects a ref count
972        // increment is sufficient.
973        let (existing_indirect_objects, new_indirect_objects): (Vec<_>, Vec<_>) = indirect_objects
974            .into_iter()
975            .enumerate()
976            .partition(|(idx, _)| matches!(&existing_digests[*idx], Some(value) if !is_ref_count_value(value)));
977
978        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects.into_iter())?;
979        if !new_indirect_objects.is_empty() {
980            write_batch.merge_batch(
981                &self.perpetual_tables.indirect_move_objects,
982                new_indirect_objects.into_iter().map(|(_, pair)| pair),
983            )?;
984        }
985        if !existing_indirect_objects.is_empty() {
986            write_batch.partial_merge_batch(
987                &self.perpetual_tables.indirect_move_objects,
988                existing_indirect_objects
989                    .into_iter()
990                    .map(|(_, (digest, _))| (digest, 1_u64.to_le_bytes())),
991            )?;
992        }
993
994        let event_digest = events.digest();
995        let events = events
996            .data
997            .iter()
998            .enumerate()
999            .map(|(i, e)| ((event_digest, i), e));
1000
1001        write_batch.insert_batch(&self.perpetual_tables.events, events)?;
1002
1003        self.initialize_live_object_markers_impl(write_batch, new_live_object_markers_to_init)?;
1004
1005        // Note: deletes live object markers for received objects as well (but not for
1006        // objects that were in `Receiving` arguments which were not received)
1007        self.delete_live_object_markers(write_batch, live_object_markers_to_delete)?;
1008
1009        write_batch
1010            .insert_batch(
1011                &self.perpetual_tables.effects,
1012                [(effects_digest, effects.clone())],
1013            )?
1014            .insert_batch(
1015                &self.perpetual_tables.executed_effects,
1016                [(transaction_digest, effects_digest)],
1017            )?;
1018
1019        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
1020
1021        Ok(())
1022    }
1023
1024    /// Commits transactions only to the db. Called by checkpoint builder. See
1025    /// ExecutionCache::commit_transactions for more info
1026    pub(crate) fn commit_transactions(
1027        &self,
1028        transactions: &[(TransactionDigest, VerifiedTransaction)],
1029    ) -> IotaResult {
1030        let mut batch = self.perpetual_tables.transactions.batch();
1031        batch.insert_batch(
1032            &self.perpetual_tables.transactions,
1033            transactions
1034                .iter()
1035                .map(|(digest, tx)| (*digest, tx.serializable_ref())),
1036        )?;
1037        batch.write()?;
1038        Ok(())
1039    }
1040
1041    pub async fn acquire_transaction_locks(
1042        &self,
1043        epoch_store: &AuthorityPerEpochStore,
1044        owned_input_objects: &[ObjectRef],
1045        transaction: VerifiedSignedTransaction,
1046    ) -> IotaResult {
1047        let tx_digest = *transaction.digest();
1048        // Other writers may be attempting to acquire locks on the same objects, so a
1049        // mutex is required.
1050        // TODO: replace with optimistic db_transactions (i.e. set lock to tx if none)
1051        let _mutexes = self.acquire_locks(owned_input_objects).await;
1052
1053        trace!(?owned_input_objects, "acquire_transaction_locks");
1054        let mut locks_to_write = Vec::new();
1055
1056        let live_object_markers = self
1057            .perpetual_tables
1058            .live_owned_object_markers
1059            .multi_get(owned_input_objects)?;
1060
1061        let epoch_tables = epoch_store.tables()?;
1062
1063        let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
1064
1065        assert_eq!(locks.len(), live_object_markers.len());
1066
1067        for (live_marker, lock, obj_ref) in izip!(
1068            live_object_markers.into_iter(),
1069            locks.into_iter(),
1070            owned_input_objects
1071        ) {
1072            if live_marker.is_none() {
1073                // object at that version does not exist
1074                let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1075                fp_bail!(
1076                    UserInputError::ObjectVersionUnavailableForConsumption {
1077                        provided_obj_ref: *obj_ref,
1078                        current_version: latest_live_version.1
1079                    }
1080                    .into()
1081                );
1082            };
1083
1084            if let Some(previous_tx_digest) = &lock {
1085                if previous_tx_digest == &tx_digest {
1086                    // no need to re-write lock
1087                    continue;
1088                } else {
1089                    // TODO: add metrics here
1090                    info!(prev_tx_digest = ?previous_tx_digest,
1091                          cur_tx_digest = ?tx_digest,
1092                          "Cannot acquire lock: conflicting transaction!");
1093                    return Err(IotaError::ObjectLockConflict {
1094                        obj_ref: *obj_ref,
1095                        pending_transaction: *previous_tx_digest,
1096                    });
1097                }
1098            }
1099
1100            locks_to_write.push((*obj_ref, tx_digest));
1101        }
1102
1103        if !locks_to_write.is_empty() {
1104            trace!(?locks_to_write, "Writing locks");
1105            epoch_tables.write_transaction_locks(transaction, locks_to_write.into_iter())?;
1106        }
1107
1108        Ok(())
1109    }
1110
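    // Hedged example of the locking contract above: the first transaction to lock
    // a set of owned objects wins, and a second, different transaction touching
    // the same object refs fails with `IotaError::ObjectLockConflict`. `tx_a`,
    // `tx_b` and `owned_refs` are hypothetical.
    //
    //     store
    //         .acquire_transaction_locks(&epoch_store, &owned_refs, tx_a)
    //         .await?;
    //     let err = store
    //         .acquire_transaction_locks(&epoch_store, &owned_refs, tx_b)
    //         .await
    //         .unwrap_err();
    //     assert!(matches!(err, IotaError::ObjectLockConflict { .. }));
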
1111    /// Gets the ObjectLockInfo that represents the state of the lock on an object.
1112    /// Returns UserInputError::ObjectNotFound if no lock record can be found for
1113    /// this object.
1114    pub(crate) fn get_lock(
1115        &self,
1116        obj_ref: ObjectRef,
1117        epoch_store: &AuthorityPerEpochStore,
1118    ) -> IotaLockResult {
1119        if self
1120            .perpetual_tables
1121            .live_owned_object_markers
1122            .get(&obj_ref)?
1123            .is_none()
1124        {
1125            // object at that version does not exist
1126            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
1127                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
1128            });
1129        }
1130
1131        let tables = epoch_store.tables()?;
1132        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
1133            Ok(ObjectLockStatus::LockedToTx {
1134                locked_by_tx: tx_digest,
1135            })
1136        } else {
1137            Ok(ObjectLockStatus::Initialized)
1138        }
1139    }
1140
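    // Sketch of how a caller might interpret the three lock states returned by
    // `get_lock` (assumed caller-side code, not part of this module):
    //
    //     match store.get_lock(obj_ref, &epoch_store)? {
    //         ObjectLockStatus::Initialized => { /* free to lock to a new tx */ }
    //         ObjectLockStatus::LockedToTx { locked_by_tx } => {
    //             trace!(?locked_by_tx, "object already locked to a transaction");
    //         }
    //         ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
    //             trace!(?locked_ref, "caller referenced a stale object version");
    //         }
    //     }
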
1141    /// Returns UserInputError::ObjectNotFound if no lock records found for this
1142    /// object.
1143    pub(crate) fn get_latest_live_version_for_object_id(
1144        &self,
1145        object_id: ObjectID,
1146    ) -> IotaResult<ObjectRef> {
1147        let mut iterator = self
1148            .perpetual_tables
1149            .live_owned_object_markers
1150            .unbounded_iter()
1151            // Make the max possible entry for this object ID.
1152            .skip_prior_to(&(object_id, SequenceNumber::MAX, ObjectDigest::MAX))?;
1153        Ok(iterator
1154            .next()
1155            .and_then(|value| {
1156                if value.0.0 == object_id {
1157                    Some(value)
1158                } else {
1159                    None
1160                }
1161            })
1162            .ok_or_else(|| {
1163                IotaError::from(UserInputError::ObjectNotFound {
1164                    object_id,
1165                    version: None,
1166                })
1167            })?
1168            .0)
1169    }
1170
1171    /// Checks that locks exist for multiple objects.
1172    /// Returns UserInputError::ObjectNotFound if a lock record cannot be found for
1173    /// at least one of the objects.
1174    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at
1175    /// least one object lock is not initialized at the given version.
1176    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> IotaResult {
1177        let live_markers = self
1178            .perpetual_tables
1179            .live_owned_object_markers
1180            .multi_get(objects)?;
1181        for (live_marker, obj_ref) in live_markers.into_iter().zip(objects) {
1182            if live_marker.is_none() {
1183                // object at that version does not exist
1184                let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1185                fp_bail!(
1186                    UserInputError::ObjectVersionUnavailableForConsumption {
1187                        provided_obj_ref: *obj_ref,
1188                        current_version: latest_live_version.1
1189                    }
1190                    .into()
1191                );
1192            }
1193        }
1194        Ok(())
1195    }
1196
1197    /// Initialize live object markers for a given list of ObjectRefs.
1198    fn initialize_live_object_markers_impl(
1199        &self,
1200        write_batch: &mut DBBatch,
1201        objects: &[ObjectRef],
1202    ) -> IotaResult {
1203        AuthorityStore::initialize_live_object_markers(
1204            &self.perpetual_tables.live_owned_object_markers,
1205            write_batch,
1206            objects,
1207        )
1208    }
1209
1210    pub fn initialize_live_object_markers(
1211        live_object_marker_table: &DBMap<ObjectRef, ()>,
1212        write_batch: &mut DBBatch,
1213        objects: &[ObjectRef],
1214    ) -> IotaResult {
1215        trace!(?objects, "initialize_live_object_markers");
1216
1217        write_batch.insert_batch(
1218            live_object_marker_table,
1219            objects.iter().map(|obj_ref| (obj_ref, ())),
1220        )?;
1221        Ok(())
1222    }
1223
1224    /// Removes live object markers for a given list of ObjectRefs.
1225    fn delete_live_object_markers(
1226        &self,
1227        write_batch: &mut DBBatch,
1228        objects: &[ObjectRef],
1229    ) -> IotaResult {
1230        trace!(?objects, "delete_live_object_markers");
1231        write_batch.delete_batch(
1232            &self.perpetual_tables.live_owned_object_markers,
1233            objects.iter(),
1234        )?;
1235        Ok(())
1236    }
1237
1238    #[cfg(test)]
1239    pub(crate) fn reset_locks_and_live_markers_for_test(
1240        &self,
1241        transactions: &[TransactionDigest],
1242        objects: &[ObjectRef],
1243        epoch_store: &AuthorityPerEpochStore,
1244    ) {
1245        for tx in transactions {
1246            epoch_store.delete_signed_transaction_for_test(tx);
1247            epoch_store.delete_object_locks_for_test(objects);
1248        }
1249
1250        let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1251        batch
1252            .delete_batch(
1253                &self.perpetual_tables.live_owned_object_markers,
1254                objects.iter(),
1255            )
1256            .unwrap();
1257        batch.write().unwrap();
1258
1259        let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1260        self.initialize_live_object_markers_impl(&mut batch, objects)
1261            .unwrap();
1262        batch.write().unwrap();
1263    }
1264
1265    /// This function is called at the end of the epoch for each transaction that
1266    /// was executed locally on the validator but didn't make it into the last
1267    /// checkpoint. The effects of the execution are reverted here.
1268    /// The following things are reverted:
1269    /// 1. All new object states are deleted.
1270    /// 2. owner_index table change is reverted.
1271    ///
1272    /// NOTE: transaction and effects are intentionally not deleted. It's
1273    /// possible that if this node is behind, the network will execute the
1274    /// transaction in a later epoch. In that case, we need to keep it saved
1275    /// so that when we receive the checkpoint that includes it from state
1276    /// sync, we are able to execute the checkpoint.
1277    /// TODO: implement GC for transactions that are no longer needed.
1278    pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> IotaResult {
1279        let Some(effects) = self.get_executed_effects(tx_digest)? else {
1280            info!("Not reverting {:?} as it was not executed", tx_digest);
1281            return Ok(());
1282        };
1283
1284        info!(?tx_digest, ?effects, "reverting transaction");
1285
1286        // We should never be reverting shared object transactions.
1287        assert!(effects.input_shared_objects().is_empty());
1288
1289        let mut write_batch = self.perpetual_tables.transactions.batch();
1290        write_batch.delete_batch(
1291            &self.perpetual_tables.executed_effects,
1292            iter::once(tx_digest),
1293        )?;
1294        if let Some(events_digest) = effects.events_digest() {
1295            write_batch.schedule_delete_range(
1296                &self.perpetual_tables.events,
1297                &(*events_digest, usize::MIN),
1298                &(*events_digest, usize::MAX),
1299            )?;
1300        }
1301
1302        let tombstones = effects
1303            .all_tombstones()
1304            .into_iter()
1305            .map(|(id, version)| ObjectKey(id, version));
1306        write_batch.delete_batch(&self.perpetual_tables.objects, tombstones)?;
1307
1308        let all_new_object_keys = effects
1309            .all_changed_objects()
1310            .into_iter()
1311            .map(|((id, version, _), _, _)| ObjectKey(id, version));
1312        write_batch.delete_batch(&self.perpetual_tables.objects, all_new_object_keys.clone())?;
1313
1314        let modified_object_keys = effects
1315            .modified_at_versions()
1316            .into_iter()
1317            .map(|(id, version)| ObjectKey(id, version));
1318
1319        macro_rules! get_objects_and_locks {
1320            ($object_keys: expr) => {
1321                self.perpetual_tables
1322                    .objects
1323                    .multi_get($object_keys.clone())?
1324                    .into_iter()
1325                    .zip($object_keys)
1326                    .filter_map(|(obj_opt, key)| {
1327                        let obj = self
1328                            .perpetual_tables
1329                            .object(
1330                                &key,
1331                                obj_opt.unwrap_or_else(|| {
1332                                    panic!("Older object version not found: {:?}", key)
1333                                }),
1334                            )
1335                            .expect("Matching indirect object not found")?;
1336
1337                        if obj.is_immutable() {
1338                            return None;
1339                        }
1340
1341                        let obj_ref = obj.compute_object_reference();
1342                        Some(obj.is_address_owned().then_some(obj_ref))
1343                    })
1344            };
1345        }
1346
1347        let old_locks = get_objects_and_locks!(modified_object_keys);
1348        let new_locks = get_objects_and_locks!(all_new_object_keys);
1349
1350        let old_locks: Vec<_> = old_locks.flatten().collect();
1351
1352        // Re-create old live markers.
1353        self.initialize_live_object_markers_impl(&mut write_batch, &old_locks)?;
1354
1355        // Delete new live markers
1356        write_batch.delete_batch(
1357            &self.perpetual_tables.live_owned_object_markers,
1358            new_locks.flatten(),
1359        )?;
1360
1361        write_batch.write()?;
1362
1363        Ok(())
1364    }
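
    // Illustrative sketch (not part of the original file): how recovery logic
    // might drive `revert_state_update` for transactions that were executed
    // locally but never reached a checkpoint. `uncheckpointed_digests` is a
    // hypothetical input supplied by the surrounding recovery code.
    #[allow(dead_code)]
    fn example_revert_uncheckpointed(
        &self,
        uncheckpointed_digests: impl IntoIterator<Item = TransactionDigest>,
    ) -> IotaResult {
        for digest in uncheckpointed_digests {
            // `revert_state_update` is a no-op for transactions that were never
            // executed, so calling it unconditionally here is safe.
            self.revert_state_update(&digest)?;
        }
        Ok(())
    }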
1365
1366    /// Returns the object with the highest version less than or equal to the
1367    /// provided sequence number. This is used by the indexer to find the
1368    /// correct version of a dynamic field child object. We do not store the
1369    /// version of the child object, but because of Lamport timestamps, we know
1370    /// the child's version must be less than or equal to the parent's.
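    ///
    /// # Example (illustrative sketch)
    ///
    /// A minimal sketch of how an indexer-style caller could resolve a dynamic
    /// field child; `store`, `child_id`, and `parent_version` are hypothetical
    /// values from the caller's context.
    ///
    /// ```ignore
    /// // The Lamport rule guarantees the child was last written at a version
    /// // <= the parent's version, so this lookup finds the correct child.
    /// let child = store.find_object_lt_or_eq_version(child_id, parent_version)?;
    /// ```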
1371    pub fn find_object_lt_or_eq_version(
1372        &self,
1373        object_id: ObjectID,
1374        version: SequenceNumber,
1375    ) -> IotaResult<Option<Object>> {
1376        self.perpetual_tables
1377            .find_object_lt_or_eq_version(object_id, version)
1378    }
1379
1380    /// Returns the latest object reference we have for this object_id in the
1381    /// objects table.
1382    ///
1383    /// The method may also return the reference to a deleted object, with a
1384    /// digest of ObjectDigest::deleted() or ObjectDigest::wrapped() and the
1385    /// Lamport version of the transaction that deleted the object.
1386    /// Note that a deleted object may re-appear if the deletion was the result
1387    /// of the object being wrapped in another object.
1388    ///
1389    /// If no entry for the object_id is found, return None.
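    ///
    /// # Example (illustrative sketch)
    ///
    /// A minimal sketch of how a caller could distinguish a live reference from
    /// a tombstone; `store` and `object_id` are assumed to come from the
    /// caller's context.
    ///
    /// ```ignore
    /// match store.get_latest_object_ref_or_tombstone(object_id)? {
    ///     Some((_, _, digest)) if digest.is_alive() => { /* latest version is live */ }
    ///     Some(_) => { /* deleted or wrapped tombstone */ }
    ///     None => { /* object id has never been seen */ }
    /// }
    /// ```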
1390    pub fn get_latest_object_ref_or_tombstone(
1391        &self,
1392        object_id: ObjectID,
1393    ) -> Result<Option<ObjectRef>, IotaError> {
1394        self.perpetual_tables
1395            .get_latest_object_ref_or_tombstone(object_id)
1396    }
1397
1398    /// Returns the latest object reference if and only if the object is still
1399    /// live (i.e., it does not return tombstones).
1400    pub fn get_latest_object_ref_if_alive(
1401        &self,
1402        object_id: ObjectID,
1403    ) -> Result<Option<ObjectRef>, IotaError> {
1404        match self.get_latest_object_ref_or_tombstone(object_id)? {
1405            Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1406            _ => Ok(None),
1407        }
1408    }
1409
1410    /// Returns the latest object we have for this object_id in the objects
1411    /// table.
1412    ///
1413    /// If no entry for the object_id is found, return None.
1414    pub fn get_latest_object_or_tombstone(
1415        &self,
1416        object_id: ObjectID,
1417    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1418        let Some((object_key, store_object)) = self
1419            .perpetual_tables
1420            .get_latest_object_or_tombstone(object_id)?
1421        else {
1422            return Ok(None);
1423        };
1424
1425        if let Some(object_ref) = self
1426            .perpetual_tables
1427            .tombstone_reference(&object_key, &store_object)?
1428        {
1429            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1430        }
1431
1432        let object = self
1433            .perpetual_tables
1434            .object(&object_key, store_object)?
1435            .expect("Non tombstone store object could not be converted to object");
1436
1437        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1438    }
1439
1440    pub fn insert_transaction_and_effects(
1441        &self,
1442        transaction: &VerifiedTransaction,
1443        transaction_effects: &TransactionEffects,
1444    ) -> Result<(), TypedStoreError> {
1445        let mut write_batch = self.perpetual_tables.transactions.batch();
1446        write_batch
1447            .insert_batch(
1448                &self.perpetual_tables.transactions,
1449                [(transaction.digest(), transaction.serializable_ref())],
1450            )?
1451            .insert_batch(
1452                &self.perpetual_tables.effects,
1453                [(transaction_effects.digest(), transaction_effects)],
1454            )?;
1455
1456        write_batch.write()?;
1457        Ok(())
1458    }
1459
1460    pub fn multi_insert_transaction_and_effects<'a>(
1461        &self,
1462        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1463    ) -> Result<(), TypedStoreError> {
1464        let mut write_batch = self.perpetual_tables.transactions.batch();
1465        for tx in transactions {
1466            write_batch
1467                .insert_batch(
1468                    &self.perpetual_tables.transactions,
1469                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1470                )?
1471                .insert_batch(
1472                    &self.perpetual_tables.effects,
1473                    [(tx.effects.digest(), &tx.effects)],
1474                )?;
1475        }
1476
1477        write_batch.write()?;
1478        Ok(())
1479    }
1480
1481    pub fn multi_get_transaction_blocks(
1482        &self,
1483        tx_digests: &[TransactionDigest],
1484    ) -> IotaResult<Vec<Option<VerifiedTransaction>>> {
1485        Ok(self
1486            .perpetual_tables
1487            .transactions
1488            .multi_get(tx_digests)
1489            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())?)
1490    }
1491
1492    pub fn get_transaction_block(
1493        &self,
1494        tx_digest: &TransactionDigest,
1495    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1496        self.perpetual_tables
1497            .transactions
1498            .get(tx_digest)
1499            .map(|v| v.map(|v| v.into()))
1500    }
1501
1502    /// This function reads the DB directly to get the system state object.
1503    /// If reconfiguration is happening at the same time, there is no guarantee
1504    /// whether we would be getting the old or the new system state object.
1505    /// Hence this function should only be called during RPC reads, where a
1506    /// data race is not a major concern. In general, we should avoid this as
1507    /// much as possible. If the intent is for testing, you can use
1508    /// AuthorityState::get_iota_system_state_object_for_testing instead.
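    ///
    /// # Example (illustrative sketch)
    ///
    /// A minimal sketch of an RPC-style read where racing with reconfiguration
    /// is acceptable; `store` is assumed to be an `&AuthorityStore`.
    ///
    /// ```ignore
    /// let summary = store
    ///     .get_iota_system_state_object_unsafe()?
    ///     .into_iota_system_state_summary();
    /// ```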
1509    pub fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1510        get_iota_system_state(self.perpetual_tables.as_ref())
1511    }
1512
1513    pub fn expensive_check_iota_conservation<T>(
1514        self: &Arc<Self>,
1515        type_layout_store: T,
1516        old_epoch_store: &AuthorityPerEpochStore,
1517        epoch_supply_change: Option<i64>,
1518    ) -> IotaResult
1519    where
1520        T: TypeLayoutStore + Send + Copy,
1521    {
1522        if !self.enable_epoch_iota_conservation_check {
1523            return Ok(());
1524        }
1525
1526        let executor = old_epoch_store.executor();
1527        info!("Starting IOTA conservation check. This may take a while...");
1528        let cur_time = Instant::now();
1529        let mut pending_objects = vec![];
1530        let mut count = 0;
1531        let mut size = 0;
1532        let (mut total_iota, mut total_storage_rebate) = thread::scope(|s| {
1533            let pending_tasks = FuturesUnordered::new();
1534            for o in self.iter_live_object_set() {
1535                match o {
1536                    LiveObject::Normal(object) => {
1537                        size += object.object_size_for_gas_metering();
1538                        count += 1;
1539                        pending_objects.push(object);
1540                        if count % 1_000_000 == 0 {
1541                            let mut task_objects = vec![];
1542                            mem::swap(&mut pending_objects, &mut task_objects);
1543                            pending_tasks.push(s.spawn(move || {
1544                                let mut layout_resolver =
1545                                    executor.type_layout_resolver(Box::new(type_layout_store));
1546                                let mut total_storage_rebate = 0;
1547                                let mut total_iota = 0;
1548                                for object in task_objects {
1549                                    total_storage_rebate += object.storage_rebate;
1550                                    // get_total_iota includes the storage rebate; however, all
1551                                    // storage rebate is also stored in the storage fund, so we
1552                                    // subtract it here to avoid double counting.
1553                                    total_iota +=
1554                                        object.get_total_iota(layout_resolver.as_mut()).unwrap()
1555                                            - object.storage_rebate;
1556                                }
1557                                if count % 50_000_000 == 0 {
1558                                    info!("Processed {} objects", count);
1559                                }
1560                                (total_iota, total_storage_rebate)
1561                            }));
1562                        }
1563                    }
1564                    LiveObject::Wrapped(_) => {
1565                        unreachable!("Explicitly asked to not include wrapped tombstones")
1566                    }
1567                }
1568            }
1569            pending_tasks.into_iter().fold((0, 0), |init, result| {
1570                let result = result.join().unwrap();
1571                (init.0 + result.0, init.1 + result.1)
1572            })
1573        });
1574        let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1575        for object in pending_objects {
1576            total_storage_rebate += object.storage_rebate;
1577            total_iota +=
1578                object.get_total_iota(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1579        }
1580        info!(
1581            "Scanned {} live objects, took {:?}",
1582            count,
1583            cur_time.elapsed()
1584        );
1585        self.metrics
1586            .iota_conservation_live_object_count
1587            .set(count as i64);
1588        self.metrics
1589            .iota_conservation_live_object_size
1590            .set(size as i64);
1591        self.metrics
1592            .iota_conservation_check_latency
1593            .set(cur_time.elapsed().as_secs() as i64);
1594
1595        // It is safe to call this function because we are in the middle of
1596        // reconfiguration.
1597        let system_state: IotaSystemStateSummaryV2 = self
1598            .get_iota_system_state_object_unsafe()
1599            .expect("Reading iota system state object cannot fail")
1600            .into_iota_system_state_summary()
1601            .try_into()?;
1602        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1603        info!(
1604            "Total IOTA amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1605            total_iota, storage_fund_balance, total_storage_rebate, system_state.epoch
1606        );
1607
1608        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1609        self.metrics
1610            .iota_conservation_storage_fund
1611            .set(storage_fund_balance as i64);
1612        self.metrics
1613            .iota_conservation_storage_fund_imbalance
1614            .set(imbalance);
1615        self.metrics
1616            .iota_conservation_imbalance
1617            .set((total_iota as i128 - system_state.iota_total_supply as i128) as i64);
1618
1619        if let Some(expected_imbalance) = self
1620            .perpetual_tables
1621            .expected_storage_fund_imbalance
1622            .get(&())
1623            .map_err(|err| {
1624                IotaError::from(
1625                    format!("failed to read expected storage fund imbalance: {err}").as_str(),
1626                )
1627            })?
1628        {
1629            fp_ensure!(
1630                imbalance == expected_imbalance,
1631                IotaError::from(
1632                    format!(
1633                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1634                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1635                    ).as_str()
1636                )
1637            );
1638        } else {
1639            self.perpetual_tables
1640                .expected_storage_fund_imbalance
1641                .insert(&(), &imbalance)
1642                .map_err(|err| {
1643                    IotaError::from(
1644                        format!("failed to write expected storage fund imbalance: {err}").as_str(),
1645                    )
1646                })?;
1647        }
1648
1649        let total_supply = self
1650            .perpetual_tables
1651            .total_iota_supply
1652            .get(&())
1653            .map_err(|err| {
1654                IotaError::from(format!("failed to read total iota supply: {err}").as_str())
1655            })?;
1656
1657        match total_supply.zip(epoch_supply_change) {
1658            // Only execute the check if both are set and the supply value was set in the last
1659            // epoch. We have to assume the supply changes every epoch and therefore we
1660            // cannot run the check with a supply value from any epoch earlier than the
1661            // last one. This can happen if the check was disabled for some time.
1662            Some((old_supply, epoch_supply_change))
1663                if old_supply.last_check_epoch + 1 == old_epoch_store.epoch() =>
1664            {
1665                let expected_new_supply = if epoch_supply_change >= 0 {
1666                    old_supply
1667                        .total_supply
1668                        .checked_add(epoch_supply_change.unsigned_abs())
1669                        .ok_or_else(|| {
1670                            IotaError::from(
1671                                format!(
1672                                    "Inconsistent state detected at epoch {}: old supply {} + supply change {} overflowed",
1673                                    system_state.epoch, old_supply.total_supply, epoch_supply_change
1674                                ).as_str())
1675                        })?
1676                } else {
1677                    old_supply.total_supply.checked_sub(epoch_supply_change.unsigned_abs()).ok_or_else(|| {
1678                        IotaError::from(
1679                            format!(
1680                                "Inconsistent state detected at epoch {}: old supply {} - supply change {} underflowed",
1681                                system_state.epoch, old_supply.total_supply, epoch_supply_change
1682                            ).as_str())
1683                    })?
1684                };
1685
1686                fp_ensure!(
1687                    total_iota == expected_new_supply,
1688                    IotaError::from(
1689                        format!(
1690                            "Inconsistent state detected at epoch {}: total iota: {}, expecting {}",
1691                            system_state.epoch, total_iota, expected_new_supply
1692                        )
1693                        .as_str()
1694                    )
1695                );
1696
1697                let new_supply = TotalIotaSupplyCheck {
1698                    total_supply: expected_new_supply,
1699                    last_check_epoch: old_epoch_store.epoch(),
1700                };
1701
1702                self.perpetual_tables
1703                    .total_iota_supply
1704                    .insert(&(), &new_supply)
1705                    .map_err(|err| {
1706                        IotaError::from(
1707                            format!("failed to write total iota supply: {err}").as_str(),
1708                        )
1709                    })?;
1710            }
1711            // If either value is missing, or the stored supply is from an older epoch (e.g.
1712            // at genesis, or after the check was disabled), we only record the current value.
1713            _ => {
1714                info!("Skipping total supply check");
1715
1716                let supply = TotalIotaSupplyCheck {
1717                    total_supply: total_iota,
1718                    last_check_epoch: old_epoch_store.epoch(),
1719                };
1720
1721                self.perpetual_tables
1722                    .total_iota_supply
1723                    .insert(&(), &supply)
1724                    .map_err(|err| {
1725                        IotaError::from(
1726                            format!("failed to write total iota supply: {err}").as_str(),
1727                        )
1728                    })?;
1729
1730                return Ok(());
1731            }
1732        };
1733
1734        Ok(())
1735    }
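
    // Illustrative sketch (not part of the original file) of the two invariants
    // checked by `expensive_check_iota_conservation` above. `previous_supply`
    // and `epoch_supply_change` are hypothetical stand-ins for the value read
    // from `total_iota_supply` and the change passed in by the caller.
    #[allow(dead_code)]
    fn conservation_invariants_sketch(
        total_iota: u64,
        total_storage_rebate: u64,
        storage_fund_balance: u64,
        previous_supply: u64,
        epoch_supply_change: i64,
    ) -> (i64, bool) {
        // Invariant 1: the storage fund should hold exactly the sum of all
        // per-object storage rebates. The offset is recorded on the first run
        // and must remain constant across epochs.
        let imbalance = storage_fund_balance as i64 - total_storage_rebate as i64;

        // Invariant 2: the scanned total supply must equal last epoch's supply
        // adjusted by this epoch's net supply change (positive = minted,
        // negative = burned), using checked arithmetic as above.
        let expected_supply = if epoch_supply_change >= 0 {
            previous_supply.checked_add(epoch_supply_change.unsigned_abs())
        } else {
            previous_supply.checked_sub(epoch_supply_change.unsigned_abs())
        };

        (imbalance, expected_supply == Some(total_iota))
    }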
1736
1737    pub async fn prune_objects_and_compact_for_testing(
1738        &self,
1739        checkpoint_store: &Arc<CheckpointStore>,
1740        rest_index: Option<&RestIndexStore>,
1741    ) {
1742        let pruning_config = AuthorityStorePruningConfig {
1743            num_epochs_to_retain: 0,
1744            ..Default::default()
1745        };
1746        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1747            &self.perpetual_tables,
1748            checkpoint_store,
1749            rest_index,
1750            &self.objects_lock_table,
1751            pruning_config,
1752            AuthorityStorePruningMetrics::new_for_test(),
1753            usize::MAX,
1754            EPOCH_DURATION_MS_FOR_TESTING,
1755        )
1756        .await;
1757        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1758    }
1759
1760    #[cfg(test)]
1761    pub async fn prune_objects_immediately_for_testing(
1762        &self,
1763        transaction_effects: Vec<TransactionEffects>,
1764    ) -> anyhow::Result<()> {
1765        let mut wb = self.perpetual_tables.objects.batch();
1766
1767        let mut object_keys_to_prune = vec![];
1768        for effects in &transaction_effects {
1769            for (object_id, seq_number) in effects.modified_at_versions() {
1770                info!("Pruning object {:?} version {:?}", object_id, seq_number);
1771                object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1772            }
1773        }
1774
1775        wb.delete_batch(
1776            &self.perpetual_tables.objects,
1777            object_keys_to_prune.into_iter(),
1778        )?;
1779        wb.write()?;
1780        Ok(())
1781    }
1782
1783    #[cfg(msim)]
1784    pub fn remove_all_versions_of_object(&self, object_id: ObjectID) {
1785        let entries: Vec<_> = self
1786            .perpetual_tables
1787            .objects
1788            .unbounded_iter()
1789            .filter_map(|(key, _)| if key.0 == object_id { Some(key) } else { None })
1790            .collect();
1791        info!("Removing all versions of object: {:?}", entries);
1792        self.perpetual_tables.objects.multi_remove(entries).unwrap();
1793    }
1794
1795    // Counts the number of versions that exist in the object store for
1796    // `object_id`, including tombstones.
1797    #[cfg(msim)]
1798    pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1799        self.perpetual_tables
1800            .objects
1801            .safe_iter_with_bounds(
1802                Some(ObjectKey(object_id, VersionNumber::MIN)),
1803                Some(ObjectKey(object_id, VersionNumber::MAX)),
1804            )
1805            .collect::<Result<Vec<_>, _>>()
1806            .unwrap()
1807            .len()
1808    }
1809}
1810
1811impl AccumulatorStore for AuthorityStore {
1812    fn get_root_state_accumulator_for_epoch(
1813        &self,
1814        epoch: EpochId,
1815    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
1816        self.perpetual_tables
1817            .root_state_hash_by_epoch
1818            .get(&epoch)
1819            .map_err(Into::into)
1820    }
1821
1822    fn get_root_state_accumulator_for_highest_epoch(
1823        &self,
1824    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
1825        Ok(self
1826            .perpetual_tables
1827            .root_state_hash_by_epoch
1828            .safe_iter()
1829            .skip_to_last()
1830            .next()
1831            .transpose()?)
1832    }
1833
1834    fn insert_state_accumulator_for_epoch(
1835        &self,
1836        epoch: EpochId,
1837        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1838        acc: &Accumulator,
1839    ) -> IotaResult {
1840        self.perpetual_tables
1841            .root_state_hash_by_epoch
1842            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1843        self.root_state_notify_read
1844            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1845
1846        Ok(())
1847    }
1848
1849    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1850        Box::new(self.perpetual_tables.iter_live_object_set())
1851    }
1852}
1853
1854impl ObjectStore for AuthorityStore {
1855    /// Read an object and return it, or Ok(None) if the object was not found.
1856    fn get_object(
1857        &self,
1858        object_id: &ObjectID,
1859    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1860        self.perpetual_tables.as_ref().get_object(object_id)
1861    }
1862
1863    fn get_object_by_key(
1864        &self,
1865        object_id: &ObjectID,
1866        version: VersionNumber,
1867    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1868        self.perpetual_tables.get_object_by_key(object_id, version)
1869    }
1870}
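
// Illustrative sketch (not part of the original file): how a caller could use
// the two `ObjectStore` reads implemented above. `get_object` looks up by id
// alone, `get_object_by_key` pins an exact version; both return `Ok(None)` if
// nothing is found. `store`, `id`, and `version` are assumed inputs.
#[allow(dead_code)]
fn object_store_read_sketch(
    store: &AuthorityStore,
    id: &ObjectID,
    version: VersionNumber,
) -> Result<(Option<Object>, Option<Object>), iota_types::storage::error::Error> {
    // Use fully qualified calls so the trait impl above is what is exercised.
    let by_id = ObjectStore::get_object(store, id)?;
    let by_key = ObjectStore::get_object_by_key(store, id, version)?;
    Ok((by_id, by_key))
}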
1871
1872/// A wrapper type to satisfy Rust's orphan rule.
1873pub struct ResolverWrapper {
1874    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1875    pub metrics: Arc<ResolverMetrics>,
1876}
1877
1878impl ResolverWrapper {
1879    pub fn new(
1880        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1881        metrics: Arc<ResolverMetrics>,
1882    ) -> Self {
1883        metrics.module_cache_size.set(0);
1884        ResolverWrapper { resolver, metrics }
1885    }
1886
1887    fn inc_cache_size_gauge(&self) {
1888        // incremented on every module fetch; the gauge is reset to 0 when the resolver is recreated
1889        let current = self.metrics.module_cache_size.get();
1890        self.metrics.module_cache_size.set(current + 1);
1891    }
1892}
1893
1894impl ModuleResolver for ResolverWrapper {
1895    type Error = IotaError;
1896    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1897        self.inc_cache_size_gauge();
1898        get_module(&*self.resolver, module_id)
1899    }
1900}
1901
1902pub enum UpdateType {
1903    Transaction(TransactionEffectsDigest),
1904    Genesis,
1905}
1906
1907pub type IotaLockResult = IotaResult<ObjectLockStatus>;
1908
1909#[derive(Debug, PartialEq, Eq)]
1910pub enum ObjectLockStatus {
1911    Initialized,
1912    LockedToTx { locked_by_tx: LockDetails }, // no need to use the wrapper type, as this is neither stored nor serialized
1913    LockedAtDifferentVersion { locked_ref: ObjectRef },
1914}
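
// Illustrative sketch (not part of the original file): how a caller might
// summarize an `ObjectLockStatus` when reporting why an owned-object input
// can or cannot be locked to a new transaction.
#[allow(dead_code)]
fn describe_lock_status(status: &ObjectLockStatus) -> &'static str {
    match status {
        // Live marker exists, but no transaction has locked this version yet.
        ObjectLockStatus::Initialized => "initialized, not locked",
        // Already locked to some transaction at this exact object version.
        ObjectLockStatus::LockedToTx { .. } => "locked to a transaction",
        // A lock exists, but for a different version of the object.
        ObjectLockStatus::LockedAtDifferentVersion { .. } => "locked at a different version",
    }
}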