iota_core/authority/
authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{iter, mem, ops::Not, sync::Arc, thread};
6
7use either::Either;
8use fastcrypto::hash::{HashFunction, Sha3_256};
9use futures::stream::FuturesUnordered;
10use iota_common::sync::notify_read::NotifyRead;
11use iota_config::{migration_tx_data::MigrationTxData, node::AuthorityStorePruningConfig};
12use iota_macros::fail_point_arg;
13use iota_storage::mutex_table::{MutexGuard, MutexTable};
14use iota_types::{
15    accumulator::Accumulator,
16    base_types::SequenceNumber,
17    digests::TransactionEventsDigest,
18    effects::{TransactionEffects, TransactionEvents},
19    error::UserInputError,
20    execution::TypeLayoutStore,
21    fp_bail, fp_ensure,
22    iota_system_state::{
23        get_iota_system_state, iota_system_state_summary::IotaSystemStateSummaryV2,
24    },
25    message_envelope::Message,
26    storage::{
27        BackingPackageStore, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, get_module,
28    },
29};
30use itertools::izip;
31use move_core_types::resolver::ModuleResolver;
32use tokio::time::Instant;
33use tracing::{debug, info, trace};
34use typed_store::{
35    TypedStoreError,
36    rocks::{DBBatch, DBMap},
37    traits::Map,
38};
39
40use super::{
41    authority_store_tables::{AuthorityPerpetualTables, LiveObject},
42    *,
43};
44use crate::{
45    authority::{
46        authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails},
47        authority_store_pruner::{
48            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
49        },
50        authority_store_tables::TotalIotaSupplyCheck,
51        authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object},
52        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
53    },
54    rest_index::RestIndexStore,
55    state_accumulator::AccumulatorStore,
56    transaction_outputs::TransactionOutputs,
57};
58
/// Shard count passed to `MutexTable::new` whenever an `AuthorityStore` is
/// constructed (see `open_inner` and `open_no_genesis`).
const NUM_SHARDS: usize = 4096;
60
/// Prometheus gauges owned by the `AuthorityStore`.
///
/// All gauges are registered in `AuthorityStoreMetrics::new`; the descriptions
/// below mirror the help strings used at registration time.
struct AuthorityStoreMetrics {
    /// Seconds taken to scan all live objects for the IOTA conservation check.
    iota_conservation_check_latency: IntGauge,
    /// Number of live objects in the store.
    iota_conservation_live_object_count: IntGauge,
    /// Size in bytes of live objects in the store.
    iota_conservation_live_object_size: IntGauge,
    /// Delta between the total amount of IOTA in the network and the expected
    /// supply, i.e. the amount of imbalance.
    iota_conservation_imbalance: IntGauge,
    /// Storage fund pool balance (only the part that represents object
    /// storage).
    iota_conservation_storage_fund: IntGauge,
    /// Storage fund balance minus the total object storage rebates.
    iota_conservation_storage_fund_imbalance: IntGauge,
    /// Local flags of the currently running epoch, labelled by flag name.
    epoch_flags: IntGaugeVec,
}
70
71impl AuthorityStoreMetrics {
72    pub fn new(registry: &Registry) -> Self {
73        Self {
74            iota_conservation_check_latency: register_int_gauge_with_registry!(
75                "iota_conservation_check_latency",
76                "Number of seconds took to scan all live objects in the store for IOTA conservation check",
77                registry,
78            ).unwrap(),
79            iota_conservation_live_object_count: register_int_gauge_with_registry!(
80                "iota_conservation_live_object_count",
81                "Number of live objects in the store",
82                registry,
83            ).unwrap(),
84            iota_conservation_live_object_size: register_int_gauge_with_registry!(
85                "iota_conservation_live_object_size",
86                "Size in bytes of live objects in the store",
87                registry,
88            ).unwrap(),
89            iota_conservation_imbalance: register_int_gauge_with_registry!(
90                "iota_conservation_imbalance",
91                "Total amount of IOTA in the network - 10B * 10^9. This delta shows the amount of imbalance",
92                registry,
93            ).unwrap(),
94            iota_conservation_storage_fund: register_int_gauge_with_registry!(
95                "iota_conservation_storage_fund",
96                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
97                registry,
98            ).unwrap(),
99            iota_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
100                "iota_conservation_storage_fund_imbalance",
101                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
102                registry,
103            ).unwrap(),
104            epoch_flags: register_int_gauge_vec_with_registry!(
105                "epoch_flags",
106                "Local flags of the currently running epoch",
107                &["flag"],
108                registry,
109            ).unwrap(),
110        }
111    }
112}
113
/// The `AuthorityStore` manages the state and operations of an authority's
/// store. It includes a `mutex_table` to handle concurrent writes to the
/// database and references to various tables stored in
/// `AuthorityPerpetualTables`. The struct provides mechanisms for initializing
/// and accessing locks, managing objects and transactions, and performing
/// epoch-specific operations. It also includes methods for recovering from
/// crashes, checking IOTA conservation, and handling object markers and states
/// during epoch transitions.
pub struct AuthorityStore {
    /// Internal table of locks, keyed by object digest, used to manage
    /// concurrent writes to the database.
    mutex_table: MutexTable<ObjectDigest>,

    /// Shared handle to the perpetual tables that back every read and write
    /// performed by this store.
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    /// Waiter registry keyed by epoch; `notify_read_root_state_hash` registers
    /// on it to wait for an epoch's (checkpoint sequence number, accumulator)
    /// pair when that pair is not yet stored.
    pub(crate) root_state_notify_read: NotifyRead<EpochId, (CheckpointSequenceNumber, Accumulator)>,

    /// Whether to enable expensive IOTA conservation check at epoch boundaries.
    enable_epoch_iota_conservation_check: bool,

    /// Gauges reported by this store.
    metrics: AuthorityStoreMetrics,
}
135
/// Read guard of a `tokio::sync::RwLock` that protects an `EpochId`.
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
/// Write guard of a `tokio::sync::RwLock` that protects an `EpochId`.
/// `clear_object_per_epoch_marker_table` takes it as an argument to ensure it
/// is only called at reconfiguration time.
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
138
139impl AuthorityStore {
    /// Opens an authority store on top of already-opened perpetual tables.
    ///
    /// If the database is empty, a new `EpochStartConfiguration` is built from
    /// `genesis` (using the default epoch flags for `config`) and persisted;
    /// otherwise the stored configuration is loaded. The store itself is then
    /// created through `open_inner`, which also seeds an empty database with
    /// the genesis (and optional migration) data, and the epoch flag metrics
    /// are set from the resulting configuration.
    ///
    /// # Panics
    ///
    /// Panics if the database is non-empty but holds no epoch start
    /// configuration.
    pub async fn open(
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        genesis: &Genesis,
        config: &NodeConfig,
        registry: &Registry,
        migration_tx_data: Option<&MigrationTxData>,
    ) -> IotaResult<Arc<Self>> {
        let enable_epoch_iota_conservation_check = config
            .expensive_safety_check_config
            .enable_epoch_iota_conservation_check();

        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
            info!("Creating new epoch start config from genesis");

            // `initial_epoch_flags` is only reassigned by the fail point below,
            // so outside of `msim`/`fail_points` builds the `mut` is expected
            // to be unused.
            #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
                info!("Setting initial epoch flags to {:?}", flags);
                initial_epoch_flags = flags;
            });

            let epoch_start_configuration = EpochStartConfiguration::new(
                genesis.iota_system_object().into_epoch_start_state(),
                *genesis.checkpoint().digest(),
                &genesis.objects(),
                initial_epoch_flags,
            )?;
            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
            epoch_start_configuration
        } else {
            info!("Loading epoch start config from DB");
            perpetual_tables
                .epoch_start_configuration
                .get(&())?
                .expect("Epoch start configuration must be set in non-empty DB")
        };
        // The recovery epoch is only read here so that it can be logged.
        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
        info!("Epoch start config: {:?}", epoch_start_configuration);
        info!("Cur epoch: {:?}", cur_epoch);
        let this = Self::open_inner(
            genesis,
            perpetual_tables,
            enable_epoch_iota_conservation_check,
            registry,
            migration_tx_data,
        )
        .await?;
        // No flags were reported before, so only the new ones are raised.
        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
        Ok(this)
    }
192
193    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
194        for flag in old {
195            self.metrics
196                .epoch_flags
197                .with_label_values(&[&flag.to_string()])
198                .set(0);
199        }
200        for flag in new {
201            self.metrics
202                .epoch_flags
203                .with_label_values(&[&flag.to_string()])
204                .set(1);
205        }
206    }
207
208    // NB: This must only be called at time of reconfiguration. We take the
209    // execution lock write guard as an argument to ensure that this is the
210    // case.
211    pub fn clear_object_per_epoch_marker_table(
212        &self,
213        _execution_guard: &ExecutionLockWriteGuard<'_>,
214    ) -> IotaResult<()> {
215        // We can safely delete all entries in the per epoch marker table since this is
216        // only called at epoch boundaries (during reconfiguration). Therefore
217        // any entries that currently exist can be removed. Because of this we
218        // can use the `schedule_delete_all` method.
219        Ok(self
220            .perpetual_tables
221            .object_per_epoch_marker_table
222            .schedule_delete_all()?)
223    }
224
225    pub async fn open_with_committee_for_testing(
226        perpetual_tables: Arc<AuthorityPerpetualTables>,
227        committee: &Committee,
228        genesis: &Genesis,
229    ) -> IotaResult<Arc<Self>> {
230        // TODO: Since we always start at genesis, the committee should be technically
231        // the same as the genesis committee.
232        assert_eq!(committee.epoch, 0);
233        Self::open_inner(genesis, perpetual_tables, true, &Registry::new(), None).await
234    }
235
236    async fn open_inner(
237        genesis: &Genesis,
238        perpetual_tables: Arc<AuthorityPerpetualTables>,
239        enable_epoch_iota_conservation_check: bool,
240        registry: &Registry,
241        migration_tx_data: Option<&MigrationTxData>,
242    ) -> IotaResult<Arc<Self>> {
243        let store = Arc::new(Self {
244            mutex_table: MutexTable::new(NUM_SHARDS),
245            perpetual_tables,
246            root_state_notify_read:
247                NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
248            enable_epoch_iota_conservation_check,
249            metrics: AuthorityStoreMetrics::new(registry),
250        });
251        // Only initialize an empty database.
252        if store
253            .database_is_empty()
254            .expect("database read should not fail at init.")
255        {
256            // Initialize with genesis data
257            // First insert genesis objects
258            store
259                .bulk_insert_genesis_objects(genesis.objects())
260                .expect("cannot bulk insert genesis objects");
261
262            // Then insert txn and effects of genesis
263            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
264            store
265                .perpetual_tables
266                .transactions
267                .insert(transaction.digest(), transaction.serializable_ref())
268                .expect("cannot insert genesis transaction");
269            store
270                .perpetual_tables
271                .effects
272                .insert(&genesis.effects().digest(), genesis.effects())
273                .expect("cannot insert genesis effects");
274
275            // In the previous step we don't insert the effects to executed_effects yet
276            // because the genesis tx hasn't but will be executed. This is
277            // important for fullnodes to be able to generate indexing data
278            // right now.
279            let event_digests = genesis.events().digest();
280            let events = genesis
281                .events()
282                .data
283                .iter()
284                .enumerate()
285                .map(|(i, e)| ((event_digests, i), e));
286            store.perpetual_tables.events.multi_insert(events).unwrap();
287
288            // Initialize with migration data if genesis contained migration transactions
289            if let Some(migration_transactions) = migration_tx_data {
290                // This migration data was validated during the loading into the node (invoked
291                // by the caller of this function)
292                let txs_data = migration_transactions.txs_data();
293
294                // We iterate over the contents of the genesis checkpoint, that includes all
295                // migration transactions execution digest. Thus we cover all transactions that
296                // were considered during the creation of the genesis blob.
297                for (_, execution_digest) in genesis
298                    .checkpoint_contents()
299                    .enumerate_transactions(&genesis.checkpoint())
300                {
301                    let tx_digest = &execution_digest.transaction;
302                    // We can skip the genesis transaction and its data because above it was already
303                    // stored in the perpetual_tables.
304                    if tx_digest == genesis.transaction().digest() {
305                        continue;
306                    }
307                    // Now we can store in the perpetual_tables this migration transaction, together
308                    // with its effects, events and created objects.
309                    let Some((tx, effects, events)) = txs_data.get(tx_digest) else {
310                        panic!("tx digest not found in migrated objects blob");
311                    };
312                    let transaction = VerifiedTransaction::new_unchecked(tx.clone());
313                    let objects = migration_transactions
314                        .objects_by_tx_digest(*tx_digest)
315                        .expect("the migration data is corrupted");
316                    store
317                        .bulk_insert_genesis_objects(&objects)
318                        .expect("cannot bulk insert migrated objects");
319                    store
320                        .perpetual_tables
321                        .transactions
322                        .insert(transaction.digest(), transaction.serializable_ref())
323                        .expect("cannot insert migration transaction");
324                    store
325                        .perpetual_tables
326                        .effects
327                        .insert(&effects.digest(), effects)
328                        .expect("cannot insert migration effects");
329                    let events = events
330                        .data
331                        .iter()
332                        .enumerate()
333                        .map(|(i, e)| ((events.digest(), i), e));
334                    store.perpetual_tables.events.multi_insert(events).unwrap();
335                }
336            }
337        }
338
339        Ok(store)
340    }
341
342    /// Open authority store without any operations that require
343    /// genesis, such as constructing EpochStartConfiguration
344    /// or inserting genesis objects.
345    pub fn open_no_genesis(
346        perpetual_tables: Arc<AuthorityPerpetualTables>,
347        enable_epoch_iota_conservation_check: bool,
348        registry: &Registry,
349    ) -> IotaResult<Arc<Self>> {
350        let store = Arc::new(Self {
351            mutex_table: MutexTable::new(NUM_SHARDS),
352            perpetual_tables,
353            root_state_notify_read:
354                NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
355            enable_epoch_iota_conservation_check,
356            metrics: AuthorityStoreMetrics::new(registry),
357        });
358        Ok(store)
359    }
360
361    pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
362        self.perpetual_tables.get_recovery_epoch_at_restart()
363    }
364
365    pub fn get_effects(
366        &self,
367        effects_digest: &TransactionEffectsDigest,
368    ) -> IotaResult<Option<TransactionEffects>> {
369        Ok(self.perpetual_tables.effects.get(effects_digest)?)
370    }
371
372    /// Returns true if we have an effects structure for this transaction digest
373    pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> IotaResult<bool> {
374        self.perpetual_tables
375            .effects
376            .contains_key(effects_digest)
377            .map_err(|e| e.into())
378    }
379
380    pub fn get_events(
381        &self,
382        event_digest: &TransactionEventsDigest,
383    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
384        let data = self
385            .perpetual_tables
386            .events
387            .safe_range_iter((*event_digest, 0)..=(*event_digest, usize::MAX))
388            .map_ok(|(_, event)| event)
389            .collect::<Result<Vec<_>, TypedStoreError>>()?;
390        Ok(data.is_empty().not().then_some(TransactionEvents { data }))
391    }
392
393    pub fn multi_get_events(
394        &self,
395        event_digests: &[TransactionEventsDigest],
396    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
397        Ok(event_digests
398            .iter()
399            .map(|digest| self.get_events(digest))
400            .collect::<Result<Vec<_>, _>>()?)
401    }
402
403    pub fn multi_get_effects<'a>(
404        &self,
405        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
406    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
407        Ok(self.perpetual_tables.effects.multi_get(effects_digests)?)
408    }
409
410    pub fn get_executed_effects(
411        &self,
412        tx_digest: &TransactionDigest,
413    ) -> IotaResult<Option<TransactionEffects>> {
414        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
415        match effects_digest {
416            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
417            None => Ok(None),
418        }
419    }
420
421    /// Given a list of transaction digests, returns a list of the corresponding
422    /// effects only if they have been executed. For transactions that have
423    /// not been executed, None is returned.
424    pub fn multi_get_executed_effects_digests(
425        &self,
426        digests: &[TransactionDigest],
427    ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
428        Ok(self.perpetual_tables.executed_effects.multi_get(digests)?)
429    }
430
431    /// Given a list of transaction digests, returns a list of the corresponding
432    /// effects only if they have been executed. For transactions that have
433    /// not been executed, None is returned.
434    pub fn multi_get_executed_effects(
435        &self,
436        digests: &[TransactionDigest],
437    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
438        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
439        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
440        let mut tx_to_effects_map = effects
441            .into_iter()
442            .flatten()
443            .map(|effects| (*effects.transaction_digest(), effects))
444            .collect::<HashMap<_, _>>();
445        Ok(digests
446            .iter()
447            .map(|digest| tx_to_effects_map.remove(digest))
448            .collect())
449    }
450
451    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
452        Ok(self
453            .perpetual_tables
454            .executed_effects
455            .contains_key(digest)?)
456    }
457
458    pub fn get_marker_value(
459        &self,
460        object_id: &ObjectID,
461        version: &SequenceNumber,
462        epoch_id: EpochId,
463    ) -> IotaResult<Option<MarkerValue>> {
464        let object_key = (epoch_id, ObjectKey(*object_id, *version));
465        Ok(self
466            .perpetual_tables
467            .object_per_epoch_marker_table
468            .get(&object_key)?)
469    }
470
471    pub fn get_latest_marker(
472        &self,
473        object_id: &ObjectID,
474        epoch_id: EpochId,
475    ) -> IotaResult<Option<(SequenceNumber, MarkerValue)>> {
476        let min_key = (epoch_id, ObjectKey::min_for_id(object_id));
477        let max_key = (epoch_id, ObjectKey::max_for_id(object_id));
478
479        let marker_entry = self
480            .perpetual_tables
481            .object_per_epoch_marker_table
482            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
483            .next();
484        match marker_entry {
485            Some(Ok(((epoch, key), marker))) => {
486                // because of the iterator bounds these cannot fail
487                assert_eq!(epoch, epoch_id);
488                assert_eq!(key.0, *object_id);
489                Ok(Some((key.1, marker)))
490            }
491            Some(Err(e)) => Err(e.into()),
492            None => Ok(None),
493        }
494    }
495
496    /// Returns future containing the state hash for the given epoch
497    /// once available
498    pub async fn notify_read_root_state_hash(
499        &self,
500        epoch: EpochId,
501    ) -> IotaResult<(CheckpointSequenceNumber, Accumulator)> {
502        // We need to register waiters _before_ reading from the database to avoid race
503        // conditions
504        let registration = self.root_state_notify_read.register_one(&epoch);
505        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
506
507        let result = match hash {
508            // Note that Some() clause also drops registration that is already fulfilled
509            Some(ready) => Either::Left(futures::future::ready(ready)),
510            None => Either::Right(registration),
511        }
512        .await;
513
514        Ok(result)
515    }
516
517    // Implementation of the corresponding method of `CheckpointCache` trait.
518    pub(crate) fn insert_finalized_transactions_perpetual_checkpoints(
519        &self,
520        digests: &[TransactionDigest],
521        epoch: EpochId,
522        sequence: CheckpointSequenceNumber,
523    ) -> IotaResult {
524        let mut batch = self
525            .perpetual_tables
526            .executed_transactions_to_checkpoint
527            .batch();
528        batch.insert_batch(
529            &self.perpetual_tables.executed_transactions_to_checkpoint,
530            digests.iter().map(|d| (*d, (epoch, sequence))),
531        )?;
532        batch.write()?;
533        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
534        Ok(())
535    }
536
537    // Implementation of the corresponding method of `CheckpointCache` trait.
538    pub(crate) fn get_transaction_perpetual_checkpoint(
539        &self,
540        digest: &TransactionDigest,
541    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
542        Ok(self
543            .perpetual_tables
544            .executed_transactions_to_checkpoint
545            .get(digest)?)
546    }
547
548    // Implementation of the corresponding method of `CheckpointCache` trait.
549    pub(crate) fn multi_get_transactions_perpetual_checkpoints(
550        &self,
551        digests: &[TransactionDigest],
552    ) -> IotaResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
553        Ok(self
554            .perpetual_tables
555            .executed_transactions_to_checkpoint
556            .multi_get(digests)?)
557    }
558
559    /// Returns true if there are no objects in the database
560    pub fn database_is_empty(&self) -> IotaResult<bool> {
561        self.perpetual_tables.database_is_empty()
562    }
563
564    /// A function that acquires all locks associated with the objects (in order
565    /// to avoid deadlocks).
566    fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
567        self.mutex_table
568            .acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
569    }
570
571    pub fn object_exists_by_key(
572        &self,
573        object_id: &ObjectID,
574        version: VersionNumber,
575    ) -> IotaResult<bool> {
576        Ok(self
577            .perpetual_tables
578            .objects
579            .contains_key(&ObjectKey(*object_id, version))?)
580    }
581
582    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> IotaResult<Vec<bool>> {
583        Ok(self
584            .perpetual_tables
585            .objects
586            .multi_contains_keys(object_keys.to_vec())?
587            .into_iter()
588            .collect())
589    }
590
591    pub fn multi_get_objects_by_key(
592        &self,
593        object_keys: &[ObjectKey],
594    ) -> Result<Vec<Option<Object>>, IotaError> {
595        let wrappers = self
596            .perpetual_tables
597            .objects
598            .multi_get(object_keys.to_vec())?;
599        let mut ret = vec![];
600
601        for (idx, w) in wrappers.into_iter().enumerate() {
602            ret.push(
603                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
604                    .transpose()?
605                    .flatten(),
606            );
607        }
608        Ok(ret)
609    }
610
611    /// Get many objects
612    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, IotaError> {
613        let mut result = Vec::new();
614        for id in objects {
615            result.push(self.try_get_object(id)?);
616        }
617        Ok(result)
618    }
619
620    pub fn have_deleted_owned_object_at_version_or_after(
621        &self,
622        object_id: &ObjectID,
623        version: VersionNumber,
624        epoch_id: EpochId,
625    ) -> Result<bool, IotaError> {
626        let object_key = ObjectKey::max_for_id(object_id);
627        let marker_key = (epoch_id, object_key);
628
629        // Find the most recent version of the object that was deleted or wrapped.
630        // Return true if the version is >= `version`. Otherwise return false.
631        let marker_entry = self
632            .perpetual_tables
633            .object_per_epoch_marker_table
634            .reversed_safe_iter_with_bounds(None, Some(marker_key))?
635            .next();
636        match marker_entry.transpose()? {
637            Some(((epoch, key), marker)) => {
638                // Make sure object id matches and version is >= `version`
639                let object_data_ok = key.0 == *object_id && key.1 >= version;
640                // Make sure we don't have a stale epoch for some reason (e.g., a revert)
641                let epoch_data_ok = epoch == epoch_id;
642                // Make sure the object was deleted or wrapped.
643                let mark_data_ok = marker == MarkerValue::OwnedDeleted;
644                Ok(object_data_ok && epoch_data_ok && mark_data_ok)
645            }
646            None => Ok(false),
647        }
648    }
649
650    // Methods to mutate the store
651
652    /// Insert a genesis object.
653    /// TODO: delete this method entirely (still used by authority_tests.rs)
654    pub(crate) fn insert_genesis_object(&self, object: Object) -> IotaResult {
655        // We only side load objects with a genesis parent transaction.
656        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
657        let object_ref = object.compute_object_reference();
658        self.insert_object_direct(object_ref, &object)
659    }
660
661    /// Insert an object directly into the store, and also update relevant
662    /// tables NOTE: does not handle transaction lock.
663    /// This is used to insert genesis objects
664    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> IotaResult {
665        let mut write_batch = self.perpetual_tables.objects.batch();
666
667        // Insert object
668        let store_object = get_store_object(object.clone());
669        write_batch.insert_batch(
670            &self.perpetual_tables.objects,
671            std::iter::once((ObjectKey::from(object_ref), store_object)),
672        )?;
673
674        // Update the index
675        if object.get_single_owner().is_some() {
676            // Only initialize live object markers for address owned objects.
677            if !object.is_child_object() {
678                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref])?;
679            }
680        }
681
682        write_batch.write()?;
683
684        Ok(())
685    }
686
687    /// This function should only be used for initializing genesis and should
688    /// remain private.
689    #[instrument(level = "debug", skip_all)]
690    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> IotaResult<()> {
691        let mut batch = self.perpetual_tables.objects.batch();
692        let ref_and_objects: Vec<_> = objects
693            .iter()
694            .map(|o| (o.compute_object_reference(), o))
695            .collect();
696
697        batch.insert_batch(
698            &self.perpetual_tables.objects,
699            ref_and_objects
700                .iter()
701                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
702        )?;
703
704        let non_child_object_refs: Vec<_> = ref_and_objects
705            .iter()
706            .filter(|(_, object)| !object.is_child_object())
707            .map(|(oref, _)| *oref)
708            .collect();
709
710        self.initialize_live_object_markers_impl(&mut batch, &non_child_object_refs)?;
711
712        batch.write()?;
713
714        Ok(())
715    }
716
717    pub fn bulk_insert_live_objects(
718        perpetual_db: &AuthorityPerpetualTables,
719        live_objects: impl Iterator<Item = LiveObject>,
720        expected_sha3_digest: &[u8; 32],
721    ) -> IotaResult<()> {
722        let mut hasher = Sha3_256::default();
723        let mut batch = perpetual_db.objects.batch();
724        for object in live_objects {
725            hasher.update(object.object_reference().2.inner());
726            match object {
727                LiveObject::Normal(object) => {
728                    let store_object_wrapper = get_store_object(object.clone());
729                    batch.insert_batch(
730                        &perpetual_db.objects,
731                        std::iter::once((
732                            ObjectKey::from(object.compute_object_reference()),
733                            store_object_wrapper,
734                        )),
735                    )?;
736                    if !object.is_child_object() {
737                        Self::initialize_live_object_markers(
738                            &perpetual_db.live_owned_object_markers,
739                            &mut batch,
740                            &[object.compute_object_reference()],
741                        )?;
742                    }
743                }
744                LiveObject::Wrapped(object_key) => {
745                    batch.insert_batch(
746                        &perpetual_db.objects,
747                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
748                            object_key,
749                            StoreObject::Wrapped.into(),
750                        )),
751                    )?;
752                }
753            }
754        }
755        let sha3_digest = hasher.finalize().digest;
756        if *expected_sha3_digest != sha3_digest {
757            error!(
758                "Sha does not match! expected: {:?}, actual: {:?}",
759                expected_sha3_digest, sha3_digest
760            );
761            return Err(IotaError::from("Sha does not match"));
762        }
763        batch.write()?;
764        Ok(())
765    }
766
767    pub fn set_epoch_start_configuration(
768        &self,
769        epoch_start_configuration: &EpochStartConfiguration,
770    ) -> IotaResult {
771        self.perpetual_tables
772            .set_epoch_start_configuration(epoch_start_configuration)?;
773        Ok(())
774    }
775
776    pub fn get_epoch_start_configuration(&self) -> IotaResult<Option<EpochStartConfiguration>> {
777        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
778    }
779
780    /// Updates the state resulting from the execution of a certificate.
781    ///
782    /// Internally it checks that all locks for active inputs are at the correct
783    /// version, and then writes objects, certificates, parents and clean up
784    /// locks atomically.
785    #[instrument(level = "debug", skip_all)]
786    pub fn write_transaction_outputs(
787        &self,
788        epoch_id: EpochId,
789        tx_outputs: &[Arc<TransactionOutputs>],
790    ) -> IotaResult {
791        let mut written = Vec::with_capacity(tx_outputs.len());
792        for outputs in tx_outputs {
793            written.extend(outputs.written.values().cloned());
794        }
795
796        let mut write_batch = self.perpetual_tables.transactions.batch();
797        for outputs in tx_outputs {
798            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
799        }
800        // test crashing before writing the batch
801        fail_point!("crash");
802
803        write_batch.write()?;
804        trace!(
805            "committed transactions: {:?}",
806            tx_outputs
807                .iter()
808                .map(|tx| tx.transaction.digest())
809                .collect::<Vec<_>>()
810        );
811
812        // test crashing before notifying
813        fail_point!("crash");
814
815        Ok(())
816    }
817
818    fn write_one_transaction_outputs(
819        &self,
820        write_batch: &mut DBBatch,
821        epoch_id: EpochId,
822        tx_outputs: &TransactionOutputs,
823    ) -> IotaResult {
824        let TransactionOutputs {
825            transaction,
826            effects,
827            markers,
828            wrapped,
829            deleted,
830            written,
831            events,
832            live_object_markers_to_delete,
833            new_live_object_markers_to_init,
834            ..
835        } = tx_outputs;
836
837        // Store the certificate indexed by transaction digest
838        let transaction_digest = transaction.digest();
839        write_batch.insert_batch(
840            &self.perpetual_tables.transactions,
841            iter::once((transaction_digest, transaction.serializable_ref())),
842        )?;
843
844        // Add batched writes for objects and locks.
845        let effects_digest = effects.digest();
846
847        write_batch.insert_batch(
848            &self.perpetual_tables.object_per_epoch_marker_table,
849            markers
850                .iter()
851                .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
852        )?;
853
854        write_batch.insert_batch(
855            &self.perpetual_tables.objects,
856            deleted
857                .iter()
858                .map(|key| (key, StoreObject::Deleted))
859                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
860                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
861        )?;
862
863        // Insert each output object into the stores
864        let new_objects = written.iter().map(|(id, new_object)| {
865            let version = new_object.version();
866            trace!(?id, ?version, "writing object");
867            let store_object = get_store_object(new_object.clone());
868            (ObjectKey(*id, version), store_object)
869        });
870
871        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
872
873        let event_digest = events.digest();
874        let events = events
875            .data
876            .iter()
877            .enumerate()
878            .map(|(i, e)| ((event_digest, i), e));
879
880        write_batch.insert_batch(&self.perpetual_tables.events, events)?;
881
882        self.initialize_live_object_markers_impl(write_batch, new_live_object_markers_to_init)?;
883
884        // Note: deletes live object markers for received objects as well (but not for
885        // objects that were in `Receiving` arguments which were not received)
886        self.delete_live_object_markers(write_batch, live_object_markers_to_delete)?;
887
888        write_batch
889            .insert_batch(
890                &self.perpetual_tables.effects,
891                [(effects_digest, effects.clone())],
892            )?
893            .insert_batch(
894                &self.perpetual_tables.executed_effects,
895                [(transaction_digest, effects_digest)],
896            )?;
897
898        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
899
900        Ok(())
901    }
902
903    /// Commits transactions only (not effects or other transaction outputs) to
904    /// the db. See ExecutionCache::persist_transaction for more info
905    pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> IotaResult {
906        let mut batch = self.perpetual_tables.transactions.batch();
907        batch.insert_batch(
908            &self.perpetual_tables.transactions,
909            [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
910        )?;
911        batch.write()?;
912        Ok(())
913    }
914
915    pub fn acquire_transaction_locks(
916        &self,
917        epoch_store: &AuthorityPerEpochStore,
918        owned_input_objects: &[ObjectRef],
919        transaction: VerifiedSignedTransaction,
920    ) -> IotaResult {
921        let tx_digest = *transaction.digest();
922        // Other writers may be attempting to acquire locks on the same objects, so a
923        // mutex is required.
924        // TODO: replace with optimistic db_transactions (i.e. set lock to tx if none)
925        let _mutexes = self.acquire_locks(owned_input_objects);
926
927        trace!(?owned_input_objects, "acquire_transaction_locks");
928        let mut locks_to_write = Vec::new();
929
930        let live_object_markers = self
931            .perpetual_tables
932            .live_owned_object_markers
933            .multi_get(owned_input_objects)?;
934
935        let epoch_tables = epoch_store.tables()?;
936
937        let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
938
939        assert_eq!(locks.len(), live_object_markers.len());
940
941        for (live_marker, lock, obj_ref) in izip!(
942            live_object_markers.into_iter(),
943            locks.into_iter(),
944            owned_input_objects
945        ) {
946            if live_marker.is_none() {
947                // object at that version does not exist
948                let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
949                fp_bail!(
950                    UserInputError::ObjectVersionUnavailableForConsumption {
951                        provided_obj_ref: *obj_ref,
952                        current_version: latest_live_version.1
953                    }
954                    .into()
955                );
956            };
957
958            if let Some(previous_tx_digest) = &lock {
959                if previous_tx_digest == &tx_digest {
960                    // no need to re-write lock
961                    continue;
962                } else {
963                    // TODO: add metrics here
964                    info!(prev_tx_digest = ?previous_tx_digest,
965                          cur_tx_digest = ?tx_digest,
966                          "Cannot acquire lock: conflicting transaction!");
967                    return Err(IotaError::ObjectLockConflict {
968                        obj_ref: *obj_ref,
969                        pending_transaction: *previous_tx_digest,
970                    });
971                }
972            }
973
974            locks_to_write.push((*obj_ref, tx_digest));
975        }
976
977        if !locks_to_write.is_empty() {
978            trace!(?locks_to_write, "Writing locks");
979            epoch_tables.write_transaction_locks(transaction, locks_to_write.into_iter())?;
980        }
981
982        Ok(())
983    }
984
985    /// Gets ObjectLockInfo that represents state of lock on an object.
986    /// Returns UserInputError::ObjectNotFound if cannot find lock record for
987    /// this object
988    pub(crate) fn get_lock(
989        &self,
990        obj_ref: ObjectRef,
991        epoch_store: &AuthorityPerEpochStore,
992    ) -> IotaLockResult {
993        if self
994            .perpetual_tables
995            .live_owned_object_markers
996            .get(&obj_ref)?
997            .is_none()
998        {
999            // object at that version does not exist
1000            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
1001                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
1002            });
1003        }
1004
1005        let tables = epoch_store.tables()?;
1006        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
1007            Ok(ObjectLockStatus::LockedToTx {
1008                locked_by_tx: tx_digest,
1009            })
1010        } else {
1011            Ok(ObjectLockStatus::Initialized)
1012        }
1013    }
1014
1015    /// Returns UserInputError::ObjectNotFound if no lock records found for this
1016    /// object.
1017    pub(crate) fn get_latest_live_version_for_object_id(
1018        &self,
1019        object_id: ObjectID,
1020    ) -> IotaResult<ObjectRef> {
1021        let mut iterator = self
1022            .perpetual_tables
1023            .live_owned_object_markers
1024            .reversed_safe_iter_with_bounds(
1025                None,
1026                Some((object_id, SequenceNumber::MAX_VALID_EXCL, ObjectDigest::MAX)),
1027            )?;
1028        Ok(iterator
1029            .next()
1030            .transpose()?
1031            .and_then(|value| {
1032                if value.0.0 == object_id {
1033                    Some(value)
1034                } else {
1035                    None
1036                }
1037            })
1038            .ok_or_else(|| {
1039                IotaError::from(UserInputError::ObjectNotFound {
1040                    object_id,
1041                    version: None,
1042                })
1043            })?
1044            .0)
1045    }
1046
1047    /// Checks multiple object locks exist.
1048    /// Returns UserInputError::ObjectNotFound if cannot find lock record for at
1049    /// least one of the objects.
1050    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at
1051    /// least one object lock is not initialized     at the given version.
1052    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> IotaResult {
1053        let live_markers = self
1054            .perpetual_tables
1055            .live_owned_object_markers
1056            .multi_get(objects)?;
1057        for (live_marker, obj_ref) in live_markers.into_iter().zip(objects) {
1058            if live_marker.is_none() {
1059                // object at that version does not exist
1060                let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1061                fp_bail!(
1062                    UserInputError::ObjectVersionUnavailableForConsumption {
1063                        provided_obj_ref: *obj_ref,
1064                        current_version: latest_live_version.1
1065                    }
1066                    .into()
1067                );
1068            }
1069        }
1070        Ok(())
1071    }
1072
1073    /// Initialize live object markers for a given list of ObjectRefs.
1074    fn initialize_live_object_markers_impl(
1075        &self,
1076        write_batch: &mut DBBatch,
1077        objects: &[ObjectRef],
1078    ) -> IotaResult {
1079        AuthorityStore::initialize_live_object_markers(
1080            &self.perpetual_tables.live_owned_object_markers,
1081            write_batch,
1082            objects,
1083        )
1084    }
1085
1086    pub fn initialize_live_object_markers(
1087        live_object_marker_table: &DBMap<ObjectRef, ()>,
1088        write_batch: &mut DBBatch,
1089        objects: &[ObjectRef],
1090    ) -> IotaResult {
1091        trace!(?objects, "initialize_live_object_markers");
1092
1093        write_batch.insert_batch(
1094            live_object_marker_table,
1095            objects.iter().map(|obj_ref| (obj_ref, ())),
1096        )?;
1097        Ok(())
1098    }
1099
1100    /// Removes locks for a given list of ObjectRefs.
1101    fn delete_live_object_markers(
1102        &self,
1103        write_batch: &mut DBBatch,
1104        objects: &[ObjectRef],
1105    ) -> IotaResult {
1106        trace!(?objects, "delete_live_object_markers");
1107        write_batch.delete_batch(
1108            &self.perpetual_tables.live_owned_object_markers,
1109            objects.iter(),
1110        )?;
1111        Ok(())
1112    }
1113
/// Test helper that resets lock state for `objects`.
///
/// For every digest in `transactions`, the signed transaction and the object
/// locks for `objects` are deleted from the epoch store. Afterwards the live
/// object markers for `objects` are deleted and re-created, using two
/// separate batch writes. Panics if any of the batch operations fails.
///
/// NOTE(review): the object locks are only cleared inside the loop, so they
/// are left untouched when `transactions` is empty — confirm this is
/// intended.
#[cfg(test)]
pub(crate) fn reset_locks_and_live_markers_for_test(
    &self,
    transactions: &[TransactionDigest],
    objects: &[ObjectRef],
    epoch_store: &AuthorityPerEpochStore,
) {
    transactions.iter().for_each(|tx| {
        epoch_store.delete_signed_transaction_for_test(tx);
        epoch_store.delete_object_locks_for_test(objects);
    });

    let marker_table = &self.perpetual_tables.live_owned_object_markers;

    // First batch: drop the existing markers.
    let mut delete_batch = marker_table.batch();
    delete_batch
        .delete_batch(marker_table, objects.iter())
        .unwrap();
    delete_batch.write().unwrap();

    // Second batch: create the markers afresh.
    let mut init_batch = marker_table.batch();
    self.initialize_live_object_markers_impl(&mut init_batch, objects)
        .unwrap();
    init_batch.write().unwrap();
}
1140
1141    /// This function is called at the end of epoch for each transaction that's
1142    /// executed locally on the validator but didn't make to the last
1143    /// checkpoint. The effects of the execution is reverted here.
1144    /// The following things are reverted:
1145    /// 1. All new object states are deleted.
1146    /// 2. owner_index table change is reverted.
1147    ///
1148    /// NOTE: transaction and effects are intentionally not deleted. It's
1149    /// possible that if this node is behind, the network will execute the
1150    /// transaction in a later epoch. In that case, we need to keep it saved
1151    /// so that when we receive the checkpoint that includes it from state
1152    /// sync, we are able to execute the checkpoint.
1153    /// TODO: implement GC for transactions that are no longer needed.
1154    pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> IotaResult {
1155        let Some(effects) = self.get_executed_effects(tx_digest)? else {
1156            info!("Not reverting {:?} as it was not executed", tx_digest);
1157            return Ok(());
1158        };
1159
1160        info!(?tx_digest, ?effects, "reverting transaction");
1161
1162        // We should never be reverting shared object transactions.
1163        assert!(effects.input_shared_objects().is_empty());
1164
1165        let mut write_batch = self.perpetual_tables.transactions.batch();
1166        write_batch.delete_batch(
1167            &self.perpetual_tables.executed_effects,
1168            iter::once(tx_digest),
1169        )?;
1170        if let Some(events_digest) = effects.events_digest() {
1171            write_batch.schedule_delete_range(
1172                &self.perpetual_tables.events,
1173                &(*events_digest, usize::MIN),
1174                &(*events_digest, usize::MAX),
1175            )?;
1176        }
1177
1178        let tombstones = effects
1179            .all_tombstones()
1180            .into_iter()
1181            .map(|(id, version)| ObjectKey(id, version));
1182        write_batch.delete_batch(&self.perpetual_tables.objects, tombstones)?;
1183
1184        let all_new_object_keys = effects
1185            .all_changed_objects()
1186            .into_iter()
1187            .map(|((id, version, _), _, _)| ObjectKey(id, version));
1188        write_batch.delete_batch(&self.perpetual_tables.objects, all_new_object_keys.clone())?;
1189
1190        let modified_object_keys = effects
1191            .modified_at_versions()
1192            .into_iter()
1193            .map(|(id, version)| ObjectKey(id, version));
1194
1195        macro_rules! get_objects_and_locks {
1196            ($object_keys: expr) => {
1197                self.perpetual_tables
1198                    .objects
1199                    .multi_get($object_keys.clone())?
1200                    .into_iter()
1201                    .zip($object_keys)
1202                    .filter_map(|(obj_opt, key)| {
1203                        let obj = self
1204                            .perpetual_tables
1205                            .object(
1206                                &key,
1207                                obj_opt.unwrap_or_else(|| {
1208                                    panic!("Older object version not found: {:?}", key)
1209                                }),
1210                            )
1211                            .expect("Matching indirect object not found")?;
1212
1213                        if obj.is_immutable() {
1214                            return None;
1215                        }
1216
1217                        let obj_ref = obj.compute_object_reference();
1218                        Some(obj.is_address_owned().then_some(obj_ref))
1219                    })
1220            };
1221        }
1222
1223        let old_locks = get_objects_and_locks!(modified_object_keys);
1224        let new_locks = get_objects_and_locks!(all_new_object_keys);
1225
1226        let old_locks: Vec<_> = old_locks.flatten().collect();
1227
1228        // Re-create old live markers.
1229        self.initialize_live_object_markers_impl(&mut write_batch, &old_locks)?;
1230
1231        // Delete new live markers
1232        write_batch.delete_batch(
1233            &self.perpetual_tables.live_owned_object_markers,
1234            new_locks.flatten(),
1235        )?;
1236
1237        write_batch.write()?;
1238
1239        Ok(())
1240    }
1241
1242    /// Return the object with version less then or eq to the provided seq
1243    /// number. This is used by indexer to find the correct version of
1244    /// dynamic field child object. We do not store the version of the child
1245    /// object, but because of lamport timestamp, we know the child must
1246    /// have version number less then or eq to the parent.
1247    pub fn find_object_lt_or_eq_version(
1248        &self,
1249        object_id: ObjectID,
1250        version: SequenceNumber,
1251    ) -> IotaResult<Option<Object>> {
1252        self.perpetual_tables
1253            .find_object_lt_or_eq_version(object_id, version)
1254    }
1255
1256    /// Returns the latest object reference we have for this object_id in the
1257    /// objects table.
1258    ///
1259    /// The method may also return the reference to a deleted object with a
1260    /// digest of ObjectDigest::deleted() or ObjectDigest::wrapped() and
1261    /// lamport version of a transaction that deleted the object.
1262    /// Note that a deleted object may re-appear if the deletion was the result
1263    /// of the object being wrapped in another object.
1264    ///
1265    /// If no entry for the object_id is found, return None.
1266    pub fn get_latest_object_ref_or_tombstone(
1267        &self,
1268        object_id: ObjectID,
1269    ) -> Result<Option<ObjectRef>, IotaError> {
1270        self.perpetual_tables
1271            .get_latest_object_ref_or_tombstone(object_id)
1272    }
1273
1274    /// Returns the latest object reference if and only if the object is still
1275    /// live (i.e. it does not return tombstones)
1276    pub fn get_latest_object_ref_if_alive(
1277        &self,
1278        object_id: ObjectID,
1279    ) -> Result<Option<ObjectRef>, IotaError> {
1280        match self.get_latest_object_ref_or_tombstone(object_id)? {
1281            Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1282            _ => Ok(None),
1283        }
1284    }
1285
1286    /// Returns the latest object we have for this object_id in the objects
1287    /// table.
1288    ///
1289    /// If no entry for the object_id is found, return None.
1290    pub fn get_latest_object_or_tombstone(
1291        &self,
1292        object_id: ObjectID,
1293    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, IotaError> {
1294        let Some((object_key, store_object)) = self
1295            .perpetual_tables
1296            .get_latest_object_or_tombstone(object_id)?
1297        else {
1298            return Ok(None);
1299        };
1300
1301        if let Some(object_ref) = self
1302            .perpetual_tables
1303            .tombstone_reference(&object_key, &store_object)?
1304        {
1305            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1306        }
1307
1308        let object = self
1309            .perpetual_tables
1310            .object(&object_key, store_object)?
1311            .expect("Non tombstone store object could not be converted to object");
1312
1313        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1314    }
1315
1316    pub fn insert_transaction_and_effects(
1317        &self,
1318        transaction: &VerifiedTransaction,
1319        transaction_effects: &TransactionEffects,
1320    ) -> Result<(), TypedStoreError> {
1321        let mut write_batch = self.perpetual_tables.transactions.batch();
1322        write_batch
1323            .insert_batch(
1324                &self.perpetual_tables.transactions,
1325                [(transaction.digest(), transaction.serializable_ref())],
1326            )?
1327            .insert_batch(
1328                &self.perpetual_tables.effects,
1329                [(transaction_effects.digest(), transaction_effects)],
1330            )?;
1331
1332        write_batch.write()?;
1333        Ok(())
1334    }
1335
1336    pub fn multi_insert_transaction_and_effects<'a>(
1337        &self,
1338        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1339    ) -> Result<(), TypedStoreError> {
1340        let mut write_batch = self.perpetual_tables.transactions.batch();
1341        for tx in transactions {
1342            write_batch
1343                .insert_batch(
1344                    &self.perpetual_tables.transactions,
1345                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1346                )?
1347                .insert_batch(
1348                    &self.perpetual_tables.effects,
1349                    [(tx.effects.digest(), &tx.effects)],
1350                )?;
1351        }
1352
1353        write_batch.write()?;
1354        Ok(())
1355    }
1356
1357    pub fn multi_get_transaction_blocks(
1358        &self,
1359        tx_digests: &[TransactionDigest],
1360    ) -> IotaResult<Vec<Option<VerifiedTransaction>>> {
1361        Ok(self
1362            .perpetual_tables
1363            .transactions
1364            .multi_get(tx_digests)
1365            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())?)
1366    }
1367
1368    pub fn get_transaction_block(
1369        &self,
1370        tx_digest: &TransactionDigest,
1371    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1372        self.perpetual_tables
1373            .transactions
1374            .get(tx_digest)
1375            .map(|v| v.map(|v| v.into()))
1376    }
1377
1378    /// This function reads the DB directly to get the system state object.
1379    /// If reconfiguration is happening at the same time, there is no guarantee
1380    /// whether we would be getting the old or the new system state object.
1381    /// Hence this function should only be called during RPC reads where data
1382    /// race is not a major concern. In general we should avoid this as much
1383    /// as possible. If the intent is for testing, you can use
1384    /// AuthorityState:: get_iota_system_state_object_for_testing.
1385    pub fn get_iota_system_state_object_unsafe(&self) -> IotaResult<IotaSystemState> {
1386        get_iota_system_state(self.perpetual_tables.as_ref())
1387    }
1388
1389    pub fn expensive_check_iota_conservation<T>(
1390        self: &Arc<Self>,
1391        type_layout_store: T,
1392        old_epoch_store: &AuthorityPerEpochStore,
1393        epoch_supply_change: Option<i64>,
1394    ) -> IotaResult
1395    where
1396        T: TypeLayoutStore + Send + Copy,
1397    {
1398        if !self.enable_epoch_iota_conservation_check {
1399            return Ok(());
1400        }
1401
1402        let executor = old_epoch_store.executor();
1403        info!("Starting IOTA conservation check. This may take a while..");
1404        let cur_time = Instant::now();
1405        let mut pending_objects = vec![];
1406        let mut count = 0;
1407        let mut size = 0;
1408        let (mut total_iota, mut total_storage_rebate) = thread::scope(|s| {
1409            let pending_tasks = FuturesUnordered::new();
1410            for o in self.iter_live_object_set() {
1411                match o {
1412                    LiveObject::Normal(object) => {
1413                        size += object.object_size_for_gas_metering();
1414                        count += 1;
1415                        pending_objects.push(object);
1416                        if count % 1_000_000 == 0 {
1417                            let mut task_objects = vec![];
1418                            mem::swap(&mut pending_objects, &mut task_objects);
1419                            pending_tasks.push(s.spawn(move || {
1420                                let mut layout_resolver =
1421                                    executor.type_layout_resolver(Box::new(type_layout_store));
1422                                let mut total_storage_rebate = 0;
1423                                let mut total_iota = 0;
1424                                for object in task_objects {
1425                                    total_storage_rebate += object.storage_rebate;
1426                                    // get_total_iota includes storage rebate, however all storage
1427                                    // rebate is also stored in
1428                                    // the storage fund, so we need to subtract it here.
1429                                    total_iota +=
1430                                        object.get_total_iota(layout_resolver.as_mut()).unwrap()
1431                                            - object.storage_rebate;
1432                                }
1433                                if count % 50_000_000 == 0 {
1434                                    info!("Processed {} objects", count);
1435                                }
1436                                (total_iota, total_storage_rebate)
1437                            }));
1438                        }
1439                    }
1440                    LiveObject::Wrapped(_) => {
1441                        unreachable!("Explicitly asked to not include wrapped tombstones")
1442                    }
1443                }
1444            }
1445            pending_tasks.into_iter().fold((0, 0), |init, result| {
1446                let result = result.join().unwrap();
1447                (init.0 + result.0, init.1 + result.1)
1448            })
1449        });
1450        let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1451        for object in pending_objects {
1452            total_storage_rebate += object.storage_rebate;
1453            total_iota +=
1454                object.get_total_iota(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1455        }
1456        info!(
1457            "Scanned {} live objects, took {:?}",
1458            count,
1459            cur_time.elapsed()
1460        );
1461        self.metrics
1462            .iota_conservation_live_object_count
1463            .set(count as i64);
1464        self.metrics
1465            .iota_conservation_live_object_size
1466            .set(size as i64);
1467        self.metrics
1468            .iota_conservation_check_latency
1469            .set(cur_time.elapsed().as_secs() as i64);
1470
1471        // It is safe to call this function because we are in the middle of
1472        // reconfiguration.
1473        let system_state: IotaSystemStateSummaryV2 = self
1474            .get_iota_system_state_object_unsafe()
1475            .expect("Reading iota system state object cannot fail")
1476            .into_iota_system_state_summary()
1477            .try_into()?;
1478        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1479        info!(
1480            "Total IOTA amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1481            total_iota, storage_fund_balance, total_storage_rebate, system_state.epoch
1482        );
1483
1484        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1485        self.metrics
1486            .iota_conservation_storage_fund
1487            .set(storage_fund_balance as i64);
1488        self.metrics
1489            .iota_conservation_storage_fund_imbalance
1490            .set(imbalance);
1491        self.metrics
1492            .iota_conservation_imbalance
1493            .set((total_iota as i128 - system_state.iota_total_supply as i128) as i64);
1494
1495        if let Some(expected_imbalance) = self
1496            .perpetual_tables
1497            .expected_storage_fund_imbalance
1498            .get(&())
1499            .map_err(|err| {
1500                IotaError::from(
1501                    format!("failed to read expected storage fund imbalance: {err}").as_str(),
1502                )
1503            })?
1504        {
1505            fp_ensure!(
1506                imbalance == expected_imbalance,
1507                IotaError::from(
1508                    format!(
1509                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1510                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1511                    ).as_str()
1512                )
1513            );
1514        } else {
1515            self.perpetual_tables
1516                .expected_storage_fund_imbalance
1517                .insert(&(), &imbalance)
1518                .map_err(|err| {
1519                    IotaError::from(
1520                        format!("failed to write expected storage fund imbalance: {err}").as_str(),
1521                    )
1522                })?;
1523        }
1524
1525        let total_supply = self
1526            .perpetual_tables
1527            .total_iota_supply
1528            .get(&())
1529            .map_err(|err| {
1530                IotaError::from(format!("failed to read total iota supply: {err}").as_str())
1531            })?;
1532
1533        match total_supply.zip(epoch_supply_change) {
1534            // Only execute the check if both are set and the supply value was set in the last
1535            // epoch. We have to assume the supply changes every epoch and therefore we
1536            // cannot run the check with a supply value from any epoch earlier than the
1537            // last one. This can happen if the check was disabled for some time.
1538            Some((old_supply, epoch_supply_change))
1539                if old_supply.last_check_epoch + 1 == old_epoch_store.epoch() =>
1540            {
1541                let expected_new_supply = if epoch_supply_change >= 0 {
1542                    old_supply
1543                        .total_supply
1544                        .checked_add(epoch_supply_change.unsigned_abs())
1545                        .ok_or_else(|| {
1546                            IotaError::from(
1547                                format!(
1548                                    "Inconsistent state detected at epoch {}: old supply {} + supply change {} overflowed",
1549                                    system_state.epoch, old_supply.total_supply, epoch_supply_change
1550                                ).as_str())
1551                        })?
1552                } else {
1553                    old_supply.total_supply.checked_sub(epoch_supply_change.unsigned_abs()).ok_or_else(|| {
1554                        IotaError::from(
1555                            format!(
1556                                "Inconsistent state detected at epoch {}: old supply {} - supply change {} underflowed",
1557                                system_state.epoch, old_supply.total_supply, epoch_supply_change
1558                            ).as_str())
1559                    })?
1560                };
1561
1562                fp_ensure!(
1563                    total_iota == expected_new_supply,
1564                    IotaError::from(
1565                        format!(
1566                            "Inconsistent state detected at epoch {}: total iota: {}, expecting {}",
1567                            system_state.epoch, total_iota, expected_new_supply
1568                        )
1569                        .as_str()
1570                    )
1571                );
1572
1573                let new_supply = TotalIotaSupplyCheck {
1574                    total_supply: expected_new_supply,
1575                    last_check_epoch: old_epoch_store.epoch(),
1576                };
1577
1578                self.perpetual_tables
1579                    .total_iota_supply
1580                    .insert(&(), &new_supply)
1581                    .map_err(|err| {
1582                        IotaError::from(
1583                            format!("failed to write total iota supply: {err}").as_str(),
1584                        )
1585                    })?;
1586            }
1587            // If either one is None or if the last value is from an older epoch,
1588            // we update the value in the table since we're at genesis and cannot execute the check.
1589            _ => {
1590                info!("Skipping total supply check");
1591
1592                let supply = TotalIotaSupplyCheck {
1593                    total_supply: total_iota,
1594                    last_check_epoch: old_epoch_store.epoch(),
1595                };
1596
1597                self.perpetual_tables
1598                    .total_iota_supply
1599                    .insert(&(), &supply)
1600                    .map_err(|err| {
1601                        IotaError::from(
1602                            format!("failed to write total iota supply: {err}").as_str(),
1603                        )
1604                    })?;
1605
1606                return Ok(());
1607            }
1608        };
1609
1610        Ok(())
1611    }
1612
1613    pub async fn prune_objects_and_compact_for_testing(
1614        &self,
1615        checkpoint_store: &Arc<CheckpointStore>,
1616        rest_index: Option<&RestIndexStore>,
1617    ) {
1618        let pruning_config = AuthorityStorePruningConfig {
1619            num_epochs_to_retain: 0,
1620            ..Default::default()
1621        };
1622        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1623            &self.perpetual_tables,
1624            checkpoint_store,
1625            rest_index,
1626            None,
1627            pruning_config,
1628            AuthorityStorePruningMetrics::new_for_test(),
1629            EPOCH_DURATION_MS_FOR_TESTING,
1630        )
1631        .await;
1632        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1633    }
1634
1635    #[cfg(test)]
1636    pub async fn prune_objects_immediately_for_testing(
1637        &self,
1638        transaction_effects: Vec<TransactionEffects>,
1639    ) -> anyhow::Result<()> {
1640        let mut wb = self.perpetual_tables.objects.batch();
1641
1642        let mut object_keys_to_prune = vec![];
1643        for effects in &transaction_effects {
1644            for (object_id, seq_number) in effects.modified_at_versions() {
1645                info!("Pruning object {:?} version {:?}", object_id, seq_number);
1646                object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1647            }
1648        }
1649
1650        wb.delete_batch(
1651            &self.perpetual_tables.objects,
1652            object_keys_to_prune.into_iter(),
1653        )?;
1654        wb.write()?;
1655        Ok(())
1656    }
1657
1658    #[cfg(msim)]
1659    pub fn remove_all_versions_of_object(&self, object_id: ObjectID) {
1660        let entries: Vec<_> = self
1661            .perpetual_tables
1662            .objects
1663            .unbounded_iter()
1664            .filter_map(|(key, _)| if key.0 == object_id { Some(key) } else { None })
1665            .collect();
1666        info!("Removing all versions of object: {:?}", entries);
1667        self.perpetual_tables.objects.multi_remove(entries).unwrap();
1668    }
1669
1670    // Counts the number of versions exist in object store for `object_id`. This
1671    // includes tombstone.
1672    #[cfg(msim)]
1673    pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1674        self.perpetual_tables
1675            .objects
1676            .safe_iter_with_bounds(
1677                Some(ObjectKey(object_id, VersionNumber::MIN_VALID_INCL)),
1678                Some(ObjectKey(object_id, VersionNumber::MAX_VALID_EXCL)),
1679            )
1680            .collect::<Result<Vec<_>, _>>()
1681            .unwrap()
1682            .len()
1683    }
1684}
1685
1686impl AccumulatorStore for AuthorityStore {
1687    fn get_root_state_accumulator_for_epoch(
1688        &self,
1689        epoch: EpochId,
1690    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
1691        self.perpetual_tables
1692            .root_state_hash_by_epoch
1693            .get(&epoch)
1694            .map_err(Into::into)
1695    }
1696
1697    fn get_root_state_accumulator_for_highest_epoch(
1698        &self,
1699    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
1700        Ok(self
1701            .perpetual_tables
1702            .root_state_hash_by_epoch
1703            .reversed_safe_iter_with_bounds(None, None)?
1704            .next()
1705            .transpose()?)
1706    }
1707
1708    fn insert_state_accumulator_for_epoch(
1709        &self,
1710        epoch: EpochId,
1711        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1712        acc: &Accumulator,
1713    ) -> IotaResult {
1714        self.perpetual_tables
1715            .root_state_hash_by_epoch
1716            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1717        self.root_state_notify_read
1718            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1719
1720        Ok(())
1721    }
1722
1723    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1724        Box::new(self.perpetual_tables.iter_live_object_set())
1725    }
1726}
1727
1728impl ObjectStore for AuthorityStore {
1729    /// Read an object and return it, or Ok(None) if the object was not found.
1730    fn try_get_object(
1731        &self,
1732        object_id: &ObjectID,
1733    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1734        self.perpetual_tables.as_ref().try_get_object(object_id)
1735    }
1736
1737    fn try_get_object_by_key(
1738        &self,
1739        object_id: &ObjectID,
1740        version: VersionNumber,
1741    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
1742        self.perpetual_tables
1743            .try_get_object_by_key(object_id, version)
1744    }
1745}
1746
/// A wrapper to make Orphan Rule happy.
///
/// It pairs a shared [`BackingPackageStore`] with resolver metrics so that
/// `ModuleResolver` can be implemented on a type owned by this crate.
pub struct ResolverWrapper {
    /// The package store that module lookups are delegated to.
    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
    /// Metrics handle; its `module_cache_size` gauge is updated by this wrapper.
    pub metrics: Arc<ResolverMetrics>,
}
1752
1753impl ResolverWrapper {
1754    pub fn new(
1755        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1756        metrics: Arc<ResolverMetrics>,
1757    ) -> Self {
1758        metrics.module_cache_size.set(0);
1759        ResolverWrapper { resolver, metrics }
1760    }
1761
1762    fn inc_cache_size_gauge(&self) {
1763        // reset the gauge after a restart of the cache
1764        let current = self.metrics.module_cache_size.get();
1765        self.metrics.module_cache_size.set(current + 1);
1766    }
1767}
1768
1769impl ModuleResolver for ResolverWrapper {
1770    type Error = IotaError;
1771    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1772        self.inc_cache_size_gauge();
1773        get_module(&*self.resolver, module_id)
1774    }
1775}
1776
/// Identifies the origin of a store update.
// NOTE(review): how each variant is consumed is not visible in this part of
// the file — confirm against the call sites.
pub enum UpdateType {
    /// An update tied to a transaction, identified by its effects digest.
    Transaction(TransactionEffectsDigest),
    /// An update that is part of genesis.
    Genesis,
}
1781
/// Shorthand for an [`IotaResult`] that carries an [`ObjectLockStatus`] on
/// success.
pub type IotaLockResult = IotaResult<ObjectLockStatus>;
1783
/// The lock state reported for an object reference.
// NOTE(review): the variant descriptions below are inferred from the variant
// and field names only — confirm against the lock-checking code.
#[derive(Debug, PartialEq, Eq)]
pub enum ObjectLockStatus {
    /// Presumably: a lock entry exists but is not assigned to any transaction.
    Initialized,
    /// The object is locked by the transaction described in `locked_by_tx`.
    LockedToTx { locked_by_tx: LockDetails }, // no need to use wrapper, not stored or serialized
    /// The lock that was found refers to a different version; `locked_ref` is
    /// the object reference that is actually locked.
    LockedAtDifferentVersion { locked_ref: ObjectRef },
}