iota_core/
authority.rs

// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    authenticator_state::get_authenticator_state,
59    base_types::*,
60    committee::{Committee, EpochId, ProtocolVersion},
61    crypto::{
62        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
63        default_hash,
64    },
65    deny_list_v1::check_coin_deny_list_v1_during_signing,
66    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
67    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
68    effects::{
69        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
70        TransactionEvents, VerifiedSignedTransactionEffects,
71    },
72    error::{ExecutionError, IotaError, IotaResult, UserInputError},
73    event::{Event, EventID, SystemEpochInfoEvent},
74    executable_transaction::VerifiedExecutableTransaction,
75    execution_config_utils::to_binary_config,
76    execution_status::ExecutionStatus,
77    gas::{GasCostSummary, IotaGasStatus},
78    gas_coin::NANOS_PER_IOTA,
79    inner_temporary_store::{
80        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
81        WrittenObjects,
82    },
83    iota_system_state::{
84        IotaSystemState, IotaSystemStateTrait,
85        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
86    },
87    is_system_package,
88    layout_resolver::{LayoutResolver, into_struct_layout},
89    message_envelope::Message,
90    messages_checkpoint::{
91        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
92        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
93        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
94        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
95    },
96    messages_consensus::AuthorityCapabilitiesV1,
97    messages_grpc::{
98        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
99        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
100        TransactionStatus,
101    },
102    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
103    object::{
104        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
105        bounded_visitor::BoundedVisitor,
106    },
107    storage::{
108        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
109    },
110    supported_protocol_versions::{
111        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
112    },
113    transaction::*,
114    transaction_executor::{SimulateTransactionResult, VmChecks},
115};
116use itertools::Itertools;
117use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
118use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
119use parking_lot::Mutex;
120use prometheus::{
121    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
122    register_histogram_vec_with_registry, register_histogram_with_registry,
123    register_int_counter_vec_with_registry, register_int_counter_with_registry,
124    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
125};
126use serde::{Deserialize, Serialize, de::DeserializeOwned};
127use tap::TapFallible;
128use tokio::{
129    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130    task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143    authority::{
144        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148        authority_store_tables::AuthorityPrunerTables,
149        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150    },
151    authority_client::NetworkAuthorityClient,
152    checkpoints::CheckpointStore,
153    congestion_tracker::CongestionTracker,
154    consensus_adapter::ConsensusAdapter,
155    epoch::committee_store::CommitteeStore,
156    execution_cache::{
157        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159        TransactionCacheRead,
160    },
161    execution_driver::execution_process,
162    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163    metrics::{LatencyObserver, RateTracker},
164    module_cache_metrics::ResolverMetrics,
165    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166    rest_index::RestIndexStore,
167    stake_aggregator::StakeAggregator,
168    state_accumulator::{AccumulatorStore, StateAccumulator},
169    subscription_handler::SubscriptionHandler,
170    transaction_input_loader::TransactionInputLoader,
171    transaction_manager::TransactionManager,
172    transaction_outputs::TransactionOutputs,
173    validator_tx_finalizer::ValidatorTxFinalizer,
174    verify_indexes::verify_indexes,
175};
176
177#[cfg(test)]
178#[path = "unit_tests/authority_tests.rs"]
179pub mod authority_tests;
180
181#[cfg(test)]
182#[path = "unit_tests/transaction_tests.rs"]
183pub mod transaction_tests;
184
185#[cfg(test)]
186#[path = "unit_tests/batch_transaction_tests.rs"]
187mod batch_transaction_tests;
188
189#[cfg(test)]
190#[path = "unit_tests/move_integration_tests.rs"]
191pub mod move_integration_tests;
192
193#[cfg(test)]
194#[path = "unit_tests/gas_tests.rs"]
195mod gas_tests;
196
197#[cfg(test)]
198#[path = "unit_tests/batch_verification_tests.rs"]
199mod batch_verification_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/coin_deny_list_tests.rs"]
203mod coin_deny_list_tests;
204
205#[cfg(test)]
206#[path = "unit_tests/auth_unit_test_utils.rs"]
207pub mod auth_unit_test_utils;
208
209pub mod authority_test_utils;
210
211pub mod authority_per_epoch_store;
212pub mod authority_per_epoch_store_pruner;
213
214pub mod authority_store_pruner;
215pub mod authority_store_tables;
216pub mod authority_store_types;
217pub mod epoch_start_configuration;
218pub mod shared_object_congestion_tracker;
219pub mod shared_object_version_manager;
220pub mod suggested_gas_price_calculator;
221pub mod test_authority_builder;
222pub mod transaction_deferral;
223
224pub(crate) mod authority_store;
225pub mod backpressure;
226
227/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
228pub struct AuthorityMetrics {
229    tx_orders: IntCounter,
230    total_certs: IntCounter,
231    total_cert_attempts: IntCounter,
232    total_effects: IntCounter,
233    pub shared_obj_tx: IntCounter,
234    sponsored_tx: IntCounter,
235    tx_already_processed: IntCounter,
236    num_input_objs: Histogram,
237    num_shared_objects: Histogram,
238    batch_size: Histogram,
239
240    authority_state_handle_transaction_latency: Histogram,
241
242    execute_certificate_latency_single_writer: Histogram,
243    execute_certificate_latency_shared_object: Histogram,
244
245    internal_execution_latency: Histogram,
246    execution_load_input_objects_latency: Histogram,
247    prepare_certificate_latency: Histogram,
248    commit_certificate_latency: Histogram,
249    db_checkpoint_latency: Histogram,
250
251    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
252    pub(crate) transaction_manager_num_missing_objects: IntGauge,
253    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
254    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
255    pub(crate) transaction_manager_num_ready: IntGauge,
256    pub(crate) transaction_manager_object_cache_size: IntGauge,
257    pub(crate) transaction_manager_object_cache_hits: IntCounter,
258    pub(crate) transaction_manager_object_cache_misses: IntCounter,
259    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
260    pub(crate) transaction_manager_package_cache_size: IntGauge,
261    pub(crate) transaction_manager_package_cache_hits: IntCounter,
262    pub(crate) transaction_manager_package_cache_misses: IntCounter,
263    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
264    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
265
266    pub(crate) execution_driver_executed_transactions: IntCounter,
267    pub(crate) execution_driver_dispatch_queue: IntGauge,
268    pub(crate) execution_queueing_delay_s: Histogram,
269    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
270    pub(crate) execution_gas_latency_ratio: Histogram,
271
272    pub(crate) skipped_consensus_txns: IntCounter,
273    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
274
275    pub(crate) authority_overload_status: IntGauge,
276    pub(crate) authority_load_shedding_percentage: IntGauge,
277
278    pub(crate) transaction_overload_sources: IntCounterVec,
279
280    /// Post processing metrics
281    post_processing_total_events_emitted: IntCounter,
282    post_processing_total_tx_indexed: IntCounter,
283    post_processing_total_tx_had_event_processed: IntCounter,
284    post_processing_total_failures: IntCounter,
285
286    /// Consensus handler metrics
287    pub consensus_handler_processed: IntCounterVec,
288    pub consensus_handler_transaction_sizes: HistogramVec,
289    pub consensus_handler_num_low_scoring_authorities: IntGauge,
290    pub consensus_handler_scores: IntGaugeVec,
291    pub consensus_handler_deferred_transactions: IntCounter,
292    pub consensus_handler_congested_transactions: IntCounter,
293    pub consensus_handler_cancelled_transactions: IntCounter,
294    pub consensus_handler_max_object_costs: IntGaugeVec,
295    pub consensus_committed_subdags: IntCounterVec,
296    pub consensus_committed_messages: IntGaugeVec,
297    pub consensus_committed_user_transactions: IntGaugeVec,
298    pub consensus_handler_leader_round: IntGauge,
299    pub consensus_calculated_throughput: IntGauge,
300    pub consensus_calculated_throughput_profile: IntGauge,
301
302    pub limits_metrics: Arc<LimitsMetrics>,
303
304    /// bytecode verifier metrics for tracking timeouts
305    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
306
307    /// Count of zklogin signatures
308    pub zklogin_sig_count: IntCounter,
309    /// Count of multisig signatures
310    pub multisig_sig_count: IntCounter,
311
312    // Tracks recent average txn queueing delay between when it is ready for execution
313    // until it starts executing.
314    pub execution_queueing_latency: LatencyObserver,
315
316    // Tracks the rate of transactions become ready for execution in transaction manager.
317    // The need for the Mutex is that the tracker is updated in transaction manager and read
318    // in the overload_monitor. There should be low mutex contention because
319    // transaction manager is single threaded and the read rate in overload_monitor is
320    // low. In the case where transaction manager becomes multi-threaded, we can
321    // create one rate tracker per thread.
322    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
323
324    // Tracks the rate of transactions starts execution in execution driver.
325    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
326    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
327}
328
/// Histogram buckets that override the default Prometheus buckets, intended
/// for positive numbers in the 0-10M range.
///
/// Each decade is sampled at 1, 2, 5 and 7 times the power of ten.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
335
/// Histogram buckets for latency samples measured in seconds, ranging from
/// 0.5ms up to 90s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
340
/// Histogram buckets for low latency samples, in seconds. Starts from 10us.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
346
/// Histogram buckets for the gas-to-latency ratio histograms, ranging from 10
/// up to 1,000,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
351
352/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
353pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
354
355impl AuthorityMetrics {
356    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
357        let execute_certificate_latency = register_histogram_vec_with_registry!(
358            "authority_state_execute_certificate_latency",
359            "Latency of executing certificates, including waiting for inputs",
360            &["tx_type"],
361            LATENCY_SEC_BUCKETS.to_vec(),
362            registry,
363        )
364        .unwrap();
365
366        let execute_certificate_latency_single_writer =
367            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
368        let execute_certificate_latency_shared_object =
369            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
370
371        Self {
372            tx_orders: register_int_counter_with_registry!(
373                "total_transaction_orders",
374                "Total number of transaction orders",
375                registry,
376            )
377            .unwrap(),
378            total_certs: register_int_counter_with_registry!(
379                "total_transaction_certificates",
380                "Total number of transaction certificates handled",
381                registry,
382            )
383            .unwrap(),
384            total_cert_attempts: register_int_counter_with_registry!(
385                "total_handle_certificate_attempts",
386                "Number of calls to handle_certificate",
387                registry,
388            )
389            .unwrap(),
390            // total_effects == total transactions finished
391            total_effects: register_int_counter_with_registry!(
392                "total_transaction_effects",
393                "Total number of transaction effects produced",
394                registry,
395            )
396            .unwrap(),
397
398            shared_obj_tx: register_int_counter_with_registry!(
399                "num_shared_obj_tx",
400                "Number of transactions involving shared objects",
401                registry,
402            )
403            .unwrap(),
404
405            sponsored_tx: register_int_counter_with_registry!(
406                "num_sponsored_tx",
407                "Number of sponsored transactions",
408                registry,
409            )
410            .unwrap(),
411
412            tx_already_processed: register_int_counter_with_registry!(
413                "num_tx_already_processed",
414                "Number of transaction orders already processed previously",
415                registry,
416            )
417            .unwrap(),
418            num_input_objs: register_histogram_with_registry!(
419                "num_input_objects",
420                "Distribution of number of input TX objects per TX",
421                POSITIVE_INT_BUCKETS.to_vec(),
422                registry,
423            )
424            .unwrap(),
425            num_shared_objects: register_histogram_with_registry!(
426                "num_shared_objects",
427                "Number of shared input objects per TX",
428                POSITIVE_INT_BUCKETS.to_vec(),
429                registry,
430            )
431            .unwrap(),
432            batch_size: register_histogram_with_registry!(
433                "batch_size",
434                "Distribution of size of transaction batch",
435                POSITIVE_INT_BUCKETS.to_vec(),
436                registry,
437            )
438            .unwrap(),
439            authority_state_handle_transaction_latency: register_histogram_with_registry!(
440                "authority_state_handle_transaction_latency",
441                "Latency of handling transactions",
442                LATENCY_SEC_BUCKETS.to_vec(),
443                registry,
444            )
445            .unwrap(),
446            execute_certificate_latency_single_writer,
447            execute_certificate_latency_shared_object,
448            internal_execution_latency: register_histogram_with_registry!(
449                "authority_state_internal_execution_latency",
450                "Latency of actual certificate executions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execution_load_input_objects_latency: register_histogram_with_registry!(
456                "authority_state_execution_load_input_objects_latency",
457                "Latency of loading input objects for execution",
458                LOW_LATENCY_SEC_BUCKETS.to_vec(),
459                registry,
460            )
461            .unwrap(),
462            prepare_certificate_latency: register_histogram_with_registry!(
463                "authority_state_prepare_certificate_latency",
464                "Latency of executing certificates, before committing the results",
465                LATENCY_SEC_BUCKETS.to_vec(),
466                registry,
467            )
468            .unwrap(),
469            commit_certificate_latency: register_histogram_with_registry!(
470                "authority_state_commit_certificate_latency",
471                "Latency of committing certificate execution results",
472                LATENCY_SEC_BUCKETS.to_vec(),
473                registry,
474            )
475            .unwrap(),
476            db_checkpoint_latency: register_histogram_with_registry!(
477                "db_checkpoint_latency",
478                "Latency of checkpointing dbs",
479                LATENCY_SEC_BUCKETS.to_vec(),
480                registry,
481            ).unwrap(),
482            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
483                "transaction_manager_num_enqueued_certificates",
484                "Current number of certificates enqueued to TransactionManager",
485                &["result"],
486                registry,
487            )
488            .unwrap(),
489            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
490                "transaction_manager_num_missing_objects",
491                "Current number of missing objects in TransactionManager",
492                registry,
493            )
494            .unwrap(),
495            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
496                "transaction_manager_num_pending_certificates",
497                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
498                registry,
499            )
500            .unwrap(),
501            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
502                "transaction_manager_num_executing_certificates",
503                "Number of executing certificates, including queued and actually running certificates",
504                registry,
505            )
506            .unwrap(),
507            transaction_manager_num_ready: register_int_gauge_with_registry!(
508                "transaction_manager_num_ready",
509                "Number of ready transactions in TransactionManager",
510                registry,
511            )
512            .unwrap(),
513            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
514                "transaction_manager_object_cache_size",
515                "Current size of object-availability cache in TransactionManager",
516                registry,
517            )
518            .unwrap(),
519            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
520                "transaction_manager_object_cache_hits",
521                "Number of object-availability cache hits in TransactionManager",
522                registry,
523            )
524            .unwrap(),
525            authority_overload_status: register_int_gauge_with_registry!(
526                "authority_overload_status",
527                "Whether authority is current experiencing overload and enters load shedding mode.",
528                registry)
529            .unwrap(),
530            authority_load_shedding_percentage: register_int_gauge_with_registry!(
531                "authority_load_shedding_percentage",
532                "The percentage of transactions is shed when the authority is in load shedding mode.",
533                registry)
534            .unwrap(),
535            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
536                "transaction_manager_object_cache_misses",
537                "Number of object-availability cache misses in TransactionManager",
538                registry,
539            )
540            .unwrap(),
541            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
542                "transaction_manager_object_cache_evictions",
543                "Number of object-availability cache evictions in TransactionManager",
544                registry,
545            )
546            .unwrap(),
547            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
548                "transaction_manager_package_cache_size",
549                "Current size of package-availability cache in TransactionManager",
550                registry,
551            )
552            .unwrap(),
553            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
554                "transaction_manager_package_cache_hits",
555                "Number of package-availability cache hits in TransactionManager",
556                registry,
557            )
558            .unwrap(),
559            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
560                "transaction_manager_package_cache_misses",
561                "Number of package-availability cache misses in TransactionManager",
562                registry,
563            )
564            .unwrap(),
565            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
566                "transaction_manager_package_cache_evictions",
567                "Number of package-availability cache evictions in TransactionManager",
568                registry,
569            )
570            .unwrap(),
571            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
572                "transaction_manager_transaction_queue_age_s",
573                "Time spent in waiting for transaction in the queue",
574                LATENCY_SEC_BUCKETS.to_vec(),
575                registry,
576            )
577            .unwrap(),
578            transaction_overload_sources: register_int_counter_vec_with_registry!(
579                "transaction_overload_sources",
580                "Number of times each source indicates transaction overload.",
581                &["source"],
582                registry)
583            .unwrap(),
584            execution_driver_executed_transactions: register_int_counter_with_registry!(
585                "execution_driver_executed_transactions",
586                "Cumulative number of transaction executed by execution driver",
587                registry,
588            )
589            .unwrap(),
590            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591                "execution_driver_dispatch_queue",
592                "Number of transaction pending in execution driver dispatch queue",
593                registry,
594            )
595            .unwrap(),
596            execution_queueing_delay_s: register_histogram_with_registry!(
597                "execution_queueing_delay_s",
598                "Queueing delay between a transaction is ready for execution until it starts executing.",
599                LATENCY_SEC_BUCKETS.to_vec(),
600                registry
601            )
602            .unwrap(),
603            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604                "prepare_cert_gas_latency_ratio",
605                "The ratio of computation gas divided by VM execution latency.",
606                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607                registry
608            )
609            .unwrap(),
610            execution_gas_latency_ratio: register_histogram_with_registry!(
611                "execution_gas_latency_ratio",
612                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614                registry
615            )
616            .unwrap(),
617            skipped_consensus_txns: register_int_counter_with_registry!(
618                "skipped_consensus_txns",
619                "Total number of consensus transactions skipped",
620                registry,
621            )
622            .unwrap(),
623            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624                "skipped_consensus_txns_cache_hit",
625                "Total number of consensus transactions skipped because of local cache hit",
626                registry,
627            )
628            .unwrap(),
629            post_processing_total_events_emitted: register_int_counter_with_registry!(
630                "post_processing_total_events_emitted",
631                "Total number of events emitted in post processing",
632                registry,
633            )
634            .unwrap(),
635            post_processing_total_tx_indexed: register_int_counter_with_registry!(
636                "post_processing_total_tx_indexed",
637                "Total number of txes indexed in post processing",
638                registry,
639            )
640            .unwrap(),
641            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642                "post_processing_total_tx_had_event_processed",
643                "Total number of txes finished event processing in post processing",
644                registry,
645            )
646            .unwrap(),
647            post_processing_total_failures: register_int_counter_with_registry!(
648                "post_processing_total_failures",
649                "Total number of failure in post processing",
650                registry,
651            )
652            .unwrap(),
653            consensus_handler_processed: register_int_counter_vec_with_registry!(
654                "consensus_handler_processed",
655                "Number of transactions processed by consensus handler",
656                &["class"],
657                registry
658            ).unwrap(),
659            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660                "consensus_handler_transaction_sizes",
661                "Sizes of each type of transactions processed by consensus handler",
662                &["class"],
663                POSITIVE_INT_BUCKETS.to_vec(),
664                registry
665            ).unwrap(),
666            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667                "consensus_handler_num_low_scoring_authorities",
668                "Number of low scoring authorities based on reputation scores from consensus",
669                registry
670            ).unwrap(),
671            consensus_handler_scores: register_int_gauge_vec_with_registry!(
672                "consensus_handler_scores",
673                "scores from consensus for each authority",
674                &["authority"],
675                registry,
676            ).unwrap(),
677            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678                "consensus_handler_deferred_transactions",
679                "Number of transactions deferred by consensus handler",
680                registry,
681            ).unwrap(),
682            consensus_handler_congested_transactions: register_int_counter_with_registry!(
683                "consensus_handler_congested_transactions",
684                "Number of transactions deferred by consensus handler due to congestion",
685                registry,
686            ).unwrap(),
687            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688                "consensus_handler_cancelled_transactions",
689                "Number of transactions cancelled by consensus handler",
690                registry,
691            ).unwrap(),
692            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693                "consensus_handler_max_congestion_control_object_costs",
694                "Max object costs for congestion control in the current consensus commit",
695                &["commit_type"],
696                registry,
697            ).unwrap(),
698            consensus_committed_subdags: register_int_counter_vec_with_registry!(
699                "consensus_committed_subdags",
700                "Number of committed subdags, sliced by leader",
701                &["authority"],
702                registry,
703            ).unwrap(),
704            consensus_committed_messages: register_int_gauge_vec_with_registry!(
705                "consensus_committed_messages",
706                "Total number of committed consensus messages, sliced by author",
707                &["authority"],
708                registry,
709            ).unwrap(),
710            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711                "consensus_committed_user_transactions",
712                "Number of committed user transactions, sliced by submitter",
713                &["authority"],
714                registry,
715            ).unwrap(),
716            consensus_handler_leader_round: register_int_gauge_with_registry!(
717                "consensus_handler_leader_round",
718                "The leader round of the current consensus output being processed in the consensus handler",
719                registry,
720            ).unwrap(),
721            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
722            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
723            zklogin_sig_count: register_int_counter_with_registry!(
724                "zklogin_sig_count",
725                "Count of zkLogin signatures",
726                registry,
727            )
728            .unwrap(),
729            multisig_sig_count: register_int_counter_with_registry!(
730                "multisig_sig_count",
731                "Count of zkLogin signatures",
732                registry,
733            )
734            .unwrap(),
735            consensus_calculated_throughput: register_int_gauge_with_registry!(
736                "consensus_calculated_throughput",
737                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
738                registry,
739            ).unwrap(),
740            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
741                "consensus_calculated_throughput_profile",
742                "The current active calculated throughput profile",
743                registry
744            ).unwrap(),
745            execution_queueing_latency: LatencyObserver::new(),
746            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
747            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
748        }
749    }
750
751    /// Reset metrics that contain `hostname` as one of the labels. This is
752    /// needed to avoid retaining metrics for long-gone committee members and
753    /// only exposing metrics for the committee in the current epoch.
754    pub fn reset_on_reconfigure(&self) {
755        self.consensus_committed_messages.reset();
756        self.consensus_handler_scores.reset();
757        self.consensus_committed_user_transactions.reset();
758    }
759}
760
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy
///   private keys).
/// - `Send` + `Sync`, i.e. can be safely shared between threads.
///
/// The alias wraps the signer in `Pin<Arc<..>>`, so it is typically
/// instantiated with `Arc::pin(keypair)` where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
768
/// State held by a single authority node: its identity, the stores it reads
/// and writes, the machinery that drives transaction execution, and metrics.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction, both for
    /// signing and for execution.
    input_loader: TransactionInputLoader,
    // NOTE(review): presumably the handles behind the `get_*cache*`
    // accessors used throughout this type — confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store, kept behind an `ArcSwap`.
    // NOTE(review): presumably swapped out on reconfiguration — confirm.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and reverts all transactions from the previous epoch
    /// that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    pub indexes: Option<Arc<IndexStore>>,
    pub rest_index: Option<Arc<RestIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint storage; also serves the per-epoch state commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee storage, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // The two pruners are never read by name (leading underscore).
    // NOTE(review): presumably held only to keep their background work
    // alive for the lifetime of the state — confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration. Among others it supplies the overload config, the
    /// transaction deny config and the state debug dump config.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When set, and this node is a committee validator, every transaction
    /// signed by `handle_transaction` is handed to it for tracking.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
824
825/// The authority state encapsulates all state, drives execution, and ensures
826/// safety.
827///
828/// Note the authority operations can be accessed through a read ref (&) and do
829/// not require &mut. Internally a database is synchronized through a mutex
830/// lock.
831///
832/// Repeating valid commands should produce no changes and return no error.
833impl AuthorityState {
834    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
835        epoch_store.committee().authority_exists(&self.name)
836    }
837
838    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
839        epoch_store
840            .active_validators()
841            .iter()
842            .any(|a| AuthorityName::from(a) == self.name)
843    }
844
845    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
846        !self.is_committee_validator(epoch_store)
847    }
848
849    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
850        &self.committee_store
851    }
852
853    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
854        self.committee_store.clone()
855    }
856
857    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
858        &self.config.authority_overload_config
859    }
860
861    pub fn get_epoch_state_commitments(
862        &self,
863        epoch: EpochId,
864    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
865        self.checkpoint_store.get_epoch_state_commitments(epoch)
866    }
867
868    /// This is a private method and should be kept that way. It doesn't check
869    /// whether the provided transaction is a system transaction, and hence
870    /// can only be called internally.
871    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
872    async fn handle_transaction_impl(
873        &self,
874        transaction: VerifiedTransaction,
875        epoch_store: &Arc<AuthorityPerEpochStore>,
876    ) -> IotaResult<VerifiedSignedTransaction> {
877        // Ensure that validator cannot reconfigure while we are signing the tx
878        let _execution_lock = self.execution_lock_for_signing();
879
880        let tx_digest = transaction.digest();
881        let tx_data = transaction.data().transaction_data();
882
883        let input_object_kinds = tx_data.input_objects()?;
884        let receiving_objects_refs = tx_data.receiving_objects();
885
886        // Note: the deny checks may do redundant package loads but:
887        // - they only load packages when there is an active package deny map
888        // - the loads are cached anyway
889        iota_transaction_checks::deny::check_transaction_for_signing(
890            tx_data,
891            transaction.tx_signatures(),
892            &input_object_kinds,
893            &receiving_objects_refs,
894            &self.config.transaction_deny_config,
895            self.get_backing_package_store().as_ref(),
896        )?;
897
898        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
899            Some(tx_digest),
900            &input_object_kinds,
901            &receiving_objects_refs,
902            epoch_store.epoch(),
903        )?;
904
905        let (_gas_status, checked_input_objects) =
906            iota_transaction_checks::check_transaction_input(
907                epoch_store.protocol_config(),
908                epoch_store.reference_gas_price(),
909                tx_data,
910                input_objects,
911                &receiving_objects,
912                &self.metrics.bytecode_verifier_metrics,
913                &self.config.verifier_signing_config,
914            )?;
915
916        check_coin_deny_list_v1_during_signing(
917            tx_data.sender(),
918            &checked_input_objects,
919            &receiving_objects,
920            &self.get_object_store(),
921        )?;
922
923        let owned_objects = checked_input_objects.inner().filter_owned_objects();
924
925        let signed_transaction = VerifiedSignedTransaction::new(
926            epoch_store.epoch(),
927            transaction,
928            self.name,
929            &*self.secret,
930        );
931
932        // Check and write locks, to signed transaction, into the database
933        // The call to self.set_transaction_lock checks the lock is not conflicting,
934        // and returns ConflictingTransaction error in case there is a lock on a
935        // different existing transaction.
936        self.get_cache_writer().try_acquire_transaction_locks(
937            epoch_store,
938            &owned_objects,
939            signed_transaction.clone(),
940        )?;
941
942        Ok(signed_transaction)
943    }
944
945    /// Initiate a new transaction.
946    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
947    pub async fn handle_transaction(
948        &self,
949        epoch_store: &Arc<AuthorityPerEpochStore>,
950        transaction: VerifiedTransaction,
951    ) -> IotaResult<HandleTransactionResponse> {
952        let tx_digest = *transaction.digest();
953        debug!("handle_transaction");
954
955        // Ensure an idempotent answer.
956        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
957            return Ok(HandleTransactionResponse { status });
958        }
959
960        let _metrics_guard = self
961            .metrics
962            .authority_state_handle_transaction_latency
963            .start_timer();
964        self.metrics.tx_orders.inc();
965
966        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
967        match signed {
968            Ok(s) => {
969                if self.is_committee_validator(epoch_store) {
970                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
971                        let tx = s.clone();
972                        let validator_tx_finalizer = validator_tx_finalizer.clone();
973                        let cache_reader = self.get_transaction_cache_reader().clone();
974                        let epoch_store = epoch_store.clone();
975                        spawn_monitored_task!(epoch_store.within_alive_epoch(
976                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
977                        ));
978                    }
979                }
980                Ok(HandleTransactionResponse {
981                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
982                })
983            }
984            // It happens frequently that while we are checking the validity of the transaction, it
985            // has just been executed.
986            // In that case, we could still return Ok to avoid showing confusing errors.
987            Err(err) => Ok(HandleTransactionResponse {
988                status: self
989                    .get_transaction_status(&tx_digest, epoch_store)?
990                    .ok_or(err)?
991                    .1,
992            }),
993        }
994    }
995
996    pub fn check_system_overload_at_signing(&self) -> bool {
997        self.config
998            .authority_overload_config
999            .check_system_overload_at_signing
1000    }
1001
1002    pub fn check_system_overload_at_execution(&self) -> bool {
1003        self.config
1004            .authority_overload_config
1005            .check_system_overload_at_execution
1006    }
1007
1008    pub(crate) fn check_system_overload(
1009        &self,
1010        consensus_adapter: &Arc<ConsensusAdapter>,
1011        tx_data: &SenderSignedData,
1012        do_authority_overload_check: bool,
1013    ) -> IotaResult {
1014        if do_authority_overload_check {
1015            self.check_authority_overload(tx_data).tap_err(|_| {
1016                self.update_overload_metrics("execution_queue");
1017            })?;
1018        }
1019        self.transaction_manager
1020            .check_execution_overload(self.overload_config(), tx_data)
1021            .tap_err(|_| {
1022                self.update_overload_metrics("execution_pending");
1023            })?;
1024        consensus_adapter.check_consensus_overload().tap_err(|_| {
1025            self.update_overload_metrics("consensus");
1026        })?;
1027
1028        let pending_tx_count = self
1029            .get_cache_commit()
1030            .approximate_pending_transaction_count();
1031        if pending_tx_count
1032            > self
1033                .config
1034                .execution_cache_config
1035                .writeback_cache
1036                .backpressure_threshold_for_rpc()
1037        {
1038            return Err(IotaError::ValidatorOverloadedRetryAfter {
1039                retry_after_secs: 10,
1040            });
1041        }
1042
1043        Ok(())
1044    }
1045
1046    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1047        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1048            return Ok(());
1049        }
1050
1051        let load_shedding_percentage = self
1052            .overload_info
1053            .load_shedding_percentage
1054            .load(Ordering::Relaxed);
1055        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1056    }
1057
1058    fn update_overload_metrics(&self, source: &str) {
1059        self.metrics
1060            .transaction_overload_sources
1061            .with_label_values(&[source])
1062            .inc();
1063    }
1064
1065    /// Executes a certificate for its effects.
1066    #[instrument(level = "trace", skip_all)]
1067    pub async fn execute_certificate(
1068        &self,
1069        certificate: &VerifiedCertificate,
1070        epoch_store: &Arc<AuthorityPerEpochStore>,
1071    ) -> IotaResult<TransactionEffects> {
1072        let _metrics_guard = if certificate.contains_shared_object() {
1073            self.metrics
1074                .execute_certificate_latency_shared_object
1075                .start_timer()
1076        } else {
1077            self.metrics
1078                .execute_certificate_latency_single_writer
1079                .start_timer()
1080        };
1081        trace!("execute_certificate");
1082
1083        self.metrics.total_cert_attempts.inc();
1084
1085        if !certificate.contains_shared_object() {
1086            // Shared object transactions need to be sequenced by the consensus before
1087            // enqueueing for execution, done in
1088            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1089            // object transactions, they can be enqueued for execution immediately.
1090            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1091        }
1092
1093        // tx could be reverted when epoch ends, so we must be careful not to return a
1094        // result here after the epoch ends.
1095        epoch_store
1096            .within_alive_epoch(self.notify_read_effects(certificate))
1097            .await
1098            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1099            .and_then(|r| r)
1100    }
1101
1102    /// Internal logic to execute a certificate.
1103    ///
1104    /// Guarantees that
1105    /// - If input objects are available, return no permanent failure.
1106    /// - Execution and output commit are atomic. i.e. outputs are only written
1107    ///   to storage,
1108    /// on successful execution; crashed execution has no observable effect and
1109    /// can be retried.
1110    ///
1111    /// It is caller's responsibility to ensure input objects are available and
1112    /// locks are set. If this cannot be satisfied by the caller,
1113    /// execute_certificate() should be called instead.
1114    ///
1115    /// Should only be called within iota-core.
1116    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1117    pub fn try_execute_immediately(
1118        &self,
1119        certificate: &VerifiedExecutableTransaction,
1120        expected_effects_digest: Option<TransactionEffectsDigest>,
1121        epoch_store: &Arc<AuthorityPerEpochStore>,
1122    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1123        let _scope = monitored_scope("Execution::try_execute_immediately");
1124        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1125
1126        let tx_digest = certificate.digest();
1127
1128        // Acquire a lock to prevent concurrent executions of the same transaction.
1129        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1130
1131        // The cert could have been processed by a concurrent attempt of the same cert,
1132        // so check if the effects have already been written.
1133        if let Some(effects) = self
1134            .get_transaction_cache_reader()
1135            .try_get_executed_effects(tx_digest)?
1136        {
1137            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1138                assert_eq!(
1139                    effects.digest(),
1140                    expected_effects_digest_inner,
1141                    "Unexpected effects digest for transaction {tx_digest:?}"
1142                );
1143            }
1144            tx_guard.release();
1145            return Ok((effects, None));
1146        }
1147        let input_objects =
1148            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1149
1150        // If no expected_effects_digest was provided, try to get it from storage.
1151        // We could be re-executing a previously executed but uncommitted transaction,
1152        // perhaps after restarting with a new binary. In this situation, if
1153        // we have published an effects signature, we must be sure not to
1154        // equivocate. TODO: read from cache instead of DB
1155        let expected_effects_digest =
1156            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1157
1158        self.process_certificate(
1159            tx_guard,
1160            certificate,
1161            input_objects,
1162            expected_effects_digest,
1163            epoch_store,
1164        )
1165        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1166        .tap_ok(
1167            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1168        )
1169    }
1170
1171    pub fn read_objects_for_execution(
1172        &self,
1173        tx_lock: &CertLockGuard,
1174        certificate: &VerifiedExecutableTransaction,
1175        epoch_store: &Arc<AuthorityPerEpochStore>,
1176    ) -> IotaResult<InputObjects> {
1177        let _scope = monitored_scope("Execution::load_input_objects");
1178        let _metrics_guard = self
1179            .metrics
1180            .execution_load_input_objects_latency
1181            .start_timer();
1182        let input_objects = &certificate.data().transaction_data().input_objects()?;
1183        self.input_loader.read_objects_for_execution(
1184            epoch_store,
1185            &certificate.key(),
1186            tx_lock,
1187            input_objects,
1188            epoch_store.epoch(),
1189        )
1190    }
1191
1192    /// Test only wrapper for `try_execute_immediately()` above, useful for
1193    /// checking errors if the pre-conditions are not satisfied, and
1194    /// executing change epoch transactions.
1195    pub fn try_execute_for_test(
1196        &self,
1197        certificate: &VerifiedCertificate,
1198    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1199        let epoch_store = self.epoch_store_for_testing();
1200        let (effects, execution_error_opt) = self.try_execute_immediately(
1201            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1202            None,
1203            &epoch_store,
1204        )?;
1205        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1206        Ok((signed_effects, execution_error_opt))
1207    }
1208
1209    /// Non-fallible version of `try_execute_for_test()`.
1210    pub fn execute_for_test(
1211        &self,
1212        certificate: &VerifiedCertificate,
1213    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1214        self.try_execute_for_test(certificate)
1215            .expect("try_execute_for_test should not fail")
1216    }
1217
1218    pub async fn notify_read_effects(
1219        &self,
1220        certificate: &VerifiedCertificate,
1221    ) -> IotaResult<TransactionEffects> {
1222        self.get_transaction_cache_reader()
1223            .try_notify_read_executed_effects(&[*certificate.digest()])
1224            .await
1225            .map(|mut r| r.pop().expect("must return correct number of effects"))
1226    }
1227
1228    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1229        self.get_object_cache_reader()
1230            .try_check_owned_objects_are_live(owned_object_refs)
1231    }
1232
1233    /// This function captures the required state to debug a forked transaction.
1234    /// The dump is written to a file in dir `path`, with name prefixed by the
1235    /// transaction digest. NOTE: Since this info escapes the validator
1236    /// context, make sure not to leak any private info here
1237    pub(crate) fn debug_dump_transaction_state(
1238        &self,
1239        tx_digest: &TransactionDigest,
1240        effects: &TransactionEffects,
1241        expected_effects_digest: TransactionEffectsDigest,
1242        inner_temporary_store: &InnerTemporaryStore,
1243        certificate: &VerifiedExecutableTransaction,
1244        debug_dump_config: &StateDebugDumpConfig,
1245    ) -> IotaResult<PathBuf> {
1246        let dump_dir = debug_dump_config
1247            .dump_file_directory
1248            .as_ref()
1249            .cloned()
1250            .unwrap_or(std::env::temp_dir());
1251        let epoch_store = self.load_epoch_store_one_call_per_task();
1252
1253        NodeStateDump::new(
1254            tx_digest,
1255            effects,
1256            expected_effects_digest,
1257            self.get_object_store().as_ref(),
1258            &epoch_store,
1259            inner_temporary_store,
1260            certificate,
1261        )?
1262        .write_to_file(&dump_dir)
1263        .map_err(|e| IotaError::FileIO(e.to_string()))
1264    }
1265
    /// Executes `certificate` against the given `input_objects` and commits
    /// the outcome.
    ///
    /// The steps are, in order:
    /// 1. take the execution lock for the certificate and check that its epoch
    ///    matches the epoch of `epoch_store`;
    /// 2. prepare the certificate, producing the temporary store, the effects
    ///    and an optional execution error;
    /// 3. if `expected_effects_digest` is given, compare it with the digest of
    ///    the computed effects;
    /// 4. commit the certificate;
    /// 5. for an authenticator state update, push the new state to the epoch
    ///    store;
    /// 6. record the ratio of computation cost to elapsed time.
    ///
    /// On every error returned before the commit step, `tx_guard` is released
    /// first.
    ///
    /// # Errors
    ///
    /// Returns the error of acquiring the execution lock, `WrongEpoch` when
    /// the lock's epoch differs from the epoch store's, or any error from
    /// preparing or committing the certificate.
    ///
    /// # Panics
    ///
    /// Panics when the computed effects digest differs from
    /// `expected_effects_digest` (a fork), after attempting to dump the node
    /// state. With debug assertions enabled it also panics when, after an
    /// authenticator state update, the JWKs of the signature verifier differ
    /// from the ones stored in the authenticator state object.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn process_certificate(
        &self,
        tx_guard: CertTxGuard,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let process_certificate_start_time = tokio::time::Instant::now();
        let digest = *certificate.digest();

        fail_point_if!("correlated-crash-process-certificate", || {
            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
        // Any caller that verifies the signatures on the certificate will have already
        // checked the epoch. But paths that don't verify sigs (e.g. execution
        // from checkpoint, reading from db) present the possibility of an epoch
        // mismatch. If this cert is not finalized in the previous epoch, then
        // it's invalid.
        let execution_guard = match execution_guard {
            Ok(execution_guard) => execution_guard,
            Err(err) => {
                tx_guard.release();
                return Err(err);
            }
        };
        // Since we obtain a reference to the epoch store before taking the execution
        // lock, it's possible that reconfiguration has happened and they no
        // longer match.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return Err(IotaError::WrongEpoch {
                expected_epoch: epoch_store.epoch(),
                actual_epoch: *execution_guard,
            });
        }

        // Errors originating from prepare_certificate may be transient (failure to read
        // locks) or non-transient (transaction input is invalid, move vm
        // errors). However, all errors from this function occur before we have
        // written anything to the db, so we release the tx guard and rely on
        // the client to retry the tx (if it was transient).
        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
            &execution_guard,
            certificate,
            input_objects,
            epoch_store,
        ) {
            Err(e) => {
                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
                tx_guard.release();
                return Err(e);
            }
            Ok(res) => res,
        };

        if let Some(expected_effects_digest) = expected_effects_digest {
            if effects.digest() != expected_effects_digest {
                // We don't want a failed dump to mask the original error, so
                // we only log the dump result and continue to the panic below.
                match self.debug_dump_transaction_state(
                    &digest,
                    &effects,
                    expected_effects_digest,
                    &inner_temporary_store,
                    certificate,
                    &self.config.state_debug_dump_config,
                ) {
                    Ok(out_path) => {
                        info!(
                            "Dumped node state for transaction {} to {}",
                            digest,
                            out_path.as_path().display().to_string()
                        );
                    }
                    Err(e) => {
                        error!("Error dumping state for transaction {}: {e}", digest);
                    }
                }
                error!(
                    tx_digest = ?digest,
                    ?expected_effects_digest,
                    actual_effects = ?effects,
                    "fork detected!"
                );
                panic!(
                    "Transaction {} is expected to have effects digest {}, but got {}!",
                    digest,
                    expected_effects_digest,
                    effects.digest(),
                );
            }
        }

        // Allow testing what happens if we crash right before the commit.
        fail_point!("crash");

        // Both guards are handed over to commit_certificate from here on.
        self.commit_certificate(
            certificate,
            inner_temporary_store,
            &effects,
            tx_guard,
            execution_guard,
            epoch_store,
        )?;

        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // Double check that the signature verifier always matches the
            // authenticator state. Only done when debug assertions are on.
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                // Sort both sides so the comparison ignores ordering.
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // Skip the observation when no time elapsed, to avoid dividing by zero.
        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        };
        Ok((effects, execution_error_opt))
    }
1416
1417    #[instrument(level = "trace", skip_all)]
1418    fn commit_certificate(
1419        &self,
1420        certificate: &VerifiedExecutableTransaction,
1421        inner_temporary_store: InnerTemporaryStore,
1422        effects: &TransactionEffects,
1423        tx_guard: CertTxGuard,
1424        _execution_guard: ExecutionLockReadGuard<'_>,
1425        epoch_store: &Arc<AuthorityPerEpochStore>,
1426    ) -> IotaResult {
1427        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1428            monitored_scope("Execution::commit_certificate");
1429        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1430
1431        let tx_key = certificate.key();
1432        let tx_digest = certificate.digest();
1433        let input_object_count = inner_temporary_store.input_objects.len();
1434        let shared_object_count = effects.input_shared_objects().len();
1435
1436        let output_keys = inner_temporary_store.get_output_keys(effects);
1437
1438        // index certificate
1439        let _ = self
1440            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1441            .tap_err(|e| {
1442                self.metrics.post_processing_total_failures.inc();
1443                error!(?tx_digest, "tx post processing failed: {e}");
1444            });
1445
1446        // The insertion to epoch_store is not atomic with the insertion to the
1447        // perpetual store. This is OK because we insert to the epoch store
1448        // first. And during lookups we always look up in the perpetual store first.
1449        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1450
1451        // Allow testing what happens if we crash here.
1452        fail_point!("crash");
1453
1454        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1455            certificate.clone().into_unsigned(),
1456            effects.clone(),
1457            inner_temporary_store,
1458        );
1459        self.get_cache_writer()
1460            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1461
1462        if certificate.transaction_data().is_end_of_epoch_tx() {
1463            // At the end of epoch, since system packages may have been upgraded, force
1464            // reload them in the cache.
1465            self.get_object_cache_reader()
1466                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1467        }
1468
1469        // commit_certificate finished, the tx is fully committed to the store.
1470        tx_guard.commit_tx();
1471
1472        // Notifies transaction manager about transaction and output objects committed.
1473        // This provides necessary information to transaction manager to start executing
1474        // additional ready transactions.
1475        self.transaction_manager
1476            .notify_commit(tx_digest, output_keys, epoch_store);
1477
1478        self.update_metrics(certificate, input_object_count, shared_object_count);
1479
1480        Ok(())
1481    }
1482
1483    fn update_metrics(
1484        &self,
1485        certificate: &VerifiedExecutableTransaction,
1486        input_object_count: usize,
1487        shared_object_count: usize,
1488    ) {
1489        // count signature by scheme, for zklogin and multisig
1490        if certificate.has_zklogin_sig() {
1491            self.metrics.zklogin_sig_count.inc();
1492        } else if certificate.has_upgraded_multisig() {
1493            self.metrics.multisig_sig_count.inc();
1494        }
1495
1496        self.metrics.total_effects.inc();
1497        self.metrics.total_certs.inc();
1498
1499        if shared_object_count > 0 {
1500            self.metrics.shared_obj_tx.inc();
1501        }
1502
1503        if certificate.is_sponsored_tx() {
1504            self.metrics.sponsored_tx.inc();
1505        }
1506
1507        self.metrics
1508            .num_input_objs
1509            .observe(input_object_count as f64);
1510        self.metrics
1511            .num_shared_objects
1512            .observe(shared_object_count as f64);
1513        self.metrics.batch_size.observe(
1514            certificate
1515                .data()
1516                .intent_message()
1517                .value
1518                .kind()
1519                .num_commands() as f64,
1520        );
1521    }
1522
1523    /// prepare_certificate validates the transaction input, and executes the
1524    /// certificate, returning effects, output objects, events, etc.
1525    ///
1526    /// It reads state from the db (both owned and shared locks), but it has no
1527    /// side effects.
1528    ///
1529    /// It can be generally understood that a failure of prepare_certificate
1530    /// indicates a non-transient error, e.g. the transaction input is
1531    /// somehow invalid, the correct locks are not held, etc. However, this
1532    /// is not entirely true, as a transient db read error may also cause
1533    /// this function to fail.
1534    #[instrument(level = "trace", skip_all)]
1535    fn prepare_certificate(
1536        &self,
1537        _execution_guard: &ExecutionLockReadGuard<'_>,
1538        certificate: &VerifiedExecutableTransaction,
1539        input_objects: InputObjects,
1540        epoch_store: &Arc<AuthorityPerEpochStore>,
1541    ) -> IotaResult<(
1542        InnerTemporaryStore,
1543        TransactionEffects,
1544        Option<ExecutionError>,
1545    )> {
1546        let _scope = monitored_scope("Execution::prepare_certificate");
1547        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1548        let prepare_certificate_start_time = tokio::time::Instant::now();
1549
1550        // TODO: We need to move this to a more appropriate place to avoid redundant
1551        // checks.
1552        let tx_data = certificate.data().transaction_data();
1553        tx_data.validity_check(epoch_store.protocol_config())?;
1554
1555        // The cost of partially re-auditing a transaction before execution is
1556        // tolerated.
1557        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1558            certificate,
1559            input_objects,
1560            epoch_store.protocol_config(),
1561            epoch_store.reference_gas_price(),
1562        )?;
1563
1564        let owned_object_refs = input_objects.inner().filter_owned_objects();
1565        self.check_owned_locks(&owned_object_refs)?;
1566        let tx_digest = *certificate.digest();
1567        let protocol_config = epoch_store.protocol_config();
1568        let transaction_data = &certificate.data().intent_message().value;
1569        let (kind, signer, gas) = transaction_data.execution_parts();
1570
1571        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1572        let (inner_temp_store, _, mut effects, execution_error_opt) =
1573            epoch_store.executor().execute_transaction_to_effects(
1574                self.get_backing_store().as_ref(),
1575                protocol_config,
1576                self.metrics.limits_metrics.clone(),
1577                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1578                // cyclic dependency w/ iota-adapter
1579                self.config
1580                    .expensive_safety_check_config
1581                    .enable_deep_per_tx_iota_conservation_check(),
1582                self.config.certificate_deny_config.certificate_deny_set(),
1583                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1584                epoch_store
1585                    .epoch_start_config()
1586                    .epoch_data()
1587                    .epoch_start_timestamp(),
1588                input_objects,
1589                gas,
1590                gas_status,
1591                kind,
1592                signer,
1593                tx_digest,
1594                &mut None,
1595            );
1596
1597        fail_point_if!("cp_execution_nondeterminism", || {
1598            #[cfg(msim)]
1599            self.create_fail_state(certificate, epoch_store, &mut effects);
1600        });
1601
1602        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1603        if elapsed > 0.0 {
1604            self.metrics
1605                .prepare_cert_gas_latency_ratio
1606                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1607        }
1608
1609        Ok((inner_temp_store, effects, execution_error_opt.err()))
1610    }
1611
1612    pub fn prepare_certificate_for_benchmark(
1613        &self,
1614        certificate: &VerifiedExecutableTransaction,
1615        input_objects: InputObjects,
1616        epoch_store: &Arc<AuthorityPerEpochStore>,
1617    ) -> IotaResult<(
1618        InnerTemporaryStore,
1619        TransactionEffects,
1620        Option<ExecutionError>,
1621    )> {
1622        let lock = RwLock::new(epoch_store.epoch());
1623        let execution_guard = lock.try_read().unwrap();
1624
1625        self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1626    }
1627
1628    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1629    /// `VmChecks::Enabled` instead.
1630    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1631    #[allow(clippy::type_complexity)]
1632    pub fn dry_exec_transaction(
1633        &self,
1634        transaction: TransactionData,
1635        transaction_digest: TransactionDigest,
1636    ) -> IotaResult<(
1637        DryRunTransactionBlockResponse,
1638        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1639        TransactionEffects,
1640        Option<ObjectID>,
1641    )> {
1642        let epoch_store = self.load_epoch_store_one_call_per_task();
1643        if !self.is_fullnode(&epoch_store) {
1644            return Err(IotaError::UnsupportedFeature {
1645                error: "dry-exec is only supported on fullnodes".to_string(),
1646            });
1647        }
1648
1649        if transaction.kind().is_system_tx() {
1650            return Err(IotaError::UnsupportedFeature {
1651                error: "dry-exec does not support system transactions".to_string(),
1652            });
1653        }
1654
1655        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1656    }
1657
1658    #[allow(clippy::type_complexity)]
1659    pub fn dry_exec_transaction_for_benchmark(
1660        &self,
1661        transaction: TransactionData,
1662        transaction_digest: TransactionDigest,
1663    ) -> IotaResult<(
1664        DryRunTransactionBlockResponse,
1665        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1666        TransactionEffects,
1667        Option<ObjectID>,
1668    )> {
1669        let epoch_store = self.load_epoch_store_one_call_per_task();
1670        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1671    }
1672
1673    #[instrument(level = "trace", skip_all)]
1674    #[allow(clippy::type_complexity)]
1675    fn dry_exec_transaction_impl(
1676        &self,
1677        epoch_store: &AuthorityPerEpochStore,
1678        transaction: TransactionData,
1679        transaction_digest: TransactionDigest,
1680    ) -> IotaResult<(
1681        DryRunTransactionBlockResponse,
1682        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1683        TransactionEffects,
1684        Option<ObjectID>,
1685    )> {
1686        // Cheap validity checks for a transaction, including input size limits.
1687        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1688
1689        let input_object_kinds = transaction.input_objects()?;
1690        let receiving_object_refs = transaction.receiving_objects();
1691
1692        iota_transaction_checks::deny::check_transaction_for_signing(
1693            &transaction,
1694            &[],
1695            &input_object_kinds,
1696            &receiving_object_refs,
1697            &self.config.transaction_deny_config,
1698            self.get_backing_package_store().as_ref(),
1699        )?;
1700
1701        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1702            // We don't want to cache this transaction since it's a dry run.
1703            None,
1704            &input_object_kinds,
1705            &receiving_object_refs,
1706            epoch_store.epoch(),
1707        )?;
1708
1709        // make a gas object if one was not provided
1710        let mut transaction = transaction;
1711        let mut gas_object_refs = transaction.gas().to_vec();
1712        let reference_gas_price = epoch_store.reference_gas_price();
1713        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1714            let sender = transaction.gas_owner();
1715            let gas_object_id = ObjectID::random();
1716            let gas_object = Object::new_move(
1717                MoveObject::new_gas_coin(
1718                    OBJECT_START_VERSION,
1719                    gas_object_id,
1720                    SIMULATION_GAS_COIN_VALUE,
1721                ),
1722                Owner::AddressOwner(sender),
1723                TransactionDigest::genesis_marker(),
1724            );
1725            let gas_object_ref = gas_object.compute_object_reference();
1726            gas_object_refs = vec![gas_object_ref];
1727            // Add gas object to transaction gas payment
1728            transaction.gas_data_mut().payment = gas_object_refs.clone();
1729            (
1730                iota_transaction_checks::check_transaction_input_with_given_gas(
1731                    epoch_store.protocol_config(),
1732                    reference_gas_price,
1733                    &transaction,
1734                    input_objects,
1735                    receiving_objects,
1736                    gas_object,
1737                    &self.metrics.bytecode_verifier_metrics,
1738                    &self.config.verifier_signing_config,
1739                )?,
1740                Some(gas_object_id),
1741            )
1742        } else {
1743            (
1744                iota_transaction_checks::check_transaction_input(
1745                    epoch_store.protocol_config(),
1746                    reference_gas_price,
1747                    &transaction,
1748                    input_objects,
1749                    &receiving_objects,
1750                    &self.metrics.bytecode_verifier_metrics,
1751                    &self.config.verifier_signing_config,
1752                )?,
1753                None,
1754            )
1755        };
1756
1757        let protocol_config = epoch_store.protocol_config();
1758        let (kind, signer, _) = transaction.execution_parts();
1759
1760        let silent = true;
1761        let executor = iota_execution::executor(protocol_config, silent, None)
1762            .expect("Creating an executor should not fail here");
1763
1764        let expensive_checks = false;
1765        let (inner_temp_store, _, effects, execution_error) = executor
1766            .execute_transaction_to_effects(
1767                self.get_backing_store().as_ref(),
1768                protocol_config,
1769                self.metrics.limits_metrics.clone(),
1770                expensive_checks,
1771                self.config.certificate_deny_config.certificate_deny_set(),
1772                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1773                epoch_store
1774                    .epoch_start_config()
1775                    .epoch_data()
1776                    .epoch_start_timestamp(),
1777                checked_input_objects,
1778                gas_object_refs,
1779                gas_status,
1780                kind,
1781                signer,
1782                transaction_digest,
1783                &mut None,
1784            );
1785        let tx_digest = *effects.transaction_digest();
1786
1787        let module_cache =
1788            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1789
1790        let mut layout_resolver =
1791            epoch_store
1792                .executor()
1793                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1794                    &inner_temp_store,
1795                    self.get_backing_package_store(),
1796                )));
1797        // Returning empty vector here because we recalculate changes in the rpc layer.
1798        let object_changes = Vec::new();
1799
1800        // Returning empty vector here because we recalculate changes in the rpc layer.
1801        let balance_changes = Vec::new();
1802
1803        let written_with_kind = effects
1804            .created()
1805            .into_iter()
1806            .map(|(oref, _)| (oref, WriteKind::Create))
1807            .chain(
1808                effects
1809                    .unwrapped()
1810                    .into_iter()
1811                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1812            )
1813            .chain(
1814                effects
1815                    .mutated()
1816                    .into_iter()
1817                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
1818            )
1819            .map(|(oref, kind)| {
1820                let obj = inner_temp_store.written.get(&oref.0).unwrap();
1821                // TODO: Avoid clones.
1822                (oref.0, (oref, obj.clone(), kind))
1823            })
1824            .collect();
1825
1826        let execution_error_source = execution_error
1827            .as_ref()
1828            .err()
1829            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
1830
1831        Ok((
1832            DryRunTransactionBlockResponse {
1833                // to avoid cloning `transaction`, fields are populated in this order
1834                suggested_gas_price: self
1835                    .congestion_tracker
1836                    .get_prediction_suggested_gas_price(&transaction),
1837                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1838                    .map_err(|e| IotaError::TransactionSerialization {
1839                        error: format!(
1840                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
1841                        ),
1842                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
1843                effects: effects.clone().try_into()?,
1844                events: IotaTransactionBlockEvents::try_from(
1845                    inner_temp_store.events.clone(),
1846                    tx_digest,
1847                    None,
1848                    layout_resolver.as_mut(),
1849                )?,
1850                object_changes,
1851                balance_changes,
1852                execution_error_source,
1853            },
1854            written_with_kind,
1855            effects,
1856            mock_gas,
1857        ))
1858    }
1859
1860    pub fn simulate_transaction(
1861        &self,
1862        mut transaction: TransactionData,
1863        checks: VmChecks,
1864    ) -> IotaResult<SimulateTransactionResult> {
1865        if transaction.kind().is_system_tx() {
1866            return Err(IotaError::UnsupportedFeature {
1867                error: "simulate does not support system transactions".to_string(),
1868            });
1869        }
1870
1871        let epoch_store = self.load_epoch_store_one_call_per_task();
1872        if !self.is_fullnode(&epoch_store) {
1873            return Err(IotaError::UnsupportedFeature {
1874                error: "simulate is only supported on fullnodes".to_string(),
1875            });
1876        }
1877
1878        // Cheap validity checks for a transaction, including input size limits.
1879        // This does not check if gas objects are missing since we may create a
1880        // mock gas object. It checks for other transaction input validity.
1881        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1882
1883        let input_object_kinds = transaction.input_objects()?;
1884        let receiving_object_refs = transaction.receiving_objects();
1885
1886        // Since we need to simulate a validator signing the transaction, the first step
1887        // is to check if some transaction elements are denied.
1888        iota_transaction_checks::deny::check_transaction_for_signing(
1889            &transaction,
1890            &[],
1891            &input_object_kinds,
1892            &receiving_object_refs,
1893            &self.config.transaction_deny_config,
1894            self.get_backing_package_store().as_ref(),
1895        )?;
1896
1897        // Load input and receiving objects
1898        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1899            // We don't want to cache this transaction since it's a simulation.
1900            None,
1901            &input_object_kinds,
1902            &receiving_object_refs,
1903            epoch_store.epoch(),
1904        )?;
1905
1906        // Create a mock gas object if one was not provided
1907        let mock_gas_id = if transaction.gas().is_empty() {
1908            let mock_gas_object = Object::new_move(
1909                MoveObject::new_gas_coin(
1910                    OBJECT_START_VERSION,
1911                    ObjectID::MAX,
1912                    SIMULATION_GAS_COIN_VALUE,
1913                ),
1914                Owner::AddressOwner(transaction.gas_data().owner),
1915                TransactionDigest::genesis_marker(),
1916            );
1917            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
1918            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
1919            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
1920            Some(mock_gas_object.id())
1921        } else {
1922            None
1923        };
1924
1925        let protocol_config = epoch_store.protocol_config();
1926
1927        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
1928        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
1929        let (gas_status, checked_input_objects) = if checks.enabled() {
1930            iota_transaction_checks::check_transaction_input(
1931                protocol_config,
1932                epoch_store.reference_gas_price(),
1933                &transaction,
1934                input_objects,
1935                &receiving_objects,
1936                &self.metrics.bytecode_verifier_metrics,
1937                &self.config.verifier_signing_config,
1938            )?
1939        } else {
1940            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
1941                protocol_config,
1942                transaction.kind(),
1943                input_objects,
1944                receiving_objects,
1945            )?;
1946            let gas_status = IotaGasStatus::new(
1947                transaction.gas_budget(),
1948                transaction.gas_price(),
1949                epoch_store.reference_gas_price(),
1950                protocol_config,
1951            )?;
1952
1953            (gas_status, checked_input_objects)
1954        };
1955
1956        // Create a new executor for the simulation
1957        let executor = iota_execution::executor(
1958            protocol_config,
1959            true, // silent
1960            None,
1961        )
1962        .expect("Creating an executor should not fail here");
1963
1964        // Execute the simulation
1965        let (kind, signer, gas_coins) = transaction.execution_parts();
1966        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
1967            self.get_backing_store().as_ref(),
1968            protocol_config,
1969            self.metrics.limits_metrics.clone(),
1970            false, // expensive_checks
1971            self.config.certificate_deny_config.certificate_deny_set(),
1972            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1973            epoch_store
1974                .epoch_start_config()
1975                .epoch_data()
1976                .epoch_start_timestamp(),
1977            checked_input_objects,
1978            gas_coins,
1979            gas_status,
1980            kind,
1981            signer,
1982            transaction.digest(),
1983            checks.disabled(),
1984        );
1985
1986        // In the case of a dev inspect, the execution_result could be filled with some
1987        // values. Else, execution_result is empty in the case of a dry run.
1988        Ok(SimulateTransactionResult {
1989            input_objects: inner_temp_store.input_objects,
1990            output_objects: inner_temp_store.written,
1991            events: effects.events_digest().map(|_| inner_temp_store.events),
1992            effects,
1993            execution_result,
1994            mock_gas_id,
1995        })
1996    }
1997
1998    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1999    /// `VmChecks::DISABLED` instead.
2000    /// The object ID for gas can be any
2001    /// object ID, even for an uncreated object
2002    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2003    pub async fn dev_inspect_transaction_block(
2004        &self,
2005        sender: IotaAddress,
2006        transaction_kind: TransactionKind,
2007        gas_price: Option<u64>,
2008        gas_budget: Option<u64>,
2009        gas_sponsor: Option<IotaAddress>,
2010        gas_objects: Option<Vec<ObjectRef>>,
2011        show_raw_txn_data_and_effects: Option<bool>,
2012        skip_checks: Option<bool>,
2013    ) -> IotaResult<DevInspectResults> {
2014        let epoch_store = self.load_epoch_store_one_call_per_task();
2015
2016        if !self.is_fullnode(&epoch_store) {
2017            return Err(IotaError::UnsupportedFeature {
2018                error: "dev-inspect is only supported on fullnodes".to_string(),
2019            });
2020        }
2021
2022        if transaction_kind.is_system_tx() {
2023            return Err(IotaError::UnsupportedFeature {
2024                error: "system transactions are not supported".to_string(),
2025            });
2026        }
2027
2028        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2029        let skip_checks = skip_checks.unwrap_or(true);
2030        let reference_gas_price = epoch_store.reference_gas_price();
2031        let protocol_config = epoch_store.protocol_config();
2032        let max_tx_gas = protocol_config.max_tx_gas();
2033
2034        let price = gas_price.unwrap_or(reference_gas_price);
2035        let budget = gas_budget.unwrap_or(max_tx_gas);
2036        let owner = gas_sponsor.unwrap_or(sender);
2037        // Payment might be empty here, but it's fine we'll have to deal with it later
2038        // after reading all the input objects.
2039        let payment = gas_objects.unwrap_or_default();
2040        let transaction = TransactionData::V1(TransactionDataV1 {
2041            kind: transaction_kind.clone(),
2042            sender,
2043            gas_data: GasData {
2044                payment,
2045                owner,
2046                price,
2047                budget,
2048            },
2049            expiration: TransactionExpiration::None,
2050        });
2051
2052        let raw_txn_data = if show_raw_txn_data_and_effects {
2053            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2054                error: "Failed to serialize transaction during dev inspect".to_string(),
2055            })?
2056        } else {
2057            vec![]
2058        };
2059
2060        transaction.validity_check_no_gas_check(protocol_config)?;
2061
2062        let input_object_kinds = transaction.input_objects()?;
2063        let receiving_object_refs = transaction.receiving_objects();
2064
2065        iota_transaction_checks::deny::check_transaction_for_signing(
2066            &transaction,
2067            &[],
2068            &input_object_kinds,
2069            &receiving_object_refs,
2070            &self.config.transaction_deny_config,
2071            self.get_backing_package_store().as_ref(),
2072        )?;
2073
2074        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2075            // We don't want to cache this transaction since it's a dev inspect.
2076            None,
2077            &input_object_kinds,
2078            &receiving_object_refs,
2079            epoch_store.epoch(),
2080        )?;
2081
2082        // Create and use a dummy gas object if there is no gas object provided.
2083        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2084            SIMULATION_GAS_COIN_VALUE,
2085            transaction.gas_owner(),
2086        );
2087
2088        let gas_objects = if transaction.gas().is_empty() {
2089            let gas_object_ref = dummy_gas_object.compute_object_reference();
2090            vec![gas_object_ref]
2091        } else {
2092            transaction.gas().to_vec()
2093        };
2094
2095        let (gas_status, checked_input_objects) = if skip_checks {
2096            // If we are skipping checks, then we call the check_dev_inspect_input function
2097            // which will perform only lightweight checks on the transaction
2098            // input. And if the gas field is empty, that means we will
2099            // use the dummy gas object so we need to add it to the input objects vector.
2100            if transaction.gas().is_empty() {
2101                input_objects.push(ObjectReadResult::new(
2102                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2103                    dummy_gas_object.into(),
2104                ));
2105            }
2106            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2107                protocol_config,
2108                &transaction_kind,
2109                input_objects,
2110                receiving_objects,
2111            )?;
2112            let gas_status = IotaGasStatus::new(
2113                max_tx_gas,
2114                transaction.gas_price(),
2115                reference_gas_price,
2116                protocol_config,
2117            )?;
2118
2119            (gas_status, checked_input_objects)
2120        } else {
2121            // If we are not skipping checks, then we call the check_transaction_input
2122            // function and its dummy gas variant which will perform full
2123            // fledged checks just like a real transaction execution.
2124            if transaction.gas().is_empty() {
2125                iota_transaction_checks::check_transaction_input_with_given_gas(
2126                    epoch_store.protocol_config(),
2127                    reference_gas_price,
2128                    &transaction,
2129                    input_objects,
2130                    receiving_objects,
2131                    dummy_gas_object,
2132                    &self.metrics.bytecode_verifier_metrics,
2133                    &self.config.verifier_signing_config,
2134                )?
2135            } else {
2136                iota_transaction_checks::check_transaction_input(
2137                    epoch_store.protocol_config(),
2138                    reference_gas_price,
2139                    &transaction,
2140                    input_objects,
2141                    &receiving_objects,
2142                    &self.metrics.bytecode_verifier_metrics,
2143                    &self.config.verifier_signing_config,
2144                )?
2145            }
2146        };
2147
2148        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2149            .expect("Creating an executor should not fail here");
2150        let intent_msg = IntentMessage::new(
2151            Intent {
2152                version: IntentVersion::V0,
2153                scope: IntentScope::TransactionData,
2154                app_id: IntentAppId::Iota,
2155            },
2156            transaction,
2157        );
2158        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2159        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2160            self.get_backing_store().as_ref(),
2161            protocol_config,
2162            self.metrics.limits_metrics.clone(),
2163            // expensive checks
2164            false,
2165            self.config.certificate_deny_config.certificate_deny_set(),
2166            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2167            epoch_store
2168                .epoch_start_config()
2169                .epoch_data()
2170                .epoch_start_timestamp(),
2171            checked_input_objects,
2172            gas_objects,
2173            gas_status,
2174            transaction_kind,
2175            sender,
2176            transaction_digest,
2177            skip_checks,
2178        );
2179
2180        let raw_effects = if show_raw_txn_data_and_effects {
2181            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2182                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2183            })?
2184        } else {
2185            vec![]
2186        };
2187
2188        let mut layout_resolver =
2189            epoch_store
2190                .executor()
2191                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2192                    &inner_temp_store,
2193                    self.get_backing_package_store(),
2194                )));
2195
2196        DevInspectResults::new(
2197            effects,
2198            inner_temp_store.events.clone(),
2199            execution_result,
2200            raw_txn_data,
2201            raw_effects,
2202            layout_resolver.as_mut(),
2203        )
2204    }
2205
2206    // Only used for testing because of how epoch store is loaded.
2207    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2208        let epoch_store = self.epoch_store_for_testing();
2209        Ok(epoch_store.reference_gas_price())
2210    }
2211
2212    #[instrument(level = "trace", skip_all)]
2213    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2214        self.get_transaction_cache_reader()
2215            .try_is_tx_already_executed(digest)
2216    }
2217
2218    /// Non-fallible version of `try_is_tx_already_executed`.
2219    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2220        self.try_is_tx_already_executed(digest)
2221            .expect("storage access failed")
2222    }
2223
2224    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2225    #[instrument(level = "debug", skip_all, err)]
2226    fn index_tx(
2227        &self,
2228        indexes: &IndexStore,
2229        digest: &TransactionDigest,
2230        // TODO: index_tx really just need the transaction data here.
2231        cert: &VerifiedExecutableTransaction,
2232        effects: &TransactionEffects,
2233        events: &TransactionEvents,
2234        timestamp_ms: u64,
2235        tx_coins: Option<TxCoins>,
2236        written: &WrittenObjects,
2237        inner_temporary_store: &InnerTemporaryStore,
2238    ) -> IotaResult<u64> {
2239        let changes = self
2240            .process_object_index(effects, written, inner_temporary_store)
2241            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2242
2243        indexes.index_tx(
2244            cert.data().intent_message().value.sender(),
2245            cert.data()
2246                .intent_message()
2247                .value
2248                .input_objects()?
2249                .iter()
2250                .map(|o| o.object_id()),
2251            effects
2252                .all_changed_objects()
2253                .into_iter()
2254                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2255            cert.data()
2256                .intent_message()
2257                .value
2258                .move_calls()
2259                .into_iter()
2260                .map(|(package, module, function)| {
2261                    (*package, module.to_owned(), function.to_owned())
2262                }),
2263            events,
2264            changes,
2265            digest,
2266            timestamp_ms,
2267            tx_coins,
2268        )
2269    }
2270
2271    #[cfg(msim)]
2272    fn create_fail_state(
2273        &self,
2274        certificate: &VerifiedExecutableTransaction,
2275        epoch_store: &Arc<AuthorityPerEpochStore>,
2276        effects: &mut TransactionEffects,
2277    ) {
2278        use std::cell::RefCell;
2279        thread_local! {
2280            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2281        }
2282        if !certificate.data().intent_message().value.is_system_tx() {
2283            let committee = epoch_store.committee();
2284            let cur_stake = (**committee).weight(&self.name);
2285            if cur_stake > 0 {
2286                FAIL_STATE.with_borrow_mut(|fail_state| {
2287                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2288                    if fail_state.0 < committee.validity_threshold() {
2289                        fail_state.0 += cur_stake;
2290                        fail_state.1.insert(self.name);
2291                    }
2292
2293                    if fail_state.1.contains(&self.name) {
2294                        info!("cp_exec failing tx");
2295                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2296                    }
2297                });
2298            }
2299        }
2300    }
2301
2302    fn process_object_index(
2303        &self,
2304        effects: &TransactionEffects,
2305        written: &WrittenObjects,
2306        inner_temporary_store: &InnerTemporaryStore,
2307    ) -> IotaResult<ObjectIndexChanges> {
2308        let epoch_store = self.load_epoch_store_one_call_per_task();
2309        let mut layout_resolver =
2310            epoch_store
2311                .executor()
2312                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2313                    inner_temporary_store,
2314                    self.get_backing_package_store(),
2315                )));
2316
2317        let modified_at_version = effects
2318            .modified_at_versions()
2319            .into_iter()
2320            .collect::<HashMap<_, _>>();
2321
2322        let tx_digest = effects.transaction_digest();
2323        let mut deleted_owners = vec![];
2324        let mut deleted_dynamic_fields = vec![];
2325        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2326            let old_version = modified_at_version.get(&id).unwrap();
2327            // When we process the index, the latest object hasn't been written yet so
2328            // the old object must be present.
2329            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2330                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2331            ) {
2332                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2333                Owner::ObjectOwner(object_id) => {
2334                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2335                }
2336                _ => {}
2337            }
2338        }
2339
2340        let mut new_owners = vec![];
2341        let mut new_dynamic_fields = vec![];
2342
2343        for (oref, owner, kind) in effects.all_changed_objects() {
2344            let id = &oref.0;
2345            // For mutated objects, retrieve old owner and delete old index if there is a
2346            // owner change.
2347            if let WriteKind::Mutate = kind {
2348                let Some(old_version) = modified_at_version.get(id) else {
2349                    panic!(
2350                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2351                    );
2352                };
2353                // When we process the index, the latest object hasn't been written yet so
2354                // the old object must be present.
2355                let Some(old_object) = self
2356                    .get_object_store()
2357                    .try_get_object_by_key(id, *old_version)?
2358                else {
2359                    panic!(
2360                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2361                    );
2362                };
2363                if old_object.owner != owner {
2364                    match old_object.owner {
2365                        Owner::AddressOwner(addr) => {
2366                            deleted_owners.push((addr, *id));
2367                        }
2368                        Owner::ObjectOwner(object_id) => {
2369                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2370                        }
2371                        _ => {}
2372                    }
2373                }
2374            }
2375
2376            match owner {
2377                Owner::AddressOwner(addr) => {
2378                    // TODO: We can remove the object fetching after we added ObjectType to
2379                    // TransactionEffects
2380                    let new_object = written.get(id).unwrap_or_else(
2381                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2382                    );
2383                    assert_eq!(
2384                        new_object.version(),
2385                        oref.1,
2386                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2387                        tx_digest,
2388                        id,
2389                        new_object.version(),
2390                        oref.1
2391                    );
2392
2393                    let type_ = new_object
2394                        .type_()
2395                        .map(|type_| ObjectType::Struct(type_.clone()))
2396                        .unwrap_or(ObjectType::Package);
2397
2398                    new_owners.push((
2399                        (addr, *id),
2400                        ObjectInfo {
2401                            object_id: *id,
2402                            version: oref.1,
2403                            digest: oref.2,
2404                            type_,
2405                            owner,
2406                            previous_transaction: *effects.transaction_digest(),
2407                        },
2408                    ));
2409                }
2410                Owner::ObjectOwner(owner) => {
2411                    let new_object = written.get(id).unwrap_or_else(
2412                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2413                    );
2414                    assert_eq!(
2415                        new_object.version(),
2416                        oref.1,
2417                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2418                        tx_digest,
2419                        id,
2420                        new_object.version(),
2421                        oref.1
2422                    );
2423
2424                    let Some(df_info) = self
2425                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2426                        .unwrap_or_else(|e| {
2427                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2428                            None
2429                        }
2430                    )
2431                        else {
2432                            // Skip indexing for non dynamic field objects.
2433                            continue;
2434                        };
2435                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2436                }
2437                _ => {}
2438            }
2439        }
2440
2441        Ok(ObjectIndexChanges {
2442            deleted_owners,
2443            deleted_dynamic_fields,
2444            new_owners,
2445            new_dynamic_fields,
2446        })
2447    }
2448
2449    fn try_create_dynamic_field_info(
2450        &self,
2451        o: &Object,
2452        written: &WrittenObjects,
2453        resolver: &mut dyn LayoutResolver,
2454        get_latest_object_version: bool,
2455    ) -> IotaResult<Option<DynamicFieldInfo>> {
2456        // Skip if not a move object
2457        let Some(move_object) = o.data.try_as_move().cloned() else {
2458            return Ok(None);
2459        };
2460
2461        // We only index dynamic field objects
2462        if !move_object.type_().is_dynamic_field() {
2463            return Ok(None);
2464        }
2465
2466        let layout = resolver
2467            .get_annotated_layout(&move_object.type_().clone().into())?
2468            .into_layout();
2469
2470        let field =
2471            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2472                IotaError::ObjectDeserialization {
2473                    error: e.to_string(),
2474                }
2475            })?;
2476
2477        let type_ = field.kind;
2478        let name_type: TypeTag = field.name_layout.into();
2479        let bcs_name = field.name_bytes.to_owned();
2480
2481        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2482            .map_err(|e| {
2483                warn!("{e}");
2484                IotaError::ObjectDeserialization {
2485                    error: e.to_string(),
2486                }
2487            })?;
2488
2489        let name = DynamicFieldName {
2490            type_: name_type,
2491            value: IotaMoveValue::from(name_value).to_json_value(),
2492        };
2493
2494        let value_metadata = field.value_metadata().map_err(|e| {
2495            warn!("{e}");
2496            IotaError::ObjectDeserialization {
2497                error: e.to_string(),
2498            }
2499        })?;
2500
2501        Ok(Some(match value_metadata {
2502            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2503                name,
2504                bcs_name,
2505                type_,
2506                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2507                object_id: o.id(),
2508                version: o.version(),
2509                digest: o.digest(),
2510            },
2511
2512            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2513                // Find the actual object from storage using the object id obtained from the
2514                // wrapper.
2515
2516                // Try to find the object in the written objects first.
2517                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2518                    (
2519                        object.version(),
2520                        object.digest(),
2521                        object.data.type_().unwrap().clone(),
2522                    )
2523                } else {
2524                    // If not found, try to find it in the database.
2525                    let object = if get_latest_object_version {
2526                        // Loading genesis object could meet a condition that the version of the
2527                        // genesis object is behind the one in the snapshot.
2528                        // In this case, the object can not be found with get_object_by_key, we need
2529                        // to use get_object instead. Since get_object is a heavier operation, we
2530                        // only allow to use it for genesis.
2531                        // reference: https://github.com/iotaledger/iota/issues/7267
2532                        self.get_object_store()
2533                            .try_get_object(&object_id)?
2534                            .ok_or_else(|| UserInputError::ObjectNotFound {
2535                                object_id,
2536                                version: Some(o.version()),
2537                            })?
2538                    } else {
2539                        // Non-genesis object should be in the database with the given version.
2540                        self.get_object_store()
2541                            .try_get_object_by_key(&object_id, o.version())?
2542                            .ok_or_else(|| UserInputError::ObjectNotFound {
2543                                object_id,
2544                                version: Some(o.version()),
2545                            })?
2546                    };
2547
2548                    (
2549                        object.version(),
2550                        object.digest(),
2551                        object.data.type_().unwrap().clone(),
2552                    )
2553                };
2554
2555                DynamicFieldInfo {
2556                    name,
2557                    bcs_name,
2558                    type_,
2559                    object_type: object_type.to_string(),
2560                    object_id,
2561                    version,
2562                    digest,
2563                }
2564            }
2565        }))
2566    }
2567
2568    #[instrument(level = "trace", skip_all, err)]
2569    fn post_process_one_tx(
2570        &self,
2571        certificate: &VerifiedExecutableTransaction,
2572        effects: &TransactionEffects,
2573        inner_temporary_store: &InnerTemporaryStore,
2574        epoch_store: &Arc<AuthorityPerEpochStore>,
2575    ) -> IotaResult {
2576        if self.indexes.is_none() {
2577            return Ok(());
2578        }
2579
2580        let tx_digest = certificate.digest();
2581        let timestamp_ms = Self::unixtime_now_ms();
2582        let events = &inner_temporary_store.events;
2583        let written = &inner_temporary_store.written;
2584        let tx_coins =
2585            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2586
2587        // Index tx
2588        if let Some(indexes) = &self.indexes {
2589            let _ = self
2590                .index_tx(
2591                    indexes.as_ref(),
2592                    tx_digest,
2593                    certificate,
2594                    effects,
2595                    events,
2596                    timestamp_ms,
2597                    tx_coins,
2598                    written,
2599                    inner_temporary_store,
2600                )
2601                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2602                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2603                .expect("Indexing tx should not fail");
2604
2605            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2606            let events = self.make_transaction_block_events(
2607                events.clone(),
2608                *tx_digest,
2609                timestamp_ms,
2610                epoch_store,
2611                inner_temporary_store,
2612            )?;
2613            // Emit events
2614            self.subscription_handler
2615                .process_tx(certificate.data().transaction_data(), &effects, &events)
2616                .tap_ok(|_| {
2617                    self.metrics
2618                        .post_processing_total_tx_had_event_processed
2619                        .inc()
2620                })
2621                .tap_err(|e| {
2622                    warn!(
2623                        ?tx_digest,
2624                        "Post processing - Couldn't process events for tx: {}", e
2625                    )
2626                })?;
2627
2628            self.metrics
2629                .post_processing_total_events_emitted
2630                .inc_by(events.data.len() as u64);
2631        };
2632        Ok(())
2633    }
2634
2635    fn make_transaction_block_events(
2636        &self,
2637        transaction_events: TransactionEvents,
2638        digest: TransactionDigest,
2639        timestamp_ms: u64,
2640        epoch_store: &Arc<AuthorityPerEpochStore>,
2641        inner_temporary_store: &InnerTemporaryStore,
2642    ) -> IotaResult<IotaTransactionBlockEvents> {
2643        let mut layout_resolver =
2644            epoch_store
2645                .executor()
2646                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2647                    inner_temporary_store,
2648                    self.get_backing_package_store(),
2649                )));
2650        IotaTransactionBlockEvents::try_from(
2651            transaction_events,
2652            digest,
2653            Some(timestamp_ms),
2654            layout_resolver.as_mut(),
2655        )
2656    }
2657
2658    pub fn unixtime_now_ms() -> u64 {
2659        let now = SystemTime::now()
2660            .duration_since(UNIX_EPOCH)
2661            .expect("Time went backwards")
2662            .as_millis();
2663        u64::try_from(now).expect("Travelling in time machine")
2664    }
2665
2666    #[instrument(level = "trace", skip_all)]
2667    pub async fn handle_transaction_info_request(
2668        &self,
2669        request: TransactionInfoRequest,
2670    ) -> IotaResult<TransactionInfoResponse> {
2671        let epoch_store = self.load_epoch_store_one_call_per_task();
2672        let (transaction, status) = self
2673            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2674            .ok_or(IotaError::TransactionNotFound {
2675                digest: request.transaction_digest,
2676            })?;
2677        Ok(TransactionInfoResponse {
2678            transaction,
2679            status,
2680        })
2681    }
2682
2683    #[instrument(level = "trace", skip_all)]
2684    pub async fn handle_object_info_request(
2685        &self,
2686        request: ObjectInfoRequest,
2687    ) -> IotaResult<ObjectInfoResponse> {
2688        let epoch_store = self.load_epoch_store_one_call_per_task();
2689
2690        let requested_object_seq = match request.request_kind {
2691            ObjectInfoRequestKind::LatestObjectInfo => {
2692                let (_, seq, _) = self
2693                    .try_get_object_or_tombstone(request.object_id)
2694                    .await?
2695                    .ok_or_else(|| {
2696                        IotaError::from(UserInputError::ObjectNotFound {
2697                            object_id: request.object_id,
2698                            version: None,
2699                        })
2700                    })?;
2701                seq
2702            }
2703            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2704        };
2705
2706        let object = self
2707            .get_object_store()
2708            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2709            .ok_or_else(|| {
2710                IotaError::from(UserInputError::ObjectNotFound {
2711                    object_id: request.object_id,
2712                    version: Some(requested_object_seq),
2713                })
2714            })?;
2715
2716        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2717            (request.generate_layout, object.data.try_as_move())
2718        {
2719            Some(into_struct_layout(
2720                epoch_store
2721                    .executor()
2722                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2723                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2724            )?)
2725        } else {
2726            None
2727        };
2728
2729        let lock = if !object.is_address_owned() {
2730            // Only address owned objects have locks.
2731            None
2732        } else {
2733            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2734                .await?
2735                .map(|s| s.into_inner())
2736        };
2737
2738        Ok(ObjectInfoResponse {
2739            object,
2740            layout,
2741            lock_for_debugging: lock,
2742        })
2743    }
2744
2745    #[instrument(level = "trace", skip_all)]
2746    pub fn handle_checkpoint_request(
2747        &self,
2748        request: &CheckpointRequest,
2749    ) -> IotaResult<CheckpointResponse> {
2750        let summary = if request.certified {
2751            let summary = match request.sequence_number {
2752                Some(seq) => self
2753                    .checkpoint_store
2754                    .get_checkpoint_by_sequence_number(seq)?,
2755                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2756            }
2757            .map(|v| v.into_inner());
2758            summary.map(CheckpointSummaryResponse::Certified)
2759        } else {
2760            let summary = match request.sequence_number {
2761                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2762                None => self
2763                    .checkpoint_store
2764                    .get_latest_locally_computed_checkpoint()?,
2765            };
2766            summary.map(CheckpointSummaryResponse::Pending)
2767        };
2768        let contents = match &summary {
2769            Some(s) => self
2770                .checkpoint_store
2771                .get_checkpoint_contents(&s.content_digest())?,
2772            None => None,
2773        };
2774        Ok(CheckpointResponse {
2775            checkpoint: summary,
2776            contents,
2777        })
2778    }
2779
2780    fn check_protocol_version(
2781        supported_protocol_versions: SupportedProtocolVersions,
2782        current_version: ProtocolVersion,
2783    ) {
2784        info!("current protocol version is now {:?}", current_version);
2785        info!("supported versions are: {:?}", supported_protocol_versions);
2786        if !supported_protocol_versions.is_version_supported(current_version) {
2787            let msg = format!(
2788                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2789            );
2790
2791            error!("{}", msg);
2792            eprintln!("{msg}");
2793
2794            #[cfg(not(msim))]
2795            std::process::exit(1);
2796
2797            #[cfg(msim)]
2798            iota_simulator::task::shutdown_current_node();
2799        }
2800    }
2801
2802    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
2803    pub async fn new(
2804        name: AuthorityName,
2805        secret: StableSyncAuthoritySigner,
2806        supported_protocol_versions: SupportedProtocolVersions,
2807        store: Arc<AuthorityStore>,
2808        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2809        epoch_store: Arc<AuthorityPerEpochStore>,
2810        committee_store: Arc<CommitteeStore>,
2811        indexes: Option<Arc<IndexStore>>,
2812        rest_index: Option<Arc<RestIndexStore>>,
2813        checkpoint_store: Arc<CheckpointStore>,
2814        prometheus_registry: &Registry,
2815        genesis_objects: &[Object],
2816        db_checkpoint_config: &DBCheckpointConfig,
2817        config: NodeConfig,
2818        archive_readers: ArchiveReaderBalancer,
2819        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2820        chain_identifier: ChainIdentifier,
2821        pruner_db: Option<Arc<AuthorityPrunerTables>>,
2822    ) -> Arc<Self> {
2823        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2824
2825        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2826        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2827        let transaction_manager = Arc::new(TransactionManager::new(
2828            execution_cache_trait_pointers.object_cache_reader.clone(),
2829            execution_cache_trait_pointers
2830                .transaction_cache_reader
2831                .clone(),
2832            &epoch_store,
2833            tx_ready_certificates,
2834            metrics.clone(),
2835        ));
2836        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2837
2838        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2839            epoch_store.get_parent_path(),
2840            &config.authority_store_pruning_config,
2841        );
2842        let _pruner = AuthorityStorePruner::new(
2843            store.perpetual_tables.clone(),
2844            checkpoint_store.clone(),
2845            rest_index.clone(),
2846            indexes.clone(),
2847            config.authority_store_pruning_config.clone(),
2848            epoch_store.committee().authority_exists(&name),
2849            epoch_store.epoch_start_state().epoch_duration_ms(),
2850            prometheus_registry,
2851            archive_readers,
2852            pruner_db,
2853        );
2854        let input_loader =
2855            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2856        let epoch = epoch_store.epoch();
2857        let rgp = epoch_store.reference_gas_price();
2858        let state = Arc::new(AuthorityState {
2859            name,
2860            secret,
2861            execution_lock: RwLock::new(epoch),
2862            epoch_store: ArcSwap::new(epoch_store.clone()),
2863            input_loader,
2864            execution_cache_trait_pointers,
2865            indexes,
2866            rest_index,
2867            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2868            checkpoint_store,
2869            committee_store,
2870            transaction_manager,
2871            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2872            metrics,
2873            _pruner,
2874            _authority_per_epoch_pruner,
2875            db_checkpoint_config: db_checkpoint_config.clone(),
2876            config,
2877            overload_info: AuthorityOverloadInfo::default(),
2878            validator_tx_finalizer,
2879            chain_identifier,
2880            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
2881        });
2882
2883        // Start a task to execute ready certificates.
2884        let authority_state = Arc::downgrade(&state);
2885        spawn_monitored_task!(execution_process(
2886            authority_state,
2887            rx_ready_certificates,
2888            rx_execution_shutdown,
2889        ));
2890
2891        // TODO: This doesn't belong to the constructor of AuthorityState.
2892        state
2893            .create_owner_index_if_empty(genesis_objects, &epoch_store)
2894            .expect("Error indexing genesis objects.");
2895
2896        state
2897    }
2898
2899    // TODO: Consolidate our traits to reduce the number of methods here.
2900    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2901        &self.execution_cache_trait_pointers.object_cache_reader
2902    }
2903
2904    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2905        &self.execution_cache_trait_pointers.transaction_cache_reader
2906    }
2907
2908    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2909        &self.execution_cache_trait_pointers.cache_writer
2910    }
2911
2912    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2913        &self.execution_cache_trait_pointers.backing_store
2914    }
2915
2916    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2917        &self.execution_cache_trait_pointers.backing_package_store
2918    }
2919
2920    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2921        &self.execution_cache_trait_pointers.object_store
2922    }
2923
2924    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2925        &self.execution_cache_trait_pointers.reconfig_api
2926    }
2927
2928    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2929        &self.execution_cache_trait_pointers.accumulator_store
2930    }
2931
2932    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2933        &self.execution_cache_trait_pointers.checkpoint_cache
2934    }
2935
2936    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2937        &self.execution_cache_trait_pointers.state_sync_store
2938    }
2939
2940    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2941        &self.execution_cache_trait_pointers.cache_commit
2942    }
2943
2944    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2945        self.execution_cache_trait_pointers
2946            .testing_api
2947            .database_for_testing()
2948    }
2949
2950    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2951        &self,
2952        config: NodeConfig,
2953        metrics: Arc<AuthorityStorePruningMetrics>,
2954    ) -> anyhow::Result<()> {
2955        let archive_readers =
2956            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2957        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2958            &self.database_for_testing().perpetual_tables,
2959            &self.checkpoint_store,
2960            self.rest_index.as_deref(),
2961            None,
2962            config.authority_store_pruning_config,
2963            metrics,
2964            archive_readers,
2965            EPOCH_DURATION_MS_FOR_TESTING,
2966        )
2967        .await
2968    }
2969
    /// Returns a reference to the shared [`TransactionManager`] owned by this
    /// authority state.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
2973
2974    /// Adds transactions / certificates to transaction manager for ordered
2975    /// execution.
2976    pub fn enqueue_transactions_for_execution(
2977        &self,
2978        txns: Vec<VerifiedExecutableTransaction>,
2979        epoch_store: &Arc<AuthorityPerEpochStore>,
2980    ) {
2981        self.transaction_manager.enqueue(txns, epoch_store)
2982    }
2983
2984    /// Adds certificates to transaction manager for ordered execution.
2985    pub fn enqueue_certificates_for_execution(
2986        &self,
2987        certs: Vec<VerifiedCertificate>,
2988        epoch_store: &Arc<AuthorityPerEpochStore>,
2989    ) {
2990        self.transaction_manager
2991            .enqueue_certificates(certs, epoch_store)
2992    }
2993
2994    pub fn enqueue_with_expected_effects_digest(
2995        &self,
2996        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2997        epoch_store: &AuthorityPerEpochStore,
2998    ) {
2999        self.transaction_manager
3000            .enqueue_with_expected_effects_digest(certs, epoch_store)
3001    }
3002
3003    fn create_owner_index_if_empty(
3004        &self,
3005        genesis_objects: &[Object],
3006        epoch_store: &Arc<AuthorityPerEpochStore>,
3007    ) -> IotaResult {
3008        let Some(index_store) = &self.indexes else {
3009            return Ok(());
3010        };
3011        if !index_store.is_empty() {
3012            return Ok(());
3013        }
3014
3015        let mut new_owners = vec![];
3016        let mut new_dynamic_fields = vec![];
3017        let mut layout_resolver = epoch_store
3018            .executor()
3019            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3020        for o in genesis_objects.iter() {
3021            match o.owner {
3022                Owner::AddressOwner(addr) => new_owners.push((
3023                    (addr, o.id()),
3024                    ObjectInfo::new(&o.compute_object_reference(), o),
3025                )),
3026                Owner::ObjectOwner(object_id) => {
3027                    let id = o.id();
3028                    let Some(info) = self.try_create_dynamic_field_info(
3029                        o,
3030                        &BTreeMap::new(),
3031                        layout_resolver.as_mut(),
3032                        true,
3033                    )?
3034                    else {
3035                        continue;
3036                    };
3037                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3038                }
3039                _ => {}
3040            }
3041        }
3042
3043        index_store.insert_genesis_objects(ObjectIndexChanges {
3044            deleted_owners: vec![],
3045            deleted_dynamic_fields: vec![],
3046            new_owners,
3047            new_dynamic_fields,
3048        })
3049    }
3050
3051    /// Attempts to acquire execution lock for an executable transaction.
3052    /// Returns the lock if the transaction is matching current executed epoch
3053    /// Returns None otherwise
3054    pub fn execution_lock_for_executable_transaction(
3055        &self,
3056        transaction: &VerifiedExecutableTransaction,
3057    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3058        let lock = self
3059            .execution_lock
3060            .try_read()
3061            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3062        if *lock == transaction.auth_sig().epoch() {
3063            Ok(lock)
3064        } else {
3065            Err(IotaError::WrongEpoch {
3066                expected_epoch: *lock,
3067                actual_epoch: transaction.auth_sig().epoch(),
3068            })
3069        }
3070    }
3071
3072    /// Acquires the execution lock for the duration of a transaction signing
3073    /// request. This prevents reconfiguration from starting until we are
3074    /// finished handling the signing request. Otherwise, in-memory lock
3075    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3076    /// while we are attempting to acquire locks for the transaction.
3077    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3078        self.execution_lock
3079            .try_read()
3080            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3081    }
3082
    /// Waits for and returns the exclusive (write) guard of the execution
    /// lock. Used by the reconfiguration paths, which overwrite the epoch
    /// stored inside the lock.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3086
    /// Moves this authority from the epoch of `cur_epoch_store` to the epoch
    /// of `new_committee` and returns the newly opened per-epoch store.
    ///
    /// The steps run strictly in this order: call `check_protocol_version`,
    /// reset metrics, persist the new committee, take the execution lock for
    /// writing, signal epoch termination on the current epoch store, sanity
    /// check leftover shared version assignments, revert uncommitted
    /// transactions, clear end-of-epoch cache state, run the system
    /// consistency checks, store the epoch start configuration, optionally
    /// snapshot the databases, reconfigure the cache, reopen the epoch
    /// database, reconfigure the transaction manager, and finally write the
    /// new epoch into the execution lock.
    ///
    /// # Errors
    /// Propagates any error returned by the fallible steps above.
    ///
    /// # Panics
    /// Panics if `epoch_last_checkpoint` is lower than the highest locally
    /// computed checkpoint sequence number, or if the reopened epoch store
    /// does not report `new_committee.epoch`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // No locally computed checkpoint at all is treated as sequence number 0.
        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Snapshot the databases only when a checkpoint path is configured AND
        // end-of-epoch db checkpoints are enabled.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                // Index snapshots are opt-in; an unset flag counts as `false`.
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Read the epoch before `new_committee` is moved into `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3198
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    ///
    /// # Panics
    /// Panics if reading the current epoch's last checkpoint from the
    /// checkpoint store fails.
    pub async fn reconfigure_for_testing(&self) {
        // Hold the execution lock exclusively while the epoch is swapped.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            // Last checkpoint of the current epoch; falls back to the default
            // sequence number when none is recorded.
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        // Publish the new epoch store before signalling termination on the old one.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        *execution_lock = new_epoch;
    }
3232
3233    #[instrument(level = "error", skip_all)]
3234    fn check_system_consistency(
3235        &self,
3236        cur_epoch_store: &AuthorityPerEpochStore,
3237        accumulator: Arc<StateAccumulator>,
3238        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3239        epoch_supply_change: i64,
3240    ) -> IotaResult<()> {
3241        info!(
3242            "Performing iota conservation consistency check for epoch {}",
3243            cur_epoch_store.epoch()
3244        );
3245
3246        if cfg!(debug_assertions) {
3247            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3248        }
3249
3250        self.get_reconfig_api()
3251            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3252
3253        // check for root state hash consistency with live object set
3254        if expensive_safety_check_config.enable_state_consistency_check() {
3255            info!(
3256                "Performing state consistency check for epoch {}",
3257                cur_epoch_store.epoch()
3258            );
3259            self.expensive_check_is_consistent_state(
3260                accumulator,
3261                cur_epoch_store,
3262                cfg!(debug_assertions), // panic in debug mode only
3263            );
3264        }
3265
3266        if expensive_safety_check_config.enable_secondary_index_checks() {
3267            if let Some(indexes) = self.indexes.clone() {
3268                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3269                    .expect("secondary indexes are inconsistent");
3270            }
3271        }
3272
3273        Ok(())
3274    }
3275
3276    fn expensive_check_is_consistent_state(
3277        &self,
3278        accumulator: Arc<StateAccumulator>,
3279        cur_epoch_store: &AuthorityPerEpochStore,
3280        panic: bool,
3281    ) {
3282        let live_object_set_hash = accumulator.digest_live_object_set();
3283
3284        let root_state_hash: ECMHLiveObjectSetDigest = self
3285            .get_accumulator_store()
3286            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3287            .expect("Retrieving root state hash cannot fail")
3288            .expect("Root state hash for epoch must exist")
3289            .1
3290            .digest()
3291            .into();
3292
3293        let is_inconsistent = root_state_hash != live_object_set_hash;
3294        if is_inconsistent {
3295            if panic {
3296                panic!(
3297                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3298                );
3299            } else {
3300                error!(
3301                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3302                    root_state_hash, live_object_set_hash
3303                );
3304            }
3305        } else {
3306            info!("State consistency check passed");
3307        }
3308
3309        if !panic {
3310            accumulator.set_inconsistent_state(is_inconsistent);
3311        }
3312    }
3313
3314    pub fn current_epoch_for_testing(&self) -> EpochId {
3315        self.epoch_store_for_testing().epoch()
3316    }
3317
3318    #[instrument(level = "error", skip_all)]
3319    pub fn checkpoint_all_dbs(
3320        &self,
3321        checkpoint_path: &Path,
3322        cur_epoch_store: &AuthorityPerEpochStore,
3323        checkpoint_indexes: bool,
3324    ) -> IotaResult {
3325        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3326        let current_epoch = cur_epoch_store.epoch();
3327
3328        if checkpoint_path.exists() {
3329            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3330            return Ok(());
3331        }
3332
3333        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3334        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3335
3336        if checkpoint_path_tmp.exists() {
3337            fs::remove_dir_all(&checkpoint_path_tmp)
3338                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3339        }
3340
3341        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3342        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3343
3344        // NOTE: Do not change the order of invoking these checkpoint calls
3345        // We want to snapshot checkpoint db first to not race with state sync
3346        self.checkpoint_store
3347            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3348
3349        self.get_reconfig_api()
3350            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3351
3352        self.committee_store
3353            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3354
3355        if checkpoint_indexes {
3356            if let Some(indexes) = self.indexes.as_ref() {
3357                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3358            }
3359        }
3360
3361        fs::rename(checkpoint_path_tmp, checkpoint_path)
3362            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3363        Ok(())
3364    }
3365
    /// Load the current epoch store. This can change during reconfiguration. To
    /// ensure that we never end up accessing different epoch stores in a
    /// single task, we need to make sure that this is called once per task.
    /// Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites.
    /// Only call it when there is no way to obtain it from somewhere else.
    ///
    /// Returns the guard produced by loading `self.epoch_store`.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
3375
    /// Loads the current epoch store; should be used in tests only.
    ///
    /// Thin alias for `load_epoch_store_one_call_per_task`.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
3380
3381    pub fn clone_committee_for_testing(&self) -> Committee {
3382        Committee::clone(self.epoch_store_for_testing().committee())
3383    }
3384
3385    #[instrument(level = "trace", skip_all)]
3386    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3387        self.get_object_store()
3388            .try_get_object(object_id)
3389            .map_err(Into::into)
3390    }
3391
3392    /// Non-fallible version of `try_get_object`.
3393    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3394        self.try_get_object(object_id)
3395            .await
3396            .expect("storage access failed")
3397    }
3398
3399    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3400        Ok(self
3401            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3402            .await?
3403            .expect("framework object should always exist")
3404            .compute_object_reference())
3405    }
3406
3407    // This function is only used for testing.
3408    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3409        self.get_object_cache_reader()
3410            .try_get_iota_system_state_object_unsafe()
3411    }
3412
3413    #[instrument(level = "trace", skip_all)]
3414    pub fn get_checkpoint_by_sequence_number(
3415        &self,
3416        sequence_number: CheckpointSequenceNumber,
3417    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3418        Ok(self
3419            .checkpoint_store
3420            .get_checkpoint_by_sequence_number(sequence_number)?)
3421    }
3422
3423    #[instrument(level = "trace", skip_all)]
3424    pub fn get_transaction_checkpoint_for_tests(
3425        &self,
3426        digest: &TransactionDigest,
3427        epoch_store: &AuthorityPerEpochStore,
3428    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3429        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3430        let Some(checkpoint) = checkpoint else {
3431            return Ok(None);
3432        };
3433        let checkpoint = self
3434            .checkpoint_store
3435            .get_checkpoint_by_sequence_number(checkpoint)?;
3436        Ok(checkpoint)
3437    }
3438
3439    #[instrument(level = "trace", skip_all)]
3440    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3441        Ok(
3442            match self
3443                .get_object_cache_reader()
3444                .try_get_latest_object_or_tombstone(*object_id)?
3445            {
3446                Some((_, ObjectOrTombstone::Object(object))) => {
3447                    let layout = self.get_object_layout(&object)?;
3448                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3449                }
3450                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3451                None => ObjectRead::NotExists(*object_id),
3452            },
3453        )
3454    }
3455
    /// Returns the chain identifier stored on this authority state.
    ///
    /// Chain Identifier is the digest of the genesis checkpoint.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3460
3461    #[instrument(level = "trace", skip_all)]
3462    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3463    where
3464        T: DeserializeOwned,
3465    {
3466        let o = self.get_object_read(object_id)?.into_object()?;
3467        if let Some(move_object) = o.data.try_as_move() {
3468            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3469                IotaError::ObjectDeserialization {
3470                    error: format!("{e}"),
3471                }
3472            })?)
3473        } else {
3474            Err(IotaError::ObjectDeserialization {
3475                error: format!("Provided object : [{object_id}] is not a Move object."),
3476            })
3477        }
3478    }
3479
3480    /// This function aims to serve rpc reads on past objects and
3481    /// we don't expect it to be called for other purposes.
3482    /// Depending on the object pruning policies that will be enforced in the
3483    /// future there is no software-level guarantee/SLA to retrieve an object
3484    /// with an old version even if it exists/existed.
3485    #[instrument(level = "trace", skip_all)]
3486    pub fn get_past_object_read(
3487        &self,
3488        object_id: &ObjectID,
3489        version: SequenceNumber,
3490    ) -> IotaResult<PastObjectRead> {
3491        // Firstly we see if the object ever existed by getting its latest data
3492        let Some(obj_ref) = self
3493            .get_object_cache_reader()
3494            .try_get_latest_object_ref_or_tombstone(*object_id)?
3495        else {
3496            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3497        };
3498
3499        if version > obj_ref.1 {
3500            return Ok(PastObjectRead::VersionTooHigh {
3501                object_id: *object_id,
3502                asked_version: version,
3503                latest_version: obj_ref.1,
3504            });
3505        }
3506
3507        if version < obj_ref.1 {
3508            // Read past objects
3509            return Ok(match self.read_object_at_version(object_id, version)? {
3510                Some((object, layout)) => {
3511                    let obj_ref = object.compute_object_reference();
3512                    PastObjectRead::VersionFound(obj_ref, object, layout)
3513                }
3514
3515                None => PastObjectRead::VersionNotFound(*object_id, version),
3516            });
3517        }
3518
3519        if !obj_ref.2.is_alive() {
3520            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3521        }
3522
3523        match self.read_object_at_version(object_id, obj_ref.1)? {
3524            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3525            None => {
3526                error!(
3527                    "Object with in parent_entry is missing from object store, datastore is \
3528                     inconsistent",
3529                );
3530                Err(UserInputError::ObjectNotFound {
3531                    object_id: *object_id,
3532                    version: Some(obj_ref.1),
3533                }
3534                .into())
3535            }
3536        }
3537    }
3538
3539    #[instrument(level = "trace", skip_all)]
3540    fn read_object_at_version(
3541        &self,
3542        object_id: &ObjectID,
3543        version: SequenceNumber,
3544    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3545        let Some(object) = self
3546            .get_object_cache_reader()
3547            .try_get_object_by_key(object_id, version)?
3548        else {
3549            return Ok(None);
3550        };
3551
3552        let layout = self.get_object_layout(&object)?;
3553        Ok(Some((object, layout)))
3554    }
3555
    /// Resolves the annotated struct layout of `object`.
    ///
    /// Returns `Ok(None)` when the object's data is not a Move object.
    /// Otherwise the layout is obtained from a type-layout resolver created by
    /// the current epoch store's executor on top of the backing package store.
    ///
    /// # Errors
    /// Propagates failures from resolving the annotated layout or converting
    /// it into a struct layout.
    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            // Note: inside the closure `object` is the Move object, shadowing the parameter.
            .map(|object| {
                into_struct_layout(
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            // Option<Result<..>> -> Result<Option<..>> so `?` can surface the error.
            .transpose()?;
        Ok(layout)
    }
3572
3573    fn get_owner_at_version(
3574        &self,
3575        object_id: &ObjectID,
3576        version: SequenceNumber,
3577    ) -> IotaResult<Owner> {
3578        self.get_object_store()
3579            .try_get_object_by_key(object_id, version)?
3580            .ok_or_else(|| {
3581                IotaError::from(UserInputError::ObjectNotFound {
3582                    object_id: *object_id,
3583                    version: Some(version),
3584                })
3585            })
3586            .map(|o| o.owner)
3587    }
3588
3589    #[instrument(level = "trace", skip_all)]
3590    pub fn get_owner_objects(
3591        &self,
3592        owner: IotaAddress,
3593        // If `Some`, the query will start from the next item after the specified cursor
3594        cursor: Option<ObjectID>,
3595        limit: usize,
3596        filter: Option<IotaObjectDataFilter>,
3597    ) -> IotaResult<Vec<ObjectInfo>> {
3598        if let Some(indexes) = &self.indexes {
3599            indexes.get_owner_objects(owner, cursor, limit, filter)
3600        } else {
3601            Err(IotaError::IndexStoreNotAvailable)
3602        }
3603    }
3604
3605    #[instrument(level = "trace", skip_all)]
3606    pub fn get_owned_coins_iterator_with_cursor(
3607        &self,
3608        owner: IotaAddress,
3609        // If `Some`, the query will start from the next item after the specified cursor
3610        cursor: (String, ObjectID),
3611        limit: usize,
3612        one_coin_type_only: bool,
3613    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3614        if let Some(indexes) = &self.indexes {
3615            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3616        } else {
3617            Err(IotaError::IndexStoreNotAvailable)
3618        }
3619    }
3620
3621    #[instrument(level = "trace", skip_all)]
3622    pub fn get_owner_objects_iterator(
3623        &self,
3624        owner: IotaAddress,
3625        // If `Some`, the query will start from the next item after the specified cursor
3626        cursor: Option<ObjectID>,
3627        filter: Option<IotaObjectDataFilter>,
3628    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3629        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3630        if let Some(indexes) = &self.indexes {
3631            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3632        } else {
3633            Err(IotaError::IndexStoreNotAvailable)
3634        }
3635    }
3636
3637    #[instrument(level = "trace", skip_all)]
3638    pub async fn get_move_objects<T>(
3639        &self,
3640        owner: IotaAddress,
3641        type_: MoveObjectType,
3642    ) -> IotaResult<Vec<T>>
3643    where
3644        T: DeserializeOwned,
3645    {
3646        let object_ids = self
3647            .get_owner_objects_iterator(owner, None, None)?
3648            .filter(|o| match &o.type_ {
3649                ObjectType::Struct(s) => &type_ == s,
3650                ObjectType::Package => false,
3651            })
3652            .map(|info| ObjectKey(info.object_id, info.version))
3653            .collect::<Vec<_>>();
3654        let mut move_objects = vec![];
3655
3656        let objects = self
3657            .get_object_store()
3658            .try_multi_get_objects_by_key(&object_ids)?;
3659
3660        for (o, id) in objects.into_iter().zip(object_ids) {
3661            let object = o.ok_or_else(|| {
3662                IotaError::from(UserInputError::ObjectNotFound {
3663                    object_id: id.0,
3664                    version: Some(id.1),
3665                })
3666            })?;
3667            let move_object = object.data.try_as_move().ok_or_else(|| {
3668                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3669            })?;
3670            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3671                IotaError::ObjectDeserialization {
3672                    error: format!("{e}"),
3673                }
3674            })?);
3675        }
3676        Ok(move_objects)
3677    }
3678
3679    #[instrument(level = "trace", skip_all)]
3680    pub fn get_dynamic_fields(
3681        &self,
3682        owner: ObjectID,
3683        // If `Some`, the query will start from the next item after the specified cursor
3684        cursor: Option<ObjectID>,
3685        limit: usize,
3686    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3687        Ok(self
3688            .get_dynamic_fields_iterator(owner, cursor)?
3689            .take(limit)
3690            .collect::<Result<Vec<_>, _>>()?)
3691    }
3692
3693    fn get_dynamic_fields_iterator(
3694        &self,
3695        owner: ObjectID,
3696        // If `Some`, the query will start from the next item after the specified cursor
3697        cursor: Option<ObjectID>,
3698    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3699    {
3700        if let Some(indexes) = &self.indexes {
3701            indexes.get_dynamic_fields_iterator(owner, cursor)
3702        } else {
3703            Err(IotaError::IndexStoreNotAvailable)
3704        }
3705    }
3706
3707    #[instrument(level = "trace", skip_all)]
3708    pub fn get_dynamic_field_object_id(
3709        &self,
3710        owner: ObjectID,
3711        name_type: TypeTag,
3712        name_bcs_bytes: &[u8],
3713    ) -> IotaResult<Option<ObjectID>> {
3714        if let Some(indexes) = &self.indexes {
3715            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3716        } else {
3717            Err(IotaError::IndexStoreNotAvailable)
3718        }
3719    }
3720
3721    #[instrument(level = "trace", skip_all)]
3722    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3723        Ok(self.get_indexes()?.next_sequence_number())
3724    }
3725
3726    #[instrument(level = "trace", skip_all)]
3727    pub async fn get_executed_transaction_and_effects(
3728        &self,
3729        digest: TransactionDigest,
3730        kv_store: Arc<TransactionKeyValueStore>,
3731    ) -> IotaResult<(Transaction, TransactionEffects)> {
3732        let transaction = kv_store.get_tx(digest).await?;
3733        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3734        Ok((transaction, effects))
3735    }
3736
3737    #[instrument(level = "trace", skip_all)]
3738    pub fn multi_get_checkpoint_by_sequence_number(
3739        &self,
3740        sequence_numbers: &[CheckpointSequenceNumber],
3741    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3742        Ok(self
3743            .checkpoint_store
3744            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3745    }
3746
3747    #[instrument(level = "trace", skip_all)]
3748    pub fn get_transaction_events(
3749        &self,
3750        digest: &TransactionEventsDigest,
3751    ) -> IotaResult<TransactionEvents> {
3752        self.get_transaction_cache_reader()
3753            .try_get_events(digest)?
3754            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3755    }
3756
3757    pub fn get_transaction_input_objects(
3758        &self,
3759        effects: &TransactionEffects,
3760    ) -> anyhow::Result<Vec<Object>> {
3761        let input_object_keys = effects
3762            .modified_at_versions()
3763            .into_iter()
3764            .map(|(object_id, version)| ObjectKey(object_id, version))
3765            .collect::<Vec<_>>();
3766
3767        let input_objects = self
3768            .get_object_store()
3769            .try_multi_get_objects_by_key(&input_object_keys)?
3770            .into_iter()
3771            .enumerate()
3772            .map(|(idx, maybe_object)| {
3773                maybe_object.ok_or_else(|| {
3774                    anyhow::anyhow!(
3775                        "missing input object key {:?} from tx {}",
3776                        input_object_keys[idx],
3777                        effects.transaction_digest()
3778                    )
3779                })
3780            })
3781            .collect::<anyhow::Result<Vec<_>>>()?;
3782        Ok(input_objects)
3783    }
3784
3785    pub fn get_transaction_output_objects(
3786        &self,
3787        effects: &TransactionEffects,
3788    ) -> anyhow::Result<Vec<Object>> {
3789        let output_object_keys = effects
3790            .all_changed_objects()
3791            .into_iter()
3792            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3793            .collect::<Vec<_>>();
3794
3795        let output_objects = self
3796            .get_object_store()
3797            .try_multi_get_objects_by_key(&output_object_keys)?
3798            .into_iter()
3799            .enumerate()
3800            .map(|(idx, maybe_object)| {
3801                maybe_object.ok_or_else(|| {
3802                    anyhow::anyhow!(
3803                        "missing output object key {:?} from tx {}",
3804                        output_object_keys[idx],
3805                        effects.transaction_digest()
3806                    )
3807                })
3808            })
3809            .collect::<anyhow::Result<Vec<_>>>()?;
3810        Ok(output_objects)
3811    }
3812
3813    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3814        match &self.indexes {
3815            Some(i) => Ok(i.clone()),
3816            None => Err(IotaError::UnsupportedFeature {
3817                error: "extended object indexing is not enabled on this server".into(),
3818            }),
3819        }
3820    }
3821
3822    pub async fn get_transactions_for_tests(
3823        self: &Arc<Self>,
3824        filter: Option<TransactionFilter>,
3825        cursor: Option<TransactionDigest>,
3826        limit: Option<usize>,
3827        reverse: bool,
3828    ) -> IotaResult<Vec<TransactionDigest>> {
3829        let metrics = KeyValueStoreMetrics::new_for_tests();
3830        let kv_store = Arc::new(TransactionKeyValueStore::new(
3831            "rocksdb",
3832            metrics,
3833            self.clone(),
3834        ));
3835        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3836            .await
3837    }
3838
3839    #[instrument(level = "trace", skip_all)]
3840    pub async fn get_transactions(
3841        &self,
3842        kv_store: &Arc<TransactionKeyValueStore>,
3843        filter: Option<TransactionFilter>,
3844        // If `Some`, the query will start from the next item after the specified cursor
3845        cursor: Option<TransactionDigest>,
3846        limit: Option<usize>,
3847        reverse: bool,
3848    ) -> IotaResult<Vec<TransactionDigest>> {
3849        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3850            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3851            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3852            if reverse {
3853                let iter = iter
3854                    .rev()
3855                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3856                    .skip(usize::from(cursor.is_some()));
3857                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3858            } else {
3859                let iter = iter
3860                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3861                    .skip(usize::from(cursor.is_some()));
3862                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3863            }
3864        }
3865        self.get_indexes()?
3866            .get_transactions(filter, cursor, limit, reverse)
3867    }
3868
    /// Returns a reference to the shared checkpoint store held by this state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
3872
3873    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3874        self.get_checkpoint_store()
3875            .get_highest_executed_checkpoint_seq_number()?
3876            .ok_or(IotaError::UserInput {
3877                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3878            })
3879    }
3880
3881    #[cfg(msim)]
3882    pub fn get_highest_pruned_checkpoint_for_testing(
3883        &self,
3884    ) -> IotaResult<CheckpointSequenceNumber> {
3885        self.database_for_testing()
3886            .perpetual_tables
3887            .get_highest_pruned_checkpoint()
3888    }
3889
3890    #[instrument(level = "trace", skip_all)]
3891    pub fn get_checkpoint_summary_by_sequence_number(
3892        &self,
3893        sequence_number: CheckpointSequenceNumber,
3894    ) -> IotaResult<CheckpointSummary> {
3895        let verified_checkpoint = self
3896            .get_checkpoint_store()
3897            .get_checkpoint_by_sequence_number(sequence_number)?;
3898        match verified_checkpoint {
3899            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3900            None => Err(IotaError::UserInput {
3901                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3902            }),
3903        }
3904    }
3905
3906    #[instrument(level = "trace", skip_all)]
3907    pub fn get_checkpoint_summary_by_digest(
3908        &self,
3909        digest: CheckpointDigest,
3910    ) -> IotaResult<CheckpointSummary> {
3911        let verified_checkpoint = self
3912            .get_checkpoint_store()
3913            .get_checkpoint_by_digest(&digest)?;
3914        match verified_checkpoint {
3915            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3916            None => Err(IotaError::UserInput {
3917                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3918            }),
3919        }
3920    }
3921
3922    #[instrument(level = "trace", skip_all)]
3923    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3924        if is_system_package(package_id) {
3925            return self.find_genesis_txn_digest();
3926        }
3927        Ok(self
3928            .get_object_read(&package_id)?
3929            .into_object()?
3930            .previous_transaction)
3931    }
3932
3933    #[instrument(level = "trace", skip_all)]
3934    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3935        let summary = self
3936            .get_verified_checkpoint_by_sequence_number(0)?
3937            .into_message();
3938        let content = self.get_checkpoint_contents(summary.content_digest)?;
3939        let genesis_transaction = content.enumerate_transactions(&summary).next();
3940        Ok(genesis_transaction
3941            .ok_or(IotaError::UserInput {
3942                error: UserInputError::GenesisTransactionNotFound,
3943            })?
3944            .1
3945            .transaction)
3946    }
3947
3948    #[instrument(level = "trace", skip_all)]
3949    pub fn get_verified_checkpoint_by_sequence_number(
3950        &self,
3951        sequence_number: CheckpointSequenceNumber,
3952    ) -> IotaResult<VerifiedCheckpoint> {
3953        let verified_checkpoint = self
3954            .get_checkpoint_store()
3955            .get_checkpoint_by_sequence_number(sequence_number)?;
3956        match verified_checkpoint {
3957            Some(verified_checkpoint) => Ok(verified_checkpoint),
3958            None => Err(IotaError::UserInput {
3959                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3960            }),
3961        }
3962    }
3963
3964    #[instrument(level = "trace", skip_all)]
3965    pub fn get_verified_checkpoint_summary_by_digest(
3966        &self,
3967        digest: CheckpointDigest,
3968    ) -> IotaResult<VerifiedCheckpoint> {
3969        let verified_checkpoint = self
3970            .get_checkpoint_store()
3971            .get_checkpoint_by_digest(&digest)?;
3972        match verified_checkpoint {
3973            Some(verified_checkpoint) => Ok(verified_checkpoint),
3974            None => Err(IotaError::UserInput {
3975                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3976            }),
3977        }
3978    }
3979
3980    #[instrument(level = "trace", skip_all)]
3981    pub fn get_checkpoint_contents(
3982        &self,
3983        digest: CheckpointContentsDigest,
3984    ) -> IotaResult<CheckpointContents> {
3985        self.get_checkpoint_store()
3986            .get_checkpoint_contents(&digest)?
3987            .ok_or(IotaError::UserInput {
3988                error: UserInputError::CheckpointContentsNotFound(digest),
3989            })
3990    }
3991
3992    #[instrument(level = "trace", skip_all)]
3993    pub fn get_checkpoint_contents_by_sequence_number(
3994        &self,
3995        sequence_number: CheckpointSequenceNumber,
3996    ) -> IotaResult<CheckpointContents> {
3997        let verified_checkpoint = self
3998            .get_checkpoint_store()
3999            .get_checkpoint_by_sequence_number(sequence_number)?;
4000        match verified_checkpoint {
4001            Some(verified_checkpoint) => {
4002                let content_digest = verified_checkpoint.into_inner().content_digest;
4003                self.get_checkpoint_contents(content_digest)
4004            }
4005            None => Err(IotaError::UserInput {
4006                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4007            }),
4008        }
4009    }
4010
4011    #[instrument(level = "trace", skip_all)]
4012    pub async fn query_events(
4013        &self,
4014        kv_store: &Arc<TransactionKeyValueStore>,
4015        query: EventFilter,
4016        // If `Some`, the query will start from the next item after the specified cursor
4017        cursor: Option<EventID>,
4018        limit: usize,
4019        descending: bool,
4020    ) -> IotaResult<Vec<IotaEvent>> {
4021        let index_store = self.get_indexes()?;
4022
4023        // Get the tx_num from tx_digest
4024        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4025            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4026                IotaError::TransactionNotFound {
4027                    digest: cursor.tx_digest,
4028                },
4029            )?;
4030            (tx_seq, cursor.event_seq as usize)
4031        } else if descending {
4032            (u64::MAX, usize::MAX)
4033        } else {
4034            (0, 0)
4035        };
4036
4037        let limit = limit + 1;
4038        let mut event_keys = match query {
4039            EventFilter::All(filters) => {
4040                if filters.is_empty() {
4041                    index_store.all_events(tx_num, event_num, limit, descending)?
4042                } else {
4043                    return Err(IotaError::UserInput {
4044                        error: UserInputError::Unsupported(
4045                            "This query type does not currently support filter combinations"
4046                                .to_string(),
4047                        ),
4048                    });
4049                }
4050            }
4051            EventFilter::Transaction(digest) => {
4052                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4053            }
4054            EventFilter::MoveModule { package, module } => {
4055                let module_id = ModuleId::new(package.into(), module);
4056                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4057            }
4058            EventFilter::MoveEventType(struct_name) => index_store
4059                .events_by_move_event_struct_name(
4060                    &struct_name,
4061                    tx_num,
4062                    event_num,
4063                    limit,
4064                    descending,
4065                )?,
4066            EventFilter::Sender(sender) => {
4067                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4068            }
4069            EventFilter::TimeRange {
4070                start_time,
4071                end_time,
4072            } => index_store
4073                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4074            EventFilter::MoveEventModule { package, module } => index_store
4075                .events_by_move_event_module(
4076                    &ModuleId::new(package.into(), module),
4077                    tx_num,
4078                    event_num,
4079                    limit,
4080                    descending,
4081                )?,
4082            // not using "_ =>" because we want to make sure we remember to add new variants here
4083            EventFilter::Package(_)
4084            | EventFilter::MoveEventField { .. }
4085            | EventFilter::Any(_)
4086            | EventFilter::And(_, _)
4087            | EventFilter::Or(_, _) => {
4088                return Err(IotaError::UserInput {
4089                    error: UserInputError::Unsupported(
4090                        "This query type is not supported by the full node.".to_string(),
4091                    ),
4092                });
4093            }
4094        };
4095
4096        // skip one event if exclusive cursor is provided,
4097        // otherwise truncate to the original limit.
4098        if cursor.is_some() {
4099            if !event_keys.is_empty() {
4100                event_keys.remove(0);
4101            }
4102        } else {
4103            event_keys.truncate(limit - 1);
4104        }
4105
4106        // get the unique set of digests from the event_keys
4107        let transaction_digests = event_keys
4108            .iter()
4109            .map(|(_, digest, _, _)| *digest)
4110            .collect::<HashSet<_>>()
4111            .into_iter()
4112            .collect::<Vec<_>>();
4113
4114        let events = kv_store
4115            .multi_get_events_by_tx_digests(&transaction_digests)
4116            .await?;
4117
4118        let events_map: HashMap<_, _> =
4119            transaction_digests.iter().zip(events.into_iter()).collect();
4120
4121        let stored_events = event_keys
4122            .into_iter()
4123            .map(|k| {
4124                (
4125                    k,
4126                    events_map
4127                        .get(&k.1)
4128                        .expect("fetched digest is missing")
4129                        .clone()
4130                        .and_then(|e| e.data.get(k.2).cloned()),
4131                )
4132            })
4133            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4134                event
4135                    .map(|e| (e, tx_digest, event_seq, timestamp))
4136                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4137            })
4138            .collect::<Result<Vec<_>, _>>()?;
4139
4140        let epoch_store = self.load_epoch_store_one_call_per_task();
4141        let backing_store = self.get_backing_package_store().as_ref();
4142        let mut layout_resolver = epoch_store
4143            .executor()
4144            .type_layout_resolver(Box::new(backing_store));
4145        let mut events = vec![];
4146        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4147            events.push(IotaEvent::try_from(
4148                e.clone(),
4149                tx_digest,
4150                event_seq as u64,
4151                Some(timestamp),
4152                layout_resolver.get_annotated_layout(&e.type_)?,
4153            )?)
4154        }
4155        Ok(events)
4156    }
4157
4158    pub async fn insert_genesis_object(&self, object: Object) {
4159        self.get_reconfig_api()
4160            .try_insert_genesis_object(object)
4161            .expect("Cannot insert genesis object")
4162    }
4163
4164    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4165        futures::future::join_all(
4166            objects
4167                .iter()
4168                .map(|o| self.insert_genesis_object(o.clone())),
4169        )
4170        .await;
4171    }
4172
4173    /// Make a status response for a transaction
4174    #[instrument(level = "trace", skip_all)]
4175    pub fn get_transaction_status(
4176        &self,
4177        transaction_digest: &TransactionDigest,
4178        epoch_store: &Arc<AuthorityPerEpochStore>,
4179    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4180        // TODO: In the case of read path, we should not have to re-sign the effects.
4181        if let Some(effects) =
4182            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4183        {
4184            if let Some(transaction) = self
4185                .get_transaction_cache_reader()
4186                .try_get_transaction_block(transaction_digest)?
4187            {
4188                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4189                let events = if let Some(digest) = effects.events_digest() {
4190                    self.get_transaction_events(digest)?
4191                } else {
4192                    TransactionEvents::default()
4193                };
4194                return Ok(Some((
4195                    (*transaction).clone().into_message(),
4196                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4197                )));
4198            } else {
4199                // The read of effects and read of transaction are not atomic. It's possible
4200                // that we reverted the transaction (during epoch change) in
4201                // between the above two reads, and we end up having effects but
4202                // not transaction. In this case, we just fall through.
4203                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4204            }
4205        }
4206        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4207            self.metrics.tx_already_processed.inc();
4208            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4209            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4210        } else {
4211            Ok(None)
4212        }
4213    }
4214
4215    /// Get the signed effects of the given transaction. If the effects was
4216    /// signed in a previous epoch, re-sign it so that the caller is able to
4217    /// form a cert of the effects in the current epoch.
4218    #[instrument(level = "trace", skip_all)]
4219    pub fn get_signed_effects_and_maybe_resign(
4220        &self,
4221        transaction_digest: &TransactionDigest,
4222        epoch_store: &Arc<AuthorityPerEpochStore>,
4223    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4224        let effects = self
4225            .get_transaction_cache_reader()
4226            .try_get_executed_effects(transaction_digest)?;
4227        match effects {
4228            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4229            None => Ok(None),
4230        }
4231    }
4232
4233    #[instrument(level = "trace", skip_all)]
4234    pub(crate) fn sign_effects(
4235        &self,
4236        effects: TransactionEffects,
4237        epoch_store: &Arc<AuthorityPerEpochStore>,
4238    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4239        let tx_digest = *effects.transaction_digest();
4240        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4241            Some(sig) if sig.epoch == epoch_store.epoch() => {
4242                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4243            }
4244            _ => {
4245                // If the transaction was executed in previous epochs, the validator will
4246                // re-sign the effects with new current epoch so that a client is always able to
4247                // obtain an effects certificate at the current epoch.
4248                //
4249                // Why is this necessary? Consider the following case:
4250                // - assume there are 4 validators
4251                // - Quorum driver gets 2 signed effects before reconfig halt
4252                // - The tx makes it into final checkpoint.
4253                // - 2 validators go away and are replaced in the new epoch.
4254                // - The new epoch begins.
4255                // - The quorum driver cannot complete the partial effects cert from the
4256                //   previous epoch, because it may not be able to reach either of the 2 former
4257                //   validators.
4258                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4259                //   the new epoch, the QD can make a new effects cert and return it to the
4260                //   client.
4261                //
4262                // This is a considered a short-term workaround. Eventually, Quorum Driver
4263                // should be able to return either an effects certificate, -or-
4264                // a proof of inclusion in a checkpoint. In the case above, the
4265                // Quorum Driver would return a proof of inclusion in the final
4266                // checkpoint, and this code would no longer be necessary.
4267                debug!(
4268                    ?tx_digest,
4269                    epoch=?epoch_store.epoch(),
4270                    "Re-signing the effects with the current epoch"
4271                );
4272
4273                let sig = AuthoritySignInfo::new(
4274                    epoch_store.epoch(),
4275                    &effects,
4276                    Intent::iota_app(IntentScope::TransactionEffects),
4277                    self.name,
4278                    &*self.secret,
4279                );
4280
4281                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4282
4283                epoch_store.insert_effects_digest_and_signature(
4284                    &tx_digest,
4285                    effects.digest(),
4286                    &sig,
4287                )?;
4288
4289                effects
4290            }
4291        };
4292
4293        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4294            signed_effects,
4295        ))
4296    }
4297
4298    // Returns coin objects for indexing for fullnode if indexing is enabled.
4299    #[instrument(level = "trace", skip_all)]
4300    fn fullnode_only_get_tx_coins_for_indexing(
4301        &self,
4302        inner_temporary_store: &InnerTemporaryStore,
4303        epoch_store: &Arc<AuthorityPerEpochStore>,
4304    ) -> Option<TxCoins> {
4305        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4306            return None;
4307        }
4308        let written_coin_objects = inner_temporary_store
4309            .written
4310            .iter()
4311            .filter_map(|(k, v)| {
4312                if v.is_coin() {
4313                    Some((*k, v.clone()))
4314                } else {
4315                    None
4316                }
4317            })
4318            .collect();
4319        let input_coin_objects = inner_temporary_store
4320            .input_objects
4321            .iter()
4322            .filter_map(|(k, v)| {
4323                if v.is_coin() {
4324                    Some((*k, v.clone()))
4325                } else {
4326                    None
4327                }
4328            })
4329            .collect::<ObjectMap>();
4330        Some((input_coin_objects, written_coin_objects))
4331    }
4332
4333    /// Get the TransactionEnvelope that currently locks the given object, if
4334    /// any. Since object locks are only valid for one epoch, we also need
4335    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4336    /// no lock records for the given object can be found.
4337    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4338    /// object record is at a different version.
4339    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4340    /// certain transaction. Returns None if the a lock record is
4341    /// initialized for the given ObjectRef but not yet locked by any
4342    /// transaction,     or cannot find the transaction in transaction
4343    /// table, because of data race etc.
4344    #[instrument(level = "trace", skip_all)]
4345    pub async fn get_transaction_lock(
4346        &self,
4347        object_ref: &ObjectRef,
4348        epoch_store: &AuthorityPerEpochStore,
4349    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4350        let lock_info = self
4351            .get_object_cache_reader()
4352            .try_get_lock(*object_ref, epoch_store)?;
4353        let lock_info = match lock_info {
4354            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4355                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4356                    provided_obj_ref: *object_ref,
4357                    current_version: locked_ref.1,
4358                }
4359                .into());
4360            }
4361            ObjectLockStatus::Initialized => {
4362                return Ok(None);
4363            }
4364            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4365        };
4366
4367        epoch_store.get_signed_transaction(&lock_info)
4368    }
4369
4370    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4371        self.get_object_cache_reader().try_get_objects(objects)
4372    }
4373
4374    /// Non-fallible version of `try_get_objects`.
4375    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4376        self.try_get_objects(objects)
4377            .await
4378            .expect("storage access failed")
4379    }
4380
4381    pub async fn try_get_object_or_tombstone(
4382        &self,
4383        object_id: ObjectID,
4384    ) -> IotaResult<Option<ObjectRef>> {
4385        self.get_object_cache_reader()
4386            .try_get_latest_object_ref_or_tombstone(object_id)
4387    }
4388
4389    /// Non-fallible version of `try_get_object_or_tombstone`.
4390    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4391        self.try_get_object_or_tombstone(object_id)
4392            .await
4393            .expect("storage access failed")
4394    }
4395
4396    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4397    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4398    /// upgrade.
4399    ///
4400    /// This method can be used to dynamic adjust the amount of buffer. If set
4401    /// to 0, the upgrade will go through with only 2f+1 votes.
4402    ///
4403    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4404    /// should have the same value), or you risk halting the chain.
4405    pub fn set_override_protocol_upgrade_buffer_stake(
4406        &self,
4407        expected_epoch: EpochId,
4408        buffer_stake_bps: u64,
4409    ) -> IotaResult {
4410        let epoch_store = self.load_epoch_store_one_call_per_task();
4411        let actual_epoch = epoch_store.epoch();
4412        if actual_epoch != expected_epoch {
4413            return Err(IotaError::WrongEpoch {
4414                expected_epoch,
4415                actual_epoch,
4416            });
4417        }
4418
4419        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4420    }
4421
4422    pub fn clear_override_protocol_upgrade_buffer_stake(
4423        &self,
4424        expected_epoch: EpochId,
4425    ) -> IotaResult {
4426        let epoch_store = self.load_epoch_store_one_call_per_task();
4427        let actual_epoch = epoch_store.epoch();
4428        if actual_epoch != expected_epoch {
4429            return Err(IotaError::WrongEpoch {
4430                expected_epoch,
4431                actual_epoch,
4432            });
4433        }
4434
4435        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4436    }
4437
4438    /// Get the set of system packages that are compiled in to this build, if
4439    /// those packages are compatible with the current versions of those
4440    /// packages on-chain.
4441    pub async fn get_available_system_packages(
4442        &self,
4443        binary_config: &BinaryConfig,
4444    ) -> Vec<ObjectRef> {
4445        let mut results = vec![];
4446
4447        let system_packages = BuiltInFramework::iter_system_packages();
4448
4449        // Add extra framework packages during simtest
4450        #[cfg(msim)]
4451        let extra_packages = framework_injection::get_extra_packages(self.name);
4452        #[cfg(msim)]
4453        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4454
4455        for system_package in system_packages {
4456            let modules = system_package.modules().to_vec();
4457            // In simtests, we could override the current built-in framework packages.
4458            #[cfg(msim)]
4459            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4460                .unwrap_or(modules);
4461
4462            let Some(obj_ref) = iota_framework::compare_system_package(
4463                &self.get_object_store(),
4464                &system_package.id,
4465                &modules,
4466                system_package.dependencies.to_vec(),
4467                binary_config,
4468            )
4469            .await
4470            else {
4471                return vec![];
4472            };
4473            results.push(obj_ref);
4474        }
4475
4476        results
4477    }
4478
4479    /// Return the new versions, module bytes, and dependencies for the packages
4480    /// that have been committed to for a framework upgrade, in
4481    /// `system_packages`.  Loads the module contents from the binary, and
4482    /// performs the following checks:
4483    ///
4484    /// - Whether its contents matches what is on-chain already, in which case
4485    ///   no upgrade is required, and its contents are omitted from the output.
4486    /// - Whether the contents in the binary can form a package whose digest
4487    ///   matches the input, meaning the framework will be upgraded, and this
4488    ///   authority can satisfy that upgrade, in which case the contents are
4489    ///   included in the output.
4490    ///
4491    /// If the current version of the framework can't be loaded, the binary does
4492    /// not contain the bytes for that framework ID, or the resulting
4493    /// package fails the digest check, `None` is returned indicating that
4494    /// this authority cannot run the upgrade that the network voted on.
4495    async fn get_system_package_bytes(
4496        &self,
4497        system_packages: Vec<ObjectRef>,
4498        binary_config: &BinaryConfig,
4499    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4500        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4501        let objects = self.get_objects(&ids).await;
4502
4503        let mut res = Vec::with_capacity(system_packages.len());
4504        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4505            let prev_transaction = match object {
4506                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4507                    // Skip this one because it doesn't need to be upgraded.
4508                    info!("Framework {} does not need updating", system_package_ref.0);
4509                    continue;
4510                }
4511
4512                Some(cur_object) => cur_object.previous_transaction,
4513                None => TransactionDigest::genesis_marker(),
4514            };
4515
4516            #[cfg(msim)]
4517            let SystemPackage {
4518                id: _,
4519                bytes,
4520                dependencies,
4521            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4522                .unwrap_or_else(|| {
4523                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4524                });
4525
4526            #[cfg(not(msim))]
4527            let SystemPackage {
4528                id: _,
4529                bytes,
4530                dependencies,
4531            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4532
4533            let modules: Vec<_> = bytes
4534                .iter()
4535                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4536                .collect();
4537
4538            let new_object = Object::new_system_package(
4539                &modules,
4540                system_package_ref.1,
4541                dependencies.clone(),
4542                prev_transaction,
4543            );
4544
4545            let new_ref = new_object.compute_object_reference();
4546            if new_ref != system_package_ref {
4547                error!(
4548                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4549                );
4550                return None;
4551            }
4552
4553            res.push((system_package_ref.1, bytes, dependencies));
4554        }
4555
4556        Some(res)
4557    }
4558
4559    /// Returns the new protocol version and system packages that the network
4560    /// has voted to upgrade to. If the proposed protocol version is not
4561    /// supported, None is returned.
4562    fn is_protocol_version_supported_v1(
4563        proposed_protocol_version: ProtocolVersion,
4564        committee: &Committee,
4565        capabilities: Vec<AuthorityCapabilitiesV1>,
4566        mut buffer_stake_bps: u64,
4567    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4568        if buffer_stake_bps > 10000 {
4569            warn!("clamping buffer_stake_bps to 10000");
4570            buffer_stake_bps = 10000;
4571        }
4572
4573        // For each validator, gather the protocol version and system packages that it
4574        // would like to upgrade to in the next epoch.
4575        let mut desired_upgrades: Vec<_> = capabilities
4576            .into_iter()
4577            .filter_map(|mut cap| {
4578                // A validator that lists no packages is voting against any change at all.
4579                if cap.available_system_packages.is_empty() {
4580                    return None;
4581                }
4582
4583                cap.available_system_packages.sort();
4584
4585                info!(
4586                    "validator {:?} supports {:?} with system packages: {:?}",
4587                    cap.authority.concise(),
4588                    cap.supported_protocol_versions,
4589                    cap.available_system_packages,
4590                );
4591
4592                // A validator that only supports the current protocol version is also voting
4593                // against any change, because framework upgrades always require a protocol
4594                // version bump.
4595                cap.supported_protocol_versions
4596                    .get_version_digest(proposed_protocol_version)
4597                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4598            })
4599            .collect();
4600
4601        // There can only be one set of votes that have a majority, find one if it
4602        // exists.
4603        desired_upgrades.sort();
4604        desired_upgrades
4605            .into_iter()
4606            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4607            .into_iter()
4608            .find_map(|((digest, packages), group)| {
4609                // should have been filtered out earlier.
4610                assert!(!packages.is_empty());
4611
4612                let mut stake_aggregator: StakeAggregator<(), true> =
4613                    StakeAggregator::new(Arc::new(committee.clone()));
4614
4615                for (_, _, authority) in group {
4616                    stake_aggregator.insert_generic(authority, ());
4617                }
4618
4619                let total_votes = stake_aggregator.total_votes();
4620                let quorum_threshold = committee.quorum_threshold();
4621                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4622
4623                info!(
4624                    protocol_config_digest = ?digest,
4625                    ?total_votes,
4626                    ?quorum_threshold,
4627                    ?buffer_stake_bps,
4628                    ?effective_threshold,
4629                    ?proposed_protocol_version,
4630                    ?packages,
4631                    "support for upgrade"
4632                );
4633
4634                let has_support = total_votes >= effective_threshold;
4635                has_support.then_some((proposed_protocol_version, digest, packages))
4636            })
4637    }
4638
4639    /// Selects the highest supported protocol version and system packages that
4640    /// the network has voted to upgrade to. If no upgrade is supported,
4641    /// returns the current protocol version and system packages.
4642    fn choose_protocol_version_and_system_packages_v1(
4643        current_protocol_version: ProtocolVersion,
4644        current_protocol_digest: Digest,
4645        committee: &Committee,
4646        capabilities: Vec<AuthorityCapabilitiesV1>,
4647        buffer_stake_bps: u64,
4648    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4649        let mut next_protocol_version = current_protocol_version;
4650        let mut system_packages = vec![];
4651        let mut protocol_version_digest = current_protocol_digest;
4652
4653        // Finds the highest supported protocol version and system packages by
4654        // incrementing the proposed protocol version by one until no further
4655        // upgrades are supported.
4656        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4657            next_protocol_version + 1,
4658            committee,
4659            capabilities.clone(),
4660            buffer_stake_bps,
4661        ) {
4662            next_protocol_version = version;
4663            protocol_version_digest = digest;
4664            system_packages = packages;
4665        }
4666
4667        (
4668            next_protocol_version,
4669            protocol_version_digest,
4670            system_packages,
4671        )
4672    }
4673
4674    /// Returns the indices of validators that support the given protocol
4675    /// version and digest. This includes both committee and non-committee
4676    /// validators based on their capabilities. Uses active validators
4677    /// instead of committee indices.
4678    fn get_validators_supporting_protocol_version(
4679        target_protocol_version: ProtocolVersion,
4680        target_digest: Digest,
4681        active_validators: &[AuthorityPublicKey],
4682        capabilities: &[AuthorityCapabilitiesV1],
4683    ) -> Vec<u64> {
4684        let mut eligible_validators = Vec::new();
4685
4686        for capability in capabilities {
4687            // Check if this validator supports the target protocol version and digest
4688            if let Some(digest) = capability
4689                .supported_protocol_versions
4690                .get_version_digest(target_protocol_version)
4691            {
4692                if digest == target_digest {
4693                    // Find the validator's index in the active validators list
4694                    if let Some(index) = active_validators
4695                        .iter()
4696                        .position(|name| AuthorityName::from(name) == capability.authority)
4697                    {
4698                        eligible_validators.push(index as u64);
4699                    }
4700                }
4701            }
4702        }
4703
4704        // Sort indices for deterministic behavior
4705        eligible_validators.sort();
4706        eligible_validators
4707    }
4708
4709    /// Calculates the sum of weights for eligible validators that are part of
4710    /// the committee. Takes the indices from
4711    /// get_validators_supporting_protocol_version and maps them back
4712    /// to committee members to get their weights.
4713    fn calculate_eligible_validators_weight(
4714        eligible_validator_indices: &[u64],
4715        active_validators: &[AuthorityPublicKey],
4716        committee: &Committee,
4717    ) -> u64 {
4718        let mut total_weight = 0u64;
4719
4720        for &index in eligible_validator_indices {
4721            let authority_pubkey = &active_validators[index as usize];
4722            // Check if this validator is in the committee and get their weight
4723            if let Some((_, weight)) = committee
4724                .members()
4725                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4726            {
4727                total_weight += weight;
4728            }
4729        }
4730
4731        total_weight
4732    }
4733
4734    #[instrument(level = "debug", skip_all)]
4735    fn create_authenticator_state_tx(
4736        &self,
4737        epoch_store: &Arc<AuthorityPerEpochStore>,
4738    ) -> Option<EndOfEpochTransactionKind> {
4739        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4740            info!("authenticator state transactions not enabled");
4741            return None;
4742        }
4743
4744        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4745        let tx = if authenticator_state_exists {
4746            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4747            let min_epoch =
4748                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4749            let authenticator_obj_initial_shared_version = epoch_store
4750                .epoch_start_config()
4751                .authenticator_obj_initial_shared_version()
4752                .expect("initial version must exist");
4753
4754            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4755                min_epoch,
4756                authenticator_obj_initial_shared_version,
4757            );
4758
4759            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4760
4761            tx
4762        } else {
4763            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4764            info!("Creating AuthenticatorStateCreate tx");
4765            tx
4766        };
4767        Some(tx)
4768    }
4769
4770    /// Creates and execute the advance epoch transaction to effects without
4771    /// committing it to the database. The effects of the change epoch tx
4772    /// are only written to the database after a certified checkpoint has been
4773    /// formed and executed by CheckpointExecutor.
4774    ///
4775    /// When a framework upgraded has been decided on, but the validator does
4776    /// not have the new versions of the packages locally, the validator
4777    /// cannot form the ChangeEpochTx. In this case it returns Err,
4778    /// indicating that the checkpoint builder should give up trying to make the
4779    /// final checkpoint. As long as the network is able to create a certified
4780    /// checkpoint (which should be ensured by the capabilities vote), it
4781    /// will arrive via state sync and be executed by CheckpointExecutor.
4782    #[instrument(level = "error", skip_all)]
4783    pub async fn create_and_execute_advance_epoch_tx(
4784        &self,
4785        epoch_store: &Arc<AuthorityPerEpochStore>,
4786        gas_cost_summary: &GasCostSummary,
4787        checkpoint: CheckpointSequenceNumber,
4788        epoch_start_timestamp_ms: CheckpointTimestamp,
4789    ) -> anyhow::Result<(
4790        IotaSystemState,
4791        Option<SystemEpochInfoEvent>,
4792        TransactionEffects,
4793    )> {
4794        let mut txns = Vec::new();
4795
4796        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4797            txns.push(tx);
4798        }
4799
4800        let next_epoch = epoch_store.epoch() + 1;
4801
4802        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4803        let authority_capabilities = epoch_store
4804            .get_capabilities_v1()
4805            .expect("read capabilities from db cannot fail");
4806        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4807            Self::choose_protocol_version_and_system_packages_v1(
4808                epoch_store.protocol_version(),
4809                SupportedProtocolVersionsWithHashes::protocol_config_digest(
4810                    epoch_store.protocol_config(),
4811                ),
4812                epoch_store.committee(),
4813                authority_capabilities.clone(),
4814                buffer_stake_bps,
4815            );
4816
4817        // since system packages are created during the current epoch, they should abide
4818        // by the rules of the current epoch, including the current epoch's max
4819        // Move binary format version
4820        let config = epoch_store.protocol_config();
4821        let binary_config = to_binary_config(config);
4822        let Some(next_epoch_system_package_bytes) = self
4823            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4824            .await
4825        else {
4826            error!(
4827                "upgraded system packages {:?} are not locally available, cannot create \
4828                ChangeEpochTx. validator binary must be upgraded to the correct version!",
4829                next_epoch_system_packages
4830            );
4831            // the checkpoint builder will keep retrying forever when it hits this error.
4832            // Eventually, one of two things will happen:
4833            // - The operator will upgrade this binary to one that has the new packages
4834            //   locally, and this function will succeed.
4835            // - The final checkpoint will be certified by other validators, we will receive
4836            //   it via state sync, and execute it. This will upgrade the framework
4837            //   packages, reconfigure, and most likely shut down in the new epoch (this
4838            //   validator likely doesn't support the new protocol version, or else it
4839            //   should have had the packages.)
4840            bail!("missing system packages: cannot form ChangeEpochTx");
4841        };
4842
4843        // Use ChangeEpochV3 when the feature flag is enabled and ChangeEpochV2
4844        // requirements are met
4845
4846        if config.select_committee_from_eligible_validators() {
4847            // Get the list of eligible validators that support the target protocol version
4848            let active_validators = epoch_store.epoch_start_state().get_active_validators();
4849
4850            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
4851
4852            // Use validators supporting the target protocol version as eligible validators
4853            // in the next version if select_committee_supporting_next_epoch_version feature
4854            // flag is set to true.
4855            if config.select_committee_supporting_next_epoch_version() {
4856                eligible_active_validators = Self::get_validators_supporting_protocol_version(
4857                    next_epoch_protocol_version,
4858                    next_epoch_protocol_digest,
4859                    &active_validators,
4860                    &authority_capabilities,
4861                );
4862
4863                // Calculate the total weight of eligible validators in the committee
4864                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
4865                    &eligible_active_validators,
4866                    &active_validators,
4867                    epoch_store.committee(),
4868                );
4869
4870                // Safety check: ensure eligible validators have enough stake
4871                // Use the same effective threshold calculation that was used to decide the
4872                // protocol version
4873                let committee = epoch_store.committee();
4874                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4875
4876                if eligible_validators_weight < effective_threshold {
4877                    error!(
4878                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
4879                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
4880                    );
4881                    // Pass all active validator indices as eligible validators
4882                    // to perform selection among all of them.
4883                    eligible_active_validators = (0..active_validators.len() as u64).collect();
4884                }
4885            }
4886
4887            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
4888                next_epoch,
4889                next_epoch_protocol_version,
4890                gas_cost_summary.storage_cost,
4891                gas_cost_summary.computation_cost,
4892                gas_cost_summary.computation_cost_burned,
4893                gas_cost_summary.storage_rebate,
4894                gas_cost_summary.non_refundable_storage_fee,
4895                epoch_start_timestamp_ms,
4896                next_epoch_system_package_bytes,
4897                eligible_active_validators,
4898            ));
4899        } else if config.protocol_defined_base_fee()
4900            && config.max_committee_members_count_as_option().is_some()
4901        {
4902            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
4903                next_epoch,
4904                next_epoch_protocol_version,
4905                gas_cost_summary.storage_cost,
4906                gas_cost_summary.computation_cost,
4907                gas_cost_summary.computation_cost_burned,
4908                gas_cost_summary.storage_rebate,
4909                gas_cost_summary.non_refundable_storage_fee,
4910                epoch_start_timestamp_ms,
4911                next_epoch_system_package_bytes,
4912            ));
4913        } else {
4914            txns.push(EndOfEpochTransactionKind::new_change_epoch(
4915                next_epoch,
4916                next_epoch_protocol_version,
4917                gas_cost_summary.storage_cost,
4918                gas_cost_summary.computation_cost,
4919                gas_cost_summary.storage_rebate,
4920                gas_cost_summary.non_refundable_storage_fee,
4921                epoch_start_timestamp_ms,
4922                next_epoch_system_package_bytes,
4923            ));
4924        }
4925
4926        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
4927
4928        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
4929            tx.clone(),
4930            epoch_store.epoch(),
4931            checkpoint,
4932        );
4933
4934        let tx_digest = executable_tx.digest();
4935
4936        info!(
4937            ?next_epoch,
4938            ?next_epoch_protocol_version,
4939            ?next_epoch_system_packages,
4940            computation_cost=?gas_cost_summary.computation_cost,
4941            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
4942            storage_cost=?gas_cost_summary.storage_cost,
4943            storage_rebate=?gas_cost_summary.storage_rebate,
4944            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
4945            ?tx_digest,
4946            "Creating advance epoch transaction"
4947        );
4948
4949        fail_point_async!("change_epoch_tx_delay");
4950        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
4951
4952        // The tx could have been executed by state sync already - if so simply return
4953        // an error. The checkpoint builder will shortly be terminated by
4954        // reconfiguration anyway.
4955        if self
4956            .get_transaction_cache_reader()
4957            .try_is_tx_already_executed(tx_digest)?
4958        {
4959            warn!("change epoch tx has already been executed via state sync");
4960            bail!("change epoch tx has already been executed via state sync",);
4961        }
4962
4963        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
4964
4965        // We must manually assign the shared object versions to the transaction before
4966        // executing it. This is because we do not sequence end-of-epoch
4967        // transactions through consensus.
4968        epoch_store.assign_shared_object_versions_idempotent(
4969            self.get_object_cache_reader().as_ref(),
4970            std::slice::from_ref(&executable_tx),
4971        )?;
4972
4973        let input_objects =
4974            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
4975
4976        let (temporary_store, effects, _execution_error_opt) =
4977            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
4978        let system_obj = get_iota_system_state(&temporary_store.written)
4979            .expect("change epoch tx must write to system object");
4980        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
4981        let system_epoch_info_event = temporary_store
4982            .events
4983            .data
4984            .into_iter()
4985            .find(|event| event.is_system_epoch_info_event())
4986            .map(SystemEpochInfoEvent::from);
4987        // The system epoch info event can be `None` in case if the `advance_epoch`
4988        // Move function call failed and was executed in the safe mode.
4989        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
4990
4991        // We must write tx and effects to the state sync tables so that state sync is
4992        // able to deliver to the transaction to CheckpointExecutor after it is
4993        // included in a certified checkpoint.
4994        self.get_state_sync_store()
4995            .try_insert_transaction_and_effects(&tx, &effects)
4996            .map_err(|err| {
4997                let err: anyhow::Error = err.into();
4998                err
4999            })?;
5000
5001        info!(
5002            "Effects summary of the change epoch transaction: {:?}",
5003            effects.summary_for_debug()
5004        );
5005        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5006        // The change epoch transaction cannot fail to execute.
5007        assert!(effects.status().is_ok());
5008        Ok((system_obj, system_epoch_info_event, effects))
5009    }
5010
5011    /// This function is called at the very end of the epoch.
5012    /// This step is required before updating new epoch in the db and calling
5013    /// reopen_epoch_db.
5014    #[instrument(level = "error", skip_all)]
5015    async fn revert_uncommitted_epoch_transactions(
5016        &self,
5017        epoch_store: &AuthorityPerEpochStore,
5018    ) -> IotaResult {
5019        {
5020            let state = epoch_store.get_reconfig_state_write_lock_guard();
5021            if state.should_accept_user_certs() {
5022                // Need to change this so that consensus adapter do not accept certificates from
5023                // user. This can happen if our local validator did not initiate
5024                // epoch change locally, but 2f+1 nodes already concluded the
5025                // epoch.
5026                //
5027                // This lock is essentially a barrier for
5028                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5029                // after this block
5030                epoch_store.close_user_certs(state);
5031            }
5032            // lock is dropped here
5033        }
5034        let pending_certificates = epoch_store.pending_consensus_certificates();
5035        info!(
5036            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5037            pending_certificates.len(),
5038            pending_certificates,
5039        );
5040        for digest in pending_certificates {
5041            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5042                info!(
5043                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5044                    digest
5045                );
5046                continue;
5047            }
5048            info!("Reverting {:?} at the end of epoch", digest);
5049            epoch_store.revert_executed_transaction(&digest)?;
5050            self.get_reconfig_api().try_revert_state_update(&digest)?;
5051        }
5052        info!("All uncommitted local transactions reverted");
5053        Ok(())
5054    }
5055
5056    #[instrument(level = "error", skip_all)]
5057    async fn reopen_epoch_db(
5058        &self,
5059        cur_epoch_store: &AuthorityPerEpochStore,
5060        new_committee: Committee,
5061        epoch_start_configuration: EpochStartConfiguration,
5062        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5063        epoch_last_checkpoint: CheckpointSequenceNumber,
5064    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5065        let new_epoch = new_committee.epoch;
5066        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5067        assert_eq!(
5068            epoch_start_configuration.epoch_start_state().epoch(),
5069            new_committee.epoch
5070        );
5071        fail_point!("before-open-new-epoch-store");
5072        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5073            self.name,
5074            new_committee,
5075            epoch_start_configuration,
5076            self.get_backing_package_store().clone(),
5077            self.get_object_store().clone(),
5078            expensive_safety_check_config,
5079            epoch_last_checkpoint,
5080        )?;
5081        self.epoch_store.store(new_epoch_store.clone());
5082        Ok(new_epoch_store)
5083    }
5084
5085    #[cfg(test)]
5086    pub(crate) fn iter_live_object_set_for_testing(
5087        &self,
5088    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5089        self.get_accumulator_store()
5090            .iter_cached_live_object_set_for_testing()
5091    }
5092
5093    #[cfg(test)]
5094    pub(crate) fn shutdown_execution_for_test(&self) {
5095        self.tx_execution_shutdown
5096            .lock()
5097            .take()
5098            .unwrap()
5099            .send(())
5100            .unwrap();
5101    }
5102
5103    /// NOTE: this function is only to be used for fuzzing and testing. Never
5104    /// use in prod
5105    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5106        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5107        self.get_object_cache_reader()
5108            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5109        self.get_reconfig_api()
5110            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5111    }
5112}
5113
/// Receives `(epoch, round, bytes)` randomness items from a channel and passes
/// each one to `handle_new_randomness`.
pub struct RandomnessRoundReceiver {
    /// Authority state used when handling received randomness.
    authority_state: Arc<AuthorityState>,
    /// Receiving end of the channel that delivers `(epoch, round, bytes)`.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5118
5119impl RandomnessRoundReceiver {
5120    pub fn spawn(
5121        authority_state: Arc<AuthorityState>,
5122        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5123    ) -> JoinHandle<()> {
5124        let rrr = RandomnessRoundReceiver {
5125            authority_state,
5126            randomness_rx,
5127        };
5128        spawn_monitored_task!(rrr.run())
5129    }
5130
5131    async fn run(mut self) {
5132        info!("RandomnessRoundReceiver event loop started");
5133
5134        loop {
5135            tokio::select! {
5136                maybe_recv = self.randomness_rx.recv() => {
5137                    if let Some((epoch, round, bytes)) = maybe_recv {
5138                        self.handle_new_randomness(epoch, round, bytes);
5139                    } else {
5140                        break;
5141                    }
5142                },
5143            }
5144        }
5145
5146        info!("RandomnessRoundReceiver event loop ended");
5147    }
5148
5149    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5150    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5151        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5152        if epoch_store.epoch() != epoch {
5153            warn!(
5154                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5155                epoch_store.epoch()
5156            );
5157            return;
5158        }
5159        let transaction = VerifiedTransaction::new_randomness_state_update(
5160            epoch,
5161            round,
5162            bytes,
5163            epoch_store
5164                .epoch_start_config()
5165                .randomness_obj_initial_shared_version(),
5166        );
5167        debug!(
5168            "created randomness state update transaction with digest: {:?}",
5169            transaction.digest()
5170        );
5171        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5172        let digest = *transaction.digest();
5173
5174        // Randomness state updates contain the full bls signature for the random round,
5175        // which cannot necessarily be reconstructed again later. Therefore we must
5176        // immediately persist this transaction. If we crash before its outputs
5177        // are committed, this ensures we will be able to re-execute it.
5178        self.authority_state
5179            .get_cache_commit()
5180            .persist_transaction(&transaction);
5181
5182        // Send transaction to TransactionManager for execution.
5183        self.authority_state
5184            .transaction_manager()
5185            .enqueue(vec![transaction], &epoch_store);
5186
5187        let authority_state = self.authority_state.clone();
5188        spawn_monitored_task!(async move {
5189            // Wait for transaction execution in a separate task, to avoid deadlock in case
5190            // of out-of-order randomness generation. (Each
5191            // RandomnessStateUpdate depends on the output of the
5192            // RandomnessStateUpdate from the previous round.)
5193            //
5194            // We set a very long timeout so that in case this gets stuck for some reason,
5195            // the validator will eventually crash rather than continuing in a
5196            // zombie mode.
5197            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5198            let result = tokio::time::timeout(
5199                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5200                authority_state
5201                    .get_transaction_cache_reader()
5202                    .try_notify_read_executed_effects(&[digest]),
5203            )
5204            .await;
5205            let result = match result {
5206                Ok(result) => result,
5207                Err(_) => {
5208                    if cfg!(debug_assertions) {
5209                        // Crash on randomness update execution timeout in debug builds.
5210                        panic!(
5211                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5212                        );
5213                    }
5214                    warn!(
5215                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5216                    );
5217                    // Continue waiting as long as necessary in non-debug builds.
5218                    authority_state
5219                        .get_transaction_cache_reader()
5220                        .try_notify_read_executed_effects(&[digest])
5221                        .await
5222                }
5223            };
5224
5225            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5226            let effects = effects.pop().expect("should return effects");
5227            if *effects.status() != ExecutionStatus::Success {
5228                fatal!(
5229                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5230                );
5231            }
5232            debug!(
5233                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5234            );
5235        });
5236    }
5237}
5238
5239#[async_trait]
5240impl TransactionKeyValueStoreTrait for AuthorityState {
5241    async fn multi_get(
5242        &self,
5243        transaction_keys: &[TransactionDigest],
5244        effects_keys: &[TransactionDigest],
5245    ) -> IotaResult<KVStoreTransactionData> {
5246        let txns = if !transaction_keys.is_empty() {
5247            self.get_transaction_cache_reader()
5248                .try_multi_get_transaction_blocks(transaction_keys)?
5249                .into_iter()
5250                .map(|t| t.map(|t| (*t).clone().into_inner()))
5251                .collect()
5252        } else {
5253            vec![]
5254        };
5255
5256        let fx = if !effects_keys.is_empty() {
5257            self.get_transaction_cache_reader()
5258                .try_multi_get_executed_effects(effects_keys)?
5259        } else {
5260            vec![]
5261        };
5262
5263        Ok((txns, fx))
5264    }
5265
5266    async fn multi_get_checkpoints(
5267        &self,
5268        checkpoint_summaries: &[CheckpointSequenceNumber],
5269        checkpoint_contents: &[CheckpointSequenceNumber],
5270        checkpoint_summaries_by_digest: &[CheckpointDigest],
5271    ) -> IotaResult<(
5272        Vec<Option<CertifiedCheckpointSummary>>,
5273        Vec<Option<CheckpointContents>>,
5274        Vec<Option<CertifiedCheckpointSummary>>,
5275    )> {
5276        // TODO: use multi-get methods if it ever becomes important (unlikely)
5277        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5278        let store = self.get_checkpoint_store();
5279        for seq in checkpoint_summaries {
5280            let checkpoint = store
5281                .get_checkpoint_by_sequence_number(*seq)?
5282                .map(|c| c.into_inner());
5283
5284            summaries.push(checkpoint);
5285        }
5286
5287        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5288        for seq in checkpoint_contents {
5289            let checkpoint = store
5290                .get_checkpoint_by_sequence_number(*seq)?
5291                .and_then(|summary| {
5292                    store
5293                        .get_checkpoint_contents(&summary.content_digest)
5294                        .expect("db read cannot fail")
5295                });
5296            contents.push(checkpoint);
5297        }
5298
5299        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5300        for digest in checkpoint_summaries_by_digest {
5301            let checkpoint = store
5302                .get_checkpoint_by_digest(digest)?
5303                .map(|c| c.into_inner());
5304            summaries_by_digest.push(checkpoint);
5305        }
5306
5307        Ok((summaries, contents, summaries_by_digest))
5308    }
5309
5310    async fn get_transaction_perpetual_checkpoint(
5311        &self,
5312        digest: TransactionDigest,
5313    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5314        self.get_checkpoint_cache()
5315            .try_get_transaction_perpetual_checkpoint(&digest)
5316            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5317    }
5318
    /// Looks up the object with the given id at exactly `version` through the
    /// object cache reader.
    async fn get_object(
        &self,
        object_id: ObjectID,
        version: VersionNumber,
    ) -> IotaResult<Option<Object>> {
        self.get_object_cache_reader()
            .try_get_object_by_key(&object_id, version)
    }
5327
5328    async fn multi_get_transactions_perpetual_checkpoints(
5329        &self,
5330        digests: &[TransactionDigest],
5331    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5332        let res = self
5333            .get_checkpoint_cache()
5334            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5335
5336        Ok(res
5337            .into_iter()
5338            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5339            .collect())
5340    }
5341
5342    #[instrument(skip(self))]
5343    async fn multi_get_events_by_tx_digests(
5344        &self,
5345        digests: &[TransactionDigest],
5346    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5347        if digests.is_empty() {
5348            return Ok(vec![]);
5349        }
5350        let events_digests: Vec<_> = self
5351            .get_transaction_cache_reader()
5352            .try_multi_get_executed_effects(digests)?
5353            .into_iter()
5354            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5355            .collect();
5356        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5357        let mut events = self
5358            .get_transaction_cache_reader()
5359            .try_multi_get_events(&non_empty_events)?
5360            .into_iter();
5361        Ok(events_digests
5362            .into_iter()
5363            .map(|ev| ev.and_then(|_| events.next()?))
5364            .collect())
5365    }
5366}
5367
5368#[cfg(msim)]
5369pub mod framework_injection {
5370    use std::{
5371        cell::RefCell,
5372        collections::{BTreeMap, BTreeSet},
5373    };
5374
5375    use iota_framework::{BuiltInFramework, SystemPackage};
5376    use iota_types::{
5377        base_types::{AuthorityName, ObjectID},
5378        is_system_package,
5379    };
5380    use move_binary_format::CompiledModule;
5381
    /// Registered overrides, keyed by package id.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The compiled modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback that, given an authority name, optionally returns the modules
    /// to use for that authority.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5393
    /// How the override for a package is supplied.
    enum PackageOverrideConfig {
        /// The same modules are used regardless of the requesting authority.
        Global(Framework),
        /// A callback is invoked with the requesting authority's name; it may
        /// return `None`.
        PerValidator(PackageUpgradeCallback),
    }
5398
5399    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5400        modules
5401            .iter()
5402            .map(|m| {
5403                let mut buf = Vec::new();
5404                m.serialize_with_version(m.version, &mut buf).unwrap();
5405                buf
5406            })
5407            .collect()
5408    }
5409
5410    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5411        OVERRIDE.with(|bs| {
5412            bs.borrow_mut()
5413                .insert(package_id, PackageOverrideConfig::Global(modules))
5414        });
5415    }
5416
5417    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5418        OVERRIDE.with(|bs| {
5419            bs.borrow_mut()
5420                .insert(package_id, PackageOverrideConfig::PerValidator(func))
5421        });
5422    }
5423
5424    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5425        OVERRIDE.with(|cfg| {
5426            cfg.borrow().get(package_id).and_then(|entry| match entry {
5427                PackageOverrideConfig::Global(framework) => {
5428                    Some(compiled_modules_to_bytes(framework))
5429                }
5430                PackageOverrideConfig::PerValidator(func) => {
5431                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
5432                }
5433            })
5434        })
5435    }
5436
5437    pub fn get_override_modules(
5438        package_id: &ObjectID,
5439        name: AuthorityName,
5440    ) -> Option<Vec<CompiledModule>> {
5441        OVERRIDE.with(|cfg| {
5442            cfg.borrow().get(package_id).and_then(|entry| match entry {
5443                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5444                PackageOverrideConfig::PerValidator(func) => func(name),
5445            })
5446        })
5447    }
5448
5449    pub fn get_override_system_package(
5450        package_id: &ObjectID,
5451        name: AuthorityName,
5452    ) -> Option<SystemPackage> {
5453        let bytes = get_override_bytes(package_id, name)?;
5454        let dependencies = if is_system_package(*package_id) {
5455            BuiltInFramework::get_package_by_id(package_id)
5456                .dependencies
5457                .to_vec()
5458        } else {
5459            // Assume that entirely new injected packages depend on all existing system
5460            // packages.
5461            BuiltInFramework::all_package_ids()
5462        };
5463        Some(SystemPackage {
5464            id: *package_id,
5465            bytes,
5466            dependencies,
5467        })
5468    }
5469
5470    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5471        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5472        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5473            cfg.borrow()
5474                .keys()
5475                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5476                .collect()
5477        });
5478
5479        extra
5480            .into_iter()
5481            .map(|package| SystemPackage {
5482                id: package,
5483                bytes: get_override_bytes(&package, name).unwrap(),
5484                dependencies: BuiltInFramework::all_package_ids(),
5485            })
5486            .collect()
5487    }
5488}
5489
/// An object stored together with the three components of its object
/// reference, in the form used inside a `NodeStateDump`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// First component of the object's reference.
    pub id: ObjectID,
    /// Second component of the object's reference.
    pub version: VersionNumber,
    /// Third component of the object's reference.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
5497
5498impl ObjDumpFormat {
5499    fn new(object: Object) -> Self {
5500        let oref = object.compute_object_reference();
5501        Self {
5502            id: oref.0,
5503            version: oref.1,
5504            digest: oref.2,
5505            object,
5506        }
5507    }
5508}
5509
/// Serializable snapshot of a transaction, its computed and expected effects,
/// the epoch parameters it ran under, and the objects it touched.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// The transaction data, taken from the executed certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch of the epoch store the dump was created from.
    pub executed_epoch: u64,
    /// Reference gas price reported by that epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by that epoch store.
    pub protocol_version: u64,
    /// Epoch start timestamp from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// Effects computed locally for the transaction.
    pub computed_effects: TransactionEffects,
    /// Digest of the effects that were expected.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Mutated or read-only shared inputs, at the versions listed in the
    /// effects, that were found in the object store.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects loaded at runtime, at their loaded versions.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions the effects list as modified.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages that were loaded from the DB at runtime.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
5527
5528impl NodeStateDump {
5529    pub fn new(
5530        tx_digest: &TransactionDigest,
5531        effects: &TransactionEffects,
5532        expected_effects_digest: TransactionEffectsDigest,
5533        object_store: &dyn ObjectStore,
5534        epoch_store: &Arc<AuthorityPerEpochStore>,
5535        inner_temporary_store: &InnerTemporaryStore,
5536        certificate: &VerifiedExecutableTransaction,
5537    ) -> IotaResult<Self> {
5538        // Epoch info
5539        let executed_epoch = epoch_store.epoch();
5540        let reference_gas_price = epoch_store.reference_gas_price();
5541        let epoch_start_config = epoch_store.epoch_start_config();
5542        let protocol_version = epoch_store.protocol_version().as_u64();
5543        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5544
5545        // Record all system packages at this version
5546        let mut relevant_system_packages = Vec::new();
5547        for sys_package_id in BuiltInFramework::all_package_ids() {
5548            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5549                relevant_system_packages.push(ObjDumpFormat::new(w))
5550            }
5551        }
5552
5553        // Record all the shared objects
5554        let mut shared_objects = Vec::new();
5555        for kind in effects.input_shared_objects() {
5556            match kind {
5557                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5558                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5559                        shared_objects.push(ObjDumpFormat::new(w))
5560                    }
5561                }
5562                InputSharedObject::ReadDeleted(..)
5563                | InputSharedObject::MutateDeleted(..)
5564                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5565                                                           * objects. */
5566            }
5567        }
5568
5569        // Record all loaded child objects
5570        // Child objects which are read but not mutated are not tracked anywhere else
5571        let mut loaded_child_objects = Vec::new();
5572        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5573            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5574                loaded_child_objects.push(ObjDumpFormat::new(w))
5575            }
5576        }
5577
5578        // Record all modified objects
5579        let mut modified_at_versions = Vec::new();
5580        for (id, ver) in effects.modified_at_versions() {
5581            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5582                modified_at_versions.push(ObjDumpFormat::new(w))
5583            }
5584        }
5585
5586        // Packages read at runtime, which were not previously loaded into the temoorary
5587        // store Some packages may be fetched at runtime and wont show up in
5588        // input objects
5589        let mut runtime_reads = Vec::new();
5590        for obj in inner_temporary_store
5591            .runtime_packages_loaded_from_db
5592            .values()
5593        {
5594            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5595        }
5596
5597        // All other input objects should already be in `inner_temporary_store.objects`
5598
5599        Ok(Self {
5600            tx_digest: *tx_digest,
5601            executed_epoch,
5602            reference_gas_price,
5603            epoch_start_timestamp_ms,
5604            protocol_version,
5605            relevant_system_packages,
5606            shared_objects,
5607            loaded_child_objects,
5608            modified_at_versions,
5609            runtime_reads,
5610            sender_signed_data: certificate.clone().into_message(),
5611            input_objects: inner_temporary_store
5612                .input_objects
5613                .values()
5614                .map(|o| ObjDumpFormat::new(o.clone()))
5615                .collect(),
5616            computed_effects: effects.clone(),
5617            expected_effects_digest,
5618        })
5619    }
5620
5621    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5622        let mut objects = Vec::new();
5623        objects.extend(self.relevant_system_packages.clone());
5624        objects.extend(self.shared_objects.clone());
5625        objects.extend(self.loaded_child_objects.clone());
5626        objects.extend(self.modified_at_versions.clone());
5627        objects.extend(self.runtime_reads.clone());
5628        objects.extend(self.input_objects.clone());
5629        objects
5630    }
5631
5632    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5633        let file_name = format!(
5634            "{}_{}_NODE_DUMP.json",
5635            self.tx_digest,
5636            AuthorityState::unixtime_now_ms()
5637        );
5638        let mut path = path.to_path_buf();
5639        path.push(&file_name);
5640        let mut file = File::create(path.clone())?;
5641        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5642        Ok(path)
5643    }
5644
5645    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5646        let file = File::open(path)?;
5647        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5648    }
5649}