iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48    key_value_store::{
49        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
50    },
51    key_value_store_metrics::KeyValueStoreMetrics,
52};
53#[cfg(msim)]
54use iota_types::committee::CommitteeTrait;
55use iota_types::{
56    IOTA_SYSTEM_ADDRESS, TypeTag,
57    authenticator_state::get_authenticator_state,
58    base_types::*,
59    committee::{Committee, EpochId, ProtocolVersion},
60    crypto::{
61        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
62        default_hash,
63    },
64    deny_list_v1::check_coin_deny_list_v1_during_signing,
65    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
66    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
67    effects::{
68        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
69        TransactionEvents, VerifiedSignedTransactionEffects,
70    },
71    error::{ExecutionError, IotaError, IotaResult, UserInputError},
72    event::{Event, EventID, SystemEpochInfoEvent},
73    executable_transaction::VerifiedExecutableTransaction,
74    execution_config_utils::to_binary_config,
75    execution_status::ExecutionStatus,
76    gas::{GasCostSummary, IotaGasStatus},
77    gas_coin::NANOS_PER_IOTA,
78    inner_temporary_store::{
79        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
80        WrittenObjects,
81    },
82    iota_system_state::{
83        IotaSystemState, IotaSystemStateTrait,
84        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
85    },
86    is_system_package,
87    layout_resolver::{LayoutResolver, into_struct_layout},
88    message_envelope::Message,
89    messages_checkpoint::{
90        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
91        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
92        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
93        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
94    },
95    messages_consensus::AuthorityCapabilitiesV1,
96    messages_grpc::{
97        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
98        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
99        TransactionStatus,
100    },
101    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
102    object::{
103        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
104        bounded_visitor::BoundedVisitor,
105    },
106    storage::{
107        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
108    },
109    supported_protocol_versions::{
110        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
111    },
112    transaction::*,
113    transaction_executor::{SimulateTransactionResult, VmChecks},
114};
115use itertools::Itertools;
116use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
117use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
118use parking_lot::Mutex;
119use prometheus::{
120    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
121    register_histogram_vec_with_registry, register_histogram_with_registry,
122    register_int_counter_vec_with_registry, register_int_counter_with_registry,
123    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
124};
125use serde::{Deserialize, Serialize, de::DeserializeOwned};
126use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
127use tap::TapFallible;
128use tokio::{
129    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130    task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143    authority::{
144        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148        authority_store_tables::AuthorityPrunerTables,
149        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150    },
151    authority_client::NetworkAuthorityClient,
152    checkpoints::CheckpointStore,
153    congestion_tracker::CongestionTracker,
154    consensus_adapter::ConsensusAdapter,
155    epoch::committee_store::CommitteeStore,
156    execution_cache::{
157        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159        TransactionCacheRead,
160    },
161    execution_driver::execution_process,
162    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163    metrics::{LatencyObserver, RateTracker},
164    module_cache_metrics::ResolverMetrics,
165    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166    rest_index::RestIndexStore,
167    stake_aggregator::StakeAggregator,
168    state_accumulator::{AccumulatorStore, StateAccumulator},
169    subscription_handler::SubscriptionHandler,
170    transaction_input_loader::TransactionInputLoader,
171    transaction_manager::TransactionManager,
172    transaction_outputs::TransactionOutputs,
173    validator_tx_finalizer::ValidatorTxFinalizer,
174    verify_indexes::verify_indexes,
175};
176
177#[cfg(test)]
178#[path = "unit_tests/authority_tests.rs"]
179pub mod authority_tests;
180
181#[cfg(test)]
182#[path = "unit_tests/transaction_tests.rs"]
183pub mod transaction_tests;
184
185#[cfg(test)]
186#[path = "unit_tests/batch_transaction_tests.rs"]
187mod batch_transaction_tests;
188
189#[cfg(test)]
190#[path = "unit_tests/move_integration_tests.rs"]
191pub mod move_integration_tests;
192
193#[cfg(test)]
194#[path = "unit_tests/gas_tests.rs"]
195mod gas_tests;
196
197#[cfg(test)]
198#[path = "unit_tests/batch_verification_tests.rs"]
199mod batch_verification_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/coin_deny_list_tests.rs"]
203mod coin_deny_list_tests;
204
205#[cfg(test)]
206#[path = "unit_tests/auth_unit_test_utils.rs"]
207pub mod auth_unit_test_utils;
208
209pub mod authority_test_utils;
210
211pub mod authority_per_epoch_store;
212pub mod authority_per_epoch_store_pruner;
213
214pub mod authority_store_pruner;
215pub mod authority_store_tables;
216pub mod authority_store_types;
217pub mod epoch_start_configuration;
218pub mod shared_object_congestion_tracker;
219pub mod shared_object_version_manager;
220pub mod suggested_gas_price_calculator;
221pub mod test_authority_builder;
222pub mod transaction_deferral;
223
224pub(crate) mod authority_store;
225pub mod backpressure;
226
227/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
228pub struct AuthorityMetrics {
229    tx_orders: IntCounter,
230    total_certs: IntCounter,
231    total_cert_attempts: IntCounter,
232    total_effects: IntCounter,
233    pub shared_obj_tx: IntCounter,
234    sponsored_tx: IntCounter,
235    tx_already_processed: IntCounter,
236    num_input_objs: Histogram,
237    num_shared_objects: Histogram,
238    batch_size: Histogram,
239
240    authority_state_handle_transaction_latency: Histogram,
241
242    execute_certificate_latency_single_writer: Histogram,
243    execute_certificate_latency_shared_object: Histogram,
244
245    internal_execution_latency: Histogram,
246    execution_load_input_objects_latency: Histogram,
247    prepare_certificate_latency: Histogram,
248    commit_certificate_latency: Histogram,
249    db_checkpoint_latency: Histogram,
250
251    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
252    pub(crate) transaction_manager_num_missing_objects: IntGauge,
253    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
254    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
255    pub(crate) transaction_manager_num_ready: IntGauge,
256    pub(crate) transaction_manager_object_cache_size: IntGauge,
257    pub(crate) transaction_manager_object_cache_hits: IntCounter,
258    pub(crate) transaction_manager_object_cache_misses: IntCounter,
259    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
260    pub(crate) transaction_manager_package_cache_size: IntGauge,
261    pub(crate) transaction_manager_package_cache_hits: IntCounter,
262    pub(crate) transaction_manager_package_cache_misses: IntCounter,
263    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
264    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
265
266    pub(crate) execution_driver_executed_transactions: IntCounter,
267    pub(crate) execution_driver_dispatch_queue: IntGauge,
268    pub(crate) execution_queueing_delay_s: Histogram,
269    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
270    pub(crate) execution_gas_latency_ratio: Histogram,
271
272    pub(crate) skipped_consensus_txns: IntCounter,
273    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
274
275    pub(crate) authority_overload_status: IntGauge,
276    pub(crate) authority_load_shedding_percentage: IntGauge,
277
278    pub(crate) transaction_overload_sources: IntCounterVec,
279
280    /// Post processing metrics
281    post_processing_total_events_emitted: IntCounter,
282    post_processing_total_tx_indexed: IntCounter,
283    post_processing_total_tx_had_event_processed: IntCounter,
284    post_processing_total_failures: IntCounter,
285
286    /// Consensus handler metrics
287    pub consensus_handler_processed: IntCounterVec,
288    pub consensus_handler_transaction_sizes: HistogramVec,
289    pub consensus_handler_num_low_scoring_authorities: IntGauge,
290    pub consensus_handler_scores: IntGaugeVec,
291    pub consensus_handler_deferred_transactions: IntCounter,
292    pub consensus_handler_congested_transactions: IntCounter,
293    pub consensus_handler_cancelled_transactions: IntCounter,
294    pub consensus_handler_max_object_costs: IntGaugeVec,
295    pub consensus_committed_subdags: IntCounterVec,
296    pub consensus_committed_messages: IntGaugeVec,
297    pub consensus_committed_user_transactions: IntGaugeVec,
298    pub consensus_calculated_throughput: IntGauge,
299    pub consensus_calculated_throughput_profile: IntGauge,
300
301    pub limits_metrics: Arc<LimitsMetrics>,
302
303    /// bytecode verifier metrics for tracking timeouts
304    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
305
306    /// Count of zklogin signatures
307    pub zklogin_sig_count: IntCounter,
308    /// Count of multisig signatures
309    pub multisig_sig_count: IntCounter,
310
311    // Tracks recent average txn queueing delay between when it is ready for execution
312    // until it starts executing.
313    pub execution_queueing_latency: LatencyObserver,
314
315    // Tracks the rate of transactions become ready for execution in transaction manager.
316    // The need for the Mutex is that the tracker is updated in transaction manager and read
317    // in the overload_monitor. There should be low mutex contention because
318    // transaction manager is single threaded and the read rate in overload_monitor is
319    // low. In the case where transaction manager becomes multi-threaded, we can
320    // create one rate tracker per thread.
321    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
322
323    // Tracks the rate of transactions starts execution in execution driver.
324    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
325    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
326}
327
/// Histogram buckets for positive numbers in the 0-10M range, used in place of
/// the default Prometheus buckets.
///
/// The bounds follow a 1-2-5-7 progression per decade, from 1 up to
/// 10,000,000, and are strictly increasing. `AuthorityMetrics::new` uses them
/// for count and size histograms such as `num_input_objects` and `batch_size`.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
334
/// Histogram buckets for latency samples, in seconds.
///
/// The bounds run from 0.5ms to 90s, with one bucket per second between 1s
/// and 10s, and are strictly increasing. `AuthorityMetrics::new` uses them
/// for the certificate/transaction latency histograms.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
339
/// Histogram buckets for low latency samples, in seconds. Starts from 10us.
///
/// The bounds follow a 1-2-5 progression per decade up to 100s and are
/// strictly increasing. `AuthorityMetrics::new` uses them for the
/// input-object loading latency histogram.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
345
/// Histogram buckets for the gas-to-latency ratio histograms
/// (`prepare_cert_gas_latency_ratio` and `execution_gas_latency_ratio`).
///
/// The bounds range from 10 to 1,000,000 and are strictly increasing.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
350
351/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
352pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
353
354impl AuthorityMetrics {
355    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
356        let execute_certificate_latency = register_histogram_vec_with_registry!(
357            "authority_state_execute_certificate_latency",
358            "Latency of executing certificates, including waiting for inputs",
359            &["tx_type"],
360            LATENCY_SEC_BUCKETS.to_vec(),
361            registry,
362        )
363        .unwrap();
364
365        let execute_certificate_latency_single_writer =
366            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
367        let execute_certificate_latency_shared_object =
368            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
369
370        Self {
371            tx_orders: register_int_counter_with_registry!(
372                "total_transaction_orders",
373                "Total number of transaction orders",
374                registry,
375            )
376            .unwrap(),
377            total_certs: register_int_counter_with_registry!(
378                "total_transaction_certificates",
379                "Total number of transaction certificates handled",
380                registry,
381            )
382            .unwrap(),
383            total_cert_attempts: register_int_counter_with_registry!(
384                "total_handle_certificate_attempts",
385                "Number of calls to handle_certificate",
386                registry,
387            )
388            .unwrap(),
389            // total_effects == total transactions finished
390            total_effects: register_int_counter_with_registry!(
391                "total_transaction_effects",
392                "Total number of transaction effects produced",
393                registry,
394            )
395            .unwrap(),
396
397            shared_obj_tx: register_int_counter_with_registry!(
398                "num_shared_obj_tx",
399                "Number of transactions involving shared objects",
400                registry,
401            )
402            .unwrap(),
403
404            sponsored_tx: register_int_counter_with_registry!(
405                "num_sponsored_tx",
406                "Number of sponsored transactions",
407                registry,
408            )
409            .unwrap(),
410
411            tx_already_processed: register_int_counter_with_registry!(
412                "num_tx_already_processed",
413                "Number of transaction orders already processed previously",
414                registry,
415            )
416            .unwrap(),
417            num_input_objs: register_histogram_with_registry!(
418                "num_input_objects",
419                "Distribution of number of input TX objects per TX",
420                POSITIVE_INT_BUCKETS.to_vec(),
421                registry,
422            )
423            .unwrap(),
424            num_shared_objects: register_histogram_with_registry!(
425                "num_shared_objects",
426                "Number of shared input objects per TX",
427                POSITIVE_INT_BUCKETS.to_vec(),
428                registry,
429            )
430            .unwrap(),
431            batch_size: register_histogram_with_registry!(
432                "batch_size",
433                "Distribution of size of transaction batch",
434                POSITIVE_INT_BUCKETS.to_vec(),
435                registry,
436            )
437            .unwrap(),
438            authority_state_handle_transaction_latency: register_histogram_with_registry!(
439                "authority_state_handle_transaction_latency",
440                "Latency of handling transactions",
441                LATENCY_SEC_BUCKETS.to_vec(),
442                registry,
443            )
444            .unwrap(),
445            execute_certificate_latency_single_writer,
446            execute_certificate_latency_shared_object,
447            internal_execution_latency: register_histogram_with_registry!(
448                "authority_state_internal_execution_latency",
449                "Latency of actual certificate executions",
450                LATENCY_SEC_BUCKETS.to_vec(),
451                registry,
452            )
453            .unwrap(),
454            execution_load_input_objects_latency: register_histogram_with_registry!(
455                "authority_state_execution_load_input_objects_latency",
456                "Latency of loading input objects for execution",
457                LOW_LATENCY_SEC_BUCKETS.to_vec(),
458                registry,
459            )
460            .unwrap(),
461            prepare_certificate_latency: register_histogram_with_registry!(
462                "authority_state_prepare_certificate_latency",
463                "Latency of executing certificates, before committing the results",
464                LATENCY_SEC_BUCKETS.to_vec(),
465                registry,
466            )
467            .unwrap(),
468            commit_certificate_latency: register_histogram_with_registry!(
469                "authority_state_commit_certificate_latency",
470                "Latency of committing certificate execution results",
471                LATENCY_SEC_BUCKETS.to_vec(),
472                registry,
473            )
474            .unwrap(),
475            db_checkpoint_latency: register_histogram_with_registry!(
476                "db_checkpoint_latency",
477                "Latency of checkpointing dbs",
478                LATENCY_SEC_BUCKETS.to_vec(),
479                registry,
480            ).unwrap(),
481            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
482                "transaction_manager_num_enqueued_certificates",
483                "Current number of certificates enqueued to TransactionManager",
484                &["result"],
485                registry,
486            )
487            .unwrap(),
488            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
489                "transaction_manager_num_missing_objects",
490                "Current number of missing objects in TransactionManager",
491                registry,
492            )
493            .unwrap(),
494            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
495                "transaction_manager_num_pending_certificates",
496                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
497                registry,
498            )
499            .unwrap(),
500            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
501                "transaction_manager_num_executing_certificates",
502                "Number of executing certificates, including queued and actually running certificates",
503                registry,
504            )
505            .unwrap(),
506            transaction_manager_num_ready: register_int_gauge_with_registry!(
507                "transaction_manager_num_ready",
508                "Number of ready transactions in TransactionManager",
509                registry,
510            )
511            .unwrap(),
512            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
513                "transaction_manager_object_cache_size",
514                "Current size of object-availability cache in TransactionManager",
515                registry,
516            )
517            .unwrap(),
518            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
519                "transaction_manager_object_cache_hits",
520                "Number of object-availability cache hits in TransactionManager",
521                registry,
522            )
523            .unwrap(),
524            authority_overload_status: register_int_gauge_with_registry!(
525                "authority_overload_status",
526                "Whether authority is current experiencing overload and enters load shedding mode.",
527                registry)
528            .unwrap(),
529            authority_load_shedding_percentage: register_int_gauge_with_registry!(
530                "authority_load_shedding_percentage",
531                "The percentage of transactions is shed when the authority is in load shedding mode.",
532                registry)
533            .unwrap(),
534            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
535                "transaction_manager_object_cache_misses",
536                "Number of object-availability cache misses in TransactionManager",
537                registry,
538            )
539            .unwrap(),
540            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
541                "transaction_manager_object_cache_evictions",
542                "Number of object-availability cache evictions in TransactionManager",
543                registry,
544            )
545            .unwrap(),
546            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
547                "transaction_manager_package_cache_size",
548                "Current size of package-availability cache in TransactionManager",
549                registry,
550            )
551            .unwrap(),
552            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
553                "transaction_manager_package_cache_hits",
554                "Number of package-availability cache hits in TransactionManager",
555                registry,
556            )
557            .unwrap(),
558            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
559                "transaction_manager_package_cache_misses",
560                "Number of package-availability cache misses in TransactionManager",
561                registry,
562            )
563            .unwrap(),
564            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
565                "transaction_manager_package_cache_evictions",
566                "Number of package-availability cache evictions in TransactionManager",
567                registry,
568            )
569            .unwrap(),
570            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571                "transaction_manager_transaction_queue_age_s",
572                "Time spent in waiting for transaction in the queue",
573                LATENCY_SEC_BUCKETS.to_vec(),
574                registry,
575            )
576            .unwrap(),
577            transaction_overload_sources: register_int_counter_vec_with_registry!(
578                "transaction_overload_sources",
579                "Number of times each source indicates transaction overload.",
580                &["source"],
581                registry)
582            .unwrap(),
583            execution_driver_executed_transactions: register_int_counter_with_registry!(
584                "execution_driver_executed_transactions",
585                "Cumulative number of transaction executed by execution driver",
586                registry,
587            )
588            .unwrap(),
589            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
590                "execution_driver_dispatch_queue",
591                "Number of transaction pending in execution driver dispatch queue",
592                registry,
593            )
594            .unwrap(),
595            execution_queueing_delay_s: register_histogram_with_registry!(
596                "execution_queueing_delay_s",
597                "Queueing delay between a transaction is ready for execution until it starts executing.",
598                LATENCY_SEC_BUCKETS.to_vec(),
599                registry
600            )
601            .unwrap(),
602            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
603                "prepare_cert_gas_latency_ratio",
604                "The ratio of computation gas divided by VM execution latency.",
605                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
606                registry
607            )
608            .unwrap(),
609            execution_gas_latency_ratio: register_histogram_with_registry!(
610                "execution_gas_latency_ratio",
611                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
612                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
613                registry
614            )
615            .unwrap(),
616            skipped_consensus_txns: register_int_counter_with_registry!(
617                "skipped_consensus_txns",
618                "Total number of consensus transactions skipped",
619                registry,
620            )
621            .unwrap(),
622            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
623                "skipped_consensus_txns_cache_hit",
624                "Total number of consensus transactions skipped because of local cache hit",
625                registry,
626            )
627            .unwrap(),
628            post_processing_total_events_emitted: register_int_counter_with_registry!(
629                "post_processing_total_events_emitted",
630                "Total number of events emitted in post processing",
631                registry,
632            )
633            .unwrap(),
634            post_processing_total_tx_indexed: register_int_counter_with_registry!(
635                "post_processing_total_tx_indexed",
636                "Total number of txes indexed in post processing",
637                registry,
638            )
639            .unwrap(),
640            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
641                "post_processing_total_tx_had_event_processed",
642                "Total number of txes finished event processing in post processing",
643                registry,
644            )
645            .unwrap(),
646            post_processing_total_failures: register_int_counter_with_registry!(
647                "post_processing_total_failures",
648                "Total number of failure in post processing",
649                registry,
650            )
651            .unwrap(),
652            consensus_handler_processed: register_int_counter_vec_with_registry!(
653                "consensus_handler_processed",
654                "Number of transactions processed by consensus handler",
655                &["class"],
656                registry
657            ).unwrap(),
658            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
659                "consensus_handler_transaction_sizes",
660                "Sizes of each type of transactions processed by consensus handler",
661                &["class"],
662                POSITIVE_INT_BUCKETS.to_vec(),
663                registry
664            ).unwrap(),
665            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
666                "consensus_handler_num_low_scoring_authorities",
667                "Number of low scoring authorities based on reputation scores from consensus",
668                registry
669            ).unwrap(),
670            consensus_handler_scores: register_int_gauge_vec_with_registry!(
671                "consensus_handler_scores",
672                "scores from consensus for each authority",
673                &["authority"],
674                registry,
675            ).unwrap(),
676            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
677                "consensus_handler_deferred_transactions",
678                "Number of transactions deferred by consensus handler",
679                registry,
680            ).unwrap(),
681            consensus_handler_congested_transactions: register_int_counter_with_registry!(
682                "consensus_handler_congested_transactions",
683                "Number of transactions deferred by consensus handler due to congestion",
684                registry,
685            ).unwrap(),
686            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
687                "consensus_handler_cancelled_transactions",
688                "Number of transactions cancelled by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
692                "consensus_handler_max_congestion_control_object_costs",
693                "Max object costs for congestion control in the current consensus commit",
694                &["commit_type"],
695                registry,
696            ).unwrap(),
697            consensus_committed_subdags: register_int_counter_vec_with_registry!(
698                "consensus_committed_subdags",
699                "Number of committed subdags, sliced by author",
700                &["authority"],
701                registry,
702            ).unwrap(),
703            consensus_committed_messages: register_int_gauge_vec_with_registry!(
704                "consensus_committed_messages",
705                "Total number of committed consensus messages, sliced by author",
706                &["authority"],
707                registry,
708            ).unwrap(),
709            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
710                "consensus_committed_user_transactions",
711                "Number of committed user transactions, sliced by submitter",
712                &["authority"],
713                registry,
714            ).unwrap(),
715            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
716            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
717            zklogin_sig_count: register_int_counter_with_registry!(
718                "zklogin_sig_count",
719                "Count of zkLogin signatures",
720                registry,
721            )
722            .unwrap(),
723            multisig_sig_count: register_int_counter_with_registry!(
724                "multisig_sig_count",
725                "Count of zkLogin signatures",
726                registry,
727            )
728            .unwrap(),
729            consensus_calculated_throughput: register_int_gauge_with_registry!(
730                "consensus_calculated_throughput",
731                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
732                registry,
733            ).unwrap(),
734            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
735                "consensus_calculated_throughput_profile",
736                "The current active calculated throughput profile",
737                registry
738            ).unwrap(),
739            execution_queueing_latency: LatencyObserver::new(),
740            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
741            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
742        }
743    }
744
745    /// Reset metrics that contain `hostname` as one of the labels. This is
746    /// needed to avoid retaining metrics for long-gone committee members and
747    /// only exposing metrics for the committee in the current epoch.
748    pub fn reset_on_reconfigure(&self) {
749        self.consensus_committed_messages.reset();
750        self.consensus_handler_scores.reset();
751        self.consensus_committed_user_transactions.reset();
752    }
753}
754
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private
///   keys).
/// - Send + Sync, i.e. can be safely moved to and shared between threads.
///
/// Typically instantiated with `Arc::pin(keypair)` where keypair is a
/// `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
762
/// State owned by a single authority: its identity, its storage handles, the
/// current per-epoch store, and the components used while signing and
/// executing transactions.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction, both for
    /// signing and for execution.
    input_loader: TransactionInputLoader,
    /// NOTE(review): presumably the handles behind the `get_cache_*` /
    /// `get_*_cache_reader` accessors used throughout this type — confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store, held behind an `ArcSwap`.
    /// NOTE(review): presumably swapped when the epoch changes — confirm.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and reverts all transactions from the previous epoch
    /// that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store; `None` when this node keeps no indexes.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store; `None` when it is not enabled.
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Shared handle to the subscription handler.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Shared handle to the checkpoint store (also serves epoch state
    /// commitments).
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Shared handle to the committee store.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics recorded by this authority.
    pub metrics: Arc<AuthorityMetrics>,
    /// Held but never read through this field.
    /// NOTE(review): presumably kept so the store pruner lives as long as the
    /// state — confirm.
    _pruner: AuthorityStorePruner,
    /// Held but never read through this field.
    /// NOTE(review): presumably kept so the per-epoch store pruner lives as
    /// long as the state — confirm.
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration this authority runs with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer that tracks transactions signed by this authority;
    /// it is only used when this node is a committee validator.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    /// Shared handle to the congestion tracker.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
818
819/// The authority state encapsulates all state, drives execution, and ensures
820/// safety.
821///
822/// Note the authority operations can be accessed through a read ref (&) and do
823/// not require &mut. Internally a database is synchronized through a mutex
824/// lock.
825///
826/// Repeating valid commands should produce no changes and return no error.
827impl AuthorityState {
828    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
829        epoch_store.committee().authority_exists(&self.name)
830    }
831
832    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
833        epoch_store
834            .active_validators()
835            .iter()
836            .any(|a| AuthorityName::from(a) == self.name)
837    }
838
839    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
840        !self.is_committee_validator(epoch_store)
841    }
842
843    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
844        &self.committee_store
845    }
846
847    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
848        self.committee_store.clone()
849    }
850
851    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
852        &self.config.authority_overload_config
853    }
854
855    pub fn get_epoch_state_commitments(
856        &self,
857        epoch: EpochId,
858    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
859        self.checkpoint_store.get_epoch_state_commitments(epoch)
860    }
861
862    /// This is a private method and should be kept that way. It doesn't check
863    /// whether the provided transaction is a system transaction, and hence
864    /// can only be called internally.
865    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
866    async fn handle_transaction_impl(
867        &self,
868        transaction: VerifiedTransaction,
869        epoch_store: &Arc<AuthorityPerEpochStore>,
870    ) -> IotaResult<VerifiedSignedTransaction> {
871        // Ensure that validator cannot reconfigure while we are signing the tx
872        let _execution_lock = self.execution_lock_for_signing();
873
874        let tx_digest = transaction.digest();
875        let tx_data = transaction.data().transaction_data();
876
877        let input_object_kinds = tx_data.input_objects()?;
878        let receiving_objects_refs = tx_data.receiving_objects();
879
880        // Note: the deny checks may do redundant package loads but:
881        // - they only load packages when there is an active package deny map
882        // - the loads are cached anyway
883        iota_transaction_checks::deny::check_transaction_for_signing(
884            tx_data,
885            transaction.tx_signatures(),
886            &input_object_kinds,
887            &receiving_objects_refs,
888            &self.config.transaction_deny_config,
889            self.get_backing_package_store().as_ref(),
890        )?;
891
892        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
893            Some(tx_digest),
894            &input_object_kinds,
895            &receiving_objects_refs,
896            epoch_store.epoch(),
897        )?;
898
899        let (_gas_status, checked_input_objects) =
900            iota_transaction_checks::check_transaction_input(
901                epoch_store.protocol_config(),
902                epoch_store.reference_gas_price(),
903                tx_data,
904                input_objects,
905                &receiving_objects,
906                &self.metrics.bytecode_verifier_metrics,
907                &self.config.verifier_signing_config,
908            )?;
909
910        check_coin_deny_list_v1_during_signing(
911            tx_data.sender(),
912            &checked_input_objects,
913            &receiving_objects,
914            &self.get_object_store(),
915        )?;
916
917        let owned_objects = checked_input_objects.inner().filter_owned_objects();
918
919        let signed_transaction = VerifiedSignedTransaction::new(
920            epoch_store.epoch(),
921            transaction,
922            self.name,
923            &*self.secret,
924        );
925
926        // Check and write locks, to signed transaction, into the database
927        // The call to self.set_transaction_lock checks the lock is not conflicting,
928        // and returns ConflictingTransaction error in case there is a lock on a
929        // different existing transaction.
930        self.get_cache_writer().try_acquire_transaction_locks(
931            epoch_store,
932            &owned_objects,
933            signed_transaction.clone(),
934        )?;
935
936        Ok(signed_transaction)
937    }
938
939    /// Initiate a new transaction.
940    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
941    pub async fn handle_transaction(
942        &self,
943        epoch_store: &Arc<AuthorityPerEpochStore>,
944        transaction: VerifiedTransaction,
945    ) -> IotaResult<HandleTransactionResponse> {
946        let tx_digest = *transaction.digest();
947        debug!("handle_transaction");
948
949        // Ensure an idempotent answer.
950        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
951            return Ok(HandleTransactionResponse { status });
952        }
953
954        let _metrics_guard = self
955            .metrics
956            .authority_state_handle_transaction_latency
957            .start_timer();
958        self.metrics.tx_orders.inc();
959
960        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
961        match signed {
962            Ok(s) => {
963                if self.is_committee_validator(epoch_store) {
964                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
965                        let tx = s.clone();
966                        let validator_tx_finalizer = validator_tx_finalizer.clone();
967                        let cache_reader = self.get_transaction_cache_reader().clone();
968                        let epoch_store = epoch_store.clone();
969                        spawn_monitored_task!(epoch_store.within_alive_epoch(
970                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
971                        ));
972                    }
973                }
974                Ok(HandleTransactionResponse {
975                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
976                })
977            }
978            // It happens frequently that while we are checking the validity of the transaction, it
979            // has just been executed.
980            // In that case, we could still return Ok to avoid showing confusing errors.
981            Err(err) => Ok(HandleTransactionResponse {
982                status: self
983                    .get_transaction_status(&tx_digest, epoch_store)?
984                    .ok_or(err)?
985                    .1,
986            }),
987        }
988    }
989
990    pub fn check_system_overload_at_signing(&self) -> bool {
991        self.config
992            .authority_overload_config
993            .check_system_overload_at_signing
994    }
995
996    pub fn check_system_overload_at_execution(&self) -> bool {
997        self.config
998            .authority_overload_config
999            .check_system_overload_at_execution
1000    }
1001
1002    pub(crate) fn check_system_overload(
1003        &self,
1004        consensus_adapter: &Arc<ConsensusAdapter>,
1005        tx_data: &SenderSignedData,
1006        do_authority_overload_check: bool,
1007    ) -> IotaResult {
1008        if do_authority_overload_check {
1009            self.check_authority_overload(tx_data).tap_err(|_| {
1010                self.update_overload_metrics("execution_queue");
1011            })?;
1012        }
1013        self.transaction_manager
1014            .check_execution_overload(self.overload_config(), tx_data)
1015            .tap_err(|_| {
1016                self.update_overload_metrics("execution_pending");
1017            })?;
1018        consensus_adapter.check_consensus_overload().tap_err(|_| {
1019            self.update_overload_metrics("consensus");
1020        })?;
1021
1022        let pending_tx_count = self
1023            .get_cache_commit()
1024            .approximate_pending_transaction_count();
1025        if pending_tx_count
1026            > self
1027                .config
1028                .execution_cache_config
1029                .writeback_cache
1030                .backpressure_threshold_for_rpc()
1031        {
1032            return Err(IotaError::ValidatorOverloadedRetryAfter {
1033                retry_after_secs: 10,
1034            });
1035        }
1036
1037        Ok(())
1038    }
1039
1040    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1041        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1042            return Ok(());
1043        }
1044
1045        let load_shedding_percentage = self
1046            .overload_info
1047            .load_shedding_percentage
1048            .load(Ordering::Relaxed);
1049        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1050    }
1051
1052    fn update_overload_metrics(&self, source: &str) {
1053        self.metrics
1054            .transaction_overload_sources
1055            .with_label_values(&[source])
1056            .inc();
1057    }
1058
1059    /// Executes a certificate for its effects.
1060    #[instrument(level = "trace", skip_all)]
1061    pub async fn execute_certificate(
1062        &self,
1063        certificate: &VerifiedCertificate,
1064        epoch_store: &Arc<AuthorityPerEpochStore>,
1065    ) -> IotaResult<TransactionEffects> {
1066        let _metrics_guard = if certificate.contains_shared_object() {
1067            self.metrics
1068                .execute_certificate_latency_shared_object
1069                .start_timer()
1070        } else {
1071            self.metrics
1072                .execute_certificate_latency_single_writer
1073                .start_timer()
1074        };
1075        trace!("execute_certificate");
1076
1077        self.metrics.total_cert_attempts.inc();
1078
1079        if !certificate.contains_shared_object() {
1080            // Shared object transactions need to be sequenced by the consensus before
1081            // enqueueing for execution, done in
1082            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1083            // object transactions, they can be enqueued for execution immediately.
1084            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1085        }
1086
1087        // tx could be reverted when epoch ends, so we must be careful not to return a
1088        // result here after the epoch ends.
1089        epoch_store
1090            .within_alive_epoch(self.notify_read_effects(certificate))
1091            .await
1092            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1093            .and_then(|r| r)
1094    }
1095
1096    /// Internal logic to execute a certificate.
1097    ///
1098    /// Guarantees that
1099    /// - If input objects are available, return no permanent failure.
1100    /// - Execution and output commit are atomic. i.e. outputs are only written
1101    ///   to storage,
1102    /// on successful execution; crashed execution has no observable effect and
1103    /// can be retried.
1104    ///
1105    /// It is caller's responsibility to ensure input objects are available and
1106    /// locks are set. If this cannot be satisfied by the caller,
1107    /// execute_certificate() should be called instead.
1108    ///
1109    /// Should only be called within iota-core.
1110    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1111    pub fn try_execute_immediately(
1112        &self,
1113        certificate: &VerifiedExecutableTransaction,
1114        expected_effects_digest: Option<TransactionEffectsDigest>,
1115        epoch_store: &Arc<AuthorityPerEpochStore>,
1116    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1117        let _scope = monitored_scope("Execution::try_execute_immediately");
1118        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1119
1120        let tx_digest = certificate.digest();
1121
1122        // Acquire a lock to prevent concurrent executions of the same transaction.
1123        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1124
1125        // The cert could have been processed by a concurrent attempt of the same cert,
1126        // so check if the effects have already been written.
1127        if let Some(effects) = self
1128            .get_transaction_cache_reader()
1129            .try_get_executed_effects(tx_digest)?
1130        {
1131            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1132                assert_eq!(
1133                    effects.digest(),
1134                    expected_effects_digest_inner,
1135                    "Unexpected effects digest for transaction {tx_digest:?}"
1136                );
1137            }
1138            tx_guard.release();
1139            return Ok((effects, None));
1140        }
1141        let input_objects =
1142            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1143
1144        // If no expected_effects_digest was provided, try to get it from storage.
1145        // We could be re-executing a previously executed but uncommitted transaction,
1146        // perhaps after restarting with a new binary. In this situation, if
1147        // we have published an effects signature, we must be sure not to
1148        // equivocate. TODO: read from cache instead of DB
1149        let expected_effects_digest =
1150            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1151
1152        self.process_certificate(
1153            tx_guard,
1154            certificate,
1155            input_objects,
1156            expected_effects_digest,
1157            epoch_store,
1158        )
1159        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1160        .tap_ok(
1161            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1162        )
1163    }
1164
1165    pub fn read_objects_for_execution(
1166        &self,
1167        tx_lock: &CertLockGuard,
1168        certificate: &VerifiedExecutableTransaction,
1169        epoch_store: &Arc<AuthorityPerEpochStore>,
1170    ) -> IotaResult<InputObjects> {
1171        let _scope = monitored_scope("Execution::load_input_objects");
1172        let _metrics_guard = self
1173            .metrics
1174            .execution_load_input_objects_latency
1175            .start_timer();
1176        let input_objects = &certificate.data().transaction_data().input_objects()?;
1177        self.input_loader.read_objects_for_execution(
1178            epoch_store,
1179            &certificate.key(),
1180            tx_lock,
1181            input_objects,
1182            epoch_store.epoch(),
1183        )
1184    }
1185
1186    /// Test only wrapper for `try_execute_immediately()` above, useful for
1187    /// checking errors if the pre-conditions are not satisfied, and
1188    /// executing change epoch transactions.
1189    pub fn try_execute_for_test(
1190        &self,
1191        certificate: &VerifiedCertificate,
1192    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1193        let epoch_store = self.epoch_store_for_testing();
1194        let (effects, execution_error_opt) = self.try_execute_immediately(
1195            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1196            None,
1197            &epoch_store,
1198        )?;
1199        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1200        Ok((signed_effects, execution_error_opt))
1201    }
1202
1203    /// Non-fallible version of `try_execute_for_test()`.
1204    pub fn execute_for_test(
1205        &self,
1206        certificate: &VerifiedCertificate,
1207    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1208        self.try_execute_for_test(certificate)
1209            .expect("try_execute_for_test should not fail")
1210    }
1211
1212    pub async fn notify_read_effects(
1213        &self,
1214        certificate: &VerifiedCertificate,
1215    ) -> IotaResult<TransactionEffects> {
1216        self.get_transaction_cache_reader()
1217            .try_notify_read_executed_effects(&[*certificate.digest()])
1218            .await
1219            .map(|mut r| r.pop().expect("must return correct number of effects"))
1220    }
1221
1222    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1223        self.get_object_cache_reader()
1224            .try_check_owned_objects_are_live(owned_object_refs)
1225    }
1226
1227    /// This function captures the required state to debug a forked transaction.
1228    /// The dump is written to a file in dir `path`, with name prefixed by the
1229    /// transaction digest. NOTE: Since this info escapes the validator
1230    /// context, make sure not to leak any private info here
1231    pub(crate) fn debug_dump_transaction_state(
1232        &self,
1233        tx_digest: &TransactionDigest,
1234        effects: &TransactionEffects,
1235        expected_effects_digest: TransactionEffectsDigest,
1236        inner_temporary_store: &InnerTemporaryStore,
1237        certificate: &VerifiedExecutableTransaction,
1238        debug_dump_config: &StateDebugDumpConfig,
1239    ) -> IotaResult<PathBuf> {
1240        let dump_dir = debug_dump_config
1241            .dump_file_directory
1242            .as_ref()
1243            .cloned()
1244            .unwrap_or(std::env::temp_dir());
1245        let epoch_store = self.load_epoch_store_one_call_per_task();
1246
1247        NodeStateDump::new(
1248            tx_digest,
1249            effects,
1250            expected_effects_digest,
1251            self.get_object_store().as_ref(),
1252            &epoch_store,
1253            inner_temporary_store,
1254            certificate,
1255        )?
1256        .write_to_file(&dump_dir)
1257        .map_err(|e| IotaError::FileIO(e.to_string()))
1258    }
1259
    /// Executes `certificate` against `input_objects` and commits the result.
    ///
    /// The function takes the execution lock for the certificate, verifies
    /// that the lock's epoch matches `epoch_store`, prepares the certificate,
    /// compares the resulting effects digest with `expected_effects_digest`
    /// (when one is given), and then commits through `commit_certificate`.
    /// For authenticator state updates the epoch store is updated as well.
    /// Finally the ratio of computation cost to elapsed microseconds is
    /// recorded.
    ///
    /// On every early error return before the commit, `tx_guard` is released.
    ///
    /// Returns the effects together with the execution error, if any.
    ///
    /// # Errors
    ///
    /// Returns the error from acquiring the execution lock, `WrongEpoch` when
    /// the lock's epoch differs from the epoch store's, or any error from
    /// `prepare_certificate` / `commit_certificate`.
    ///
    /// # Panics
    ///
    /// Panics when the computed effects digest differs from
    /// `expected_effects_digest` (a fork), after attempting to dump the node
    /// state for debugging. With debug assertions enabled it also panics if,
    /// after an authenticator state update, the JWKs of the signature verifier
    /// differ from those in the on-chain authenticator state.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn process_certificate(
        &self,
        tx_guard: CertTxGuard,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let process_certificate_start_time = tokio::time::Instant::now();
        let digest = *certificate.digest();

        // Fail point: may kill the current node, with a probability derived
        // deterministically from the digest.
        fail_point_if!("correlated-crash-process-certificate", || {
            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
        // Any caller that verifies the signatures on the certificate will have already
        // checked the epoch. But paths that don't verify sigs (e.g. execution
        // from checkpoint, reading from db) present the possibility of an epoch
        // mismatch. If this cert is not finalized in previous epoch, then it's
        // invalid.
        let execution_guard = match execution_guard {
            Ok(execution_guard) => execution_guard,
            Err(err) => {
                tx_guard.release();
                return Err(err);
            }
        };
        // Since we obtain a reference to the epoch store before taking the execution
        // lock, it's possible that reconfiguration has happened and they no
        // longer match.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return Err(IotaError::WrongEpoch {
                expected_epoch: epoch_store.epoch(),
                actual_epoch: *execution_guard,
            });
        }

        // Errors originating from prepare_certificate may be transient (failure to read
        // locks) or non-transient (transaction input is invalid, move vm
        // errors). However, all errors from this function occur before we have
        // written anything to the db, so we release the tx guard and rely on the
        // client to retry the tx (if it was transient).
        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
            &execution_guard,
            certificate,
            input_objects,
            epoch_store,
        ) {
            Err(e) => {
                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
                tx_guard.release();
                return Err(e);
            }
            Ok(res) => res,
        };

        // Fork detection: the freshly computed effects must match the digest
        // we were told to expect.
        if let Some(expected_effects_digest) = expected_effects_digest {
            if effects.digest() != expected_effects_digest {
                // We dont want to mask the original error, so we log it and continue.
                match self.debug_dump_transaction_state(
                    &digest,
                    &effects,
                    expected_effects_digest,
                    &inner_temporary_store,
                    certificate,
                    &self.config.state_debug_dump_config,
                ) {
                    Ok(out_path) => {
                        info!(
                            "Dumped node state for transaction {} to {}",
                            digest,
                            out_path.as_path().display().to_string()
                        );
                    }
                    Err(e) => {
                        error!("Error dumping state for transaction {}: {e}", digest);
                    }
                }
                error!(
                    tx_digest = ?digest,
                    ?expected_effects_digest,
                    actual_effects = ?effects,
                    "fork detected!"
                );
                panic!(
                    "Transaction {} is expected to have effects digest {}, but got {}!",
                    digest,
                    expected_effects_digest,
                    effects.digest(),
                );
            }
        }

        // Allow testing what happens if we crash after execution but before
        // the commit.
        fail_point!("crash");

        // Ownership of both guards moves into the commit step.
        self.commit_certificate(
            certificate,
            inner_temporary_store,
            &effects,
            tx_guard,
            execution_guard,
            epoch_store,
        )?;

        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator
            // state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                // Sort both sides so the comparison ignores ordering.
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // Record computation cost per elapsed microsecond; skipped when no
        // time has elapsed to avoid dividing by zero.
        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        };
        Ok((effects, execution_error_opt))
    }
1410
    /// Commits the outputs of an executed certificate.
    ///
    /// In order: post-processes (indexes) the transaction, records the
    /// transaction key and digest in the epoch store, writes the transaction
    /// outputs through the cache writer, force-reloads the system packages for
    /// end-of-epoch transactions, commits `tx_guard`, notifies the transaction
    /// manager about the commit, and updates metrics.
    ///
    /// A post-processing failure is logged and counted but does not abort the
    /// commit. `_execution_guard` is not read here; taking it by value keeps
    /// the execution lock held until this function returns.
    ///
    /// # Errors
    ///
    /// Returns the error from inserting into the epoch store or from writing
    /// the transaction outputs.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        // Everything needed after `inner_temporary_store` is moved into the
        // transaction outputs is captured up front.
        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        let output_keys = inner_temporary_store.get_output_keys(effects);

        // index certificate
        // Best effort: a failure is logged and counted, then ignored.
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // commit_certificate finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1476
1477    fn update_metrics(
1478        &self,
1479        certificate: &VerifiedExecutableTransaction,
1480        input_object_count: usize,
1481        shared_object_count: usize,
1482    ) {
1483        // count signature by scheme, for zklogin and multisig
1484        if certificate.has_zklogin_sig() {
1485            self.metrics.zklogin_sig_count.inc();
1486        } else if certificate.has_upgraded_multisig() {
1487            self.metrics.multisig_sig_count.inc();
1488        }
1489
1490        self.metrics.total_effects.inc();
1491        self.metrics.total_certs.inc();
1492
1493        if shared_object_count > 0 {
1494            self.metrics.shared_obj_tx.inc();
1495        }
1496
1497        if certificate.is_sponsored_tx() {
1498            self.metrics.sponsored_tx.inc();
1499        }
1500
1501        self.metrics
1502            .num_input_objs
1503            .observe(input_object_count as f64);
1504        self.metrics
1505            .num_shared_objects
1506            .observe(shared_object_count as f64);
1507        self.metrics.batch_size.observe(
1508            certificate
1509                .data()
1510                .intent_message()
1511                .value
1512                .kind()
1513                .num_commands() as f64,
1514        );
1515    }
1516
1517    /// prepare_certificate validates the transaction input, and executes the
1518    /// certificate, returning effects, output objects, events, etc.
1519    ///
1520    /// It reads state from the db (both owned and shared locks), but it has no
1521    /// side effects.
1522    ///
1523    /// It can be generally understood that a failure of prepare_certificate
1524    /// indicates a non-transient error, e.g. the transaction input is
1525    /// somehow invalid, the correct locks are not held, etc. However, this
1526    /// is not entirely true, as a transient db read error may also cause
1527    /// this function to fail.
1528    #[instrument(level = "trace", skip_all)]
1529    fn prepare_certificate(
1530        &self,
1531        _execution_guard: &ExecutionLockReadGuard<'_>,
1532        certificate: &VerifiedExecutableTransaction,
1533        input_objects: InputObjects,
1534        epoch_store: &Arc<AuthorityPerEpochStore>,
1535    ) -> IotaResult<(
1536        InnerTemporaryStore,
1537        TransactionEffects,
1538        Option<ExecutionError>,
1539    )> {
1540        let _scope = monitored_scope("Execution::prepare_certificate");
1541        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1542        let prepare_certificate_start_time = tokio::time::Instant::now();
1543
1544        // TODO: We need to move this to a more appropriate place to avoid redundant
1545        // checks.
1546        let tx_data = certificate.data().transaction_data();
1547        tx_data.validity_check(epoch_store.protocol_config())?;
1548
1549        // The cost of partially re-auditing a transaction before execution is
1550        // tolerated.
1551        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1552            certificate,
1553            input_objects,
1554            epoch_store.protocol_config(),
1555            epoch_store.reference_gas_price(),
1556        )?;
1557
1558        let owned_object_refs = input_objects.inner().filter_owned_objects();
1559        self.check_owned_locks(&owned_object_refs)?;
1560        let tx_digest = *certificate.digest();
1561        let protocol_config = epoch_store.protocol_config();
1562        let transaction_data = &certificate.data().intent_message().value;
1563        let (kind, signer, gas) = transaction_data.execution_parts();
1564
1565        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1566        let (inner_temp_store, _, mut effects, execution_error_opt) =
1567            epoch_store.executor().execute_transaction_to_effects(
1568                self.get_backing_store().as_ref(),
1569                protocol_config,
1570                self.metrics.limits_metrics.clone(),
1571                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1572                // cyclic dependency w/ iota-adapter
1573                self.config
1574                    .expensive_safety_check_config
1575                    .enable_deep_per_tx_iota_conservation_check(),
1576                self.config.certificate_deny_config.certificate_deny_set(),
1577                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1578                epoch_store
1579                    .epoch_start_config()
1580                    .epoch_data()
1581                    .epoch_start_timestamp(),
1582                input_objects,
1583                gas,
1584                gas_status,
1585                kind,
1586                signer,
1587                tx_digest,
1588                &mut None,
1589            );
1590
1591        fail_point_if!("cp_execution_nondeterminism", || {
1592            #[cfg(msim)]
1593            self.create_fail_state(certificate, epoch_store, &mut effects);
1594        });
1595
1596        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1597        if elapsed > 0.0 {
1598            self.metrics
1599                .prepare_cert_gas_latency_ratio
1600                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1601        }
1602
1603        Ok((inner_temp_store, effects, execution_error_opt.err()))
1604    }
1605
1606    pub fn prepare_certificate_for_benchmark(
1607        &self,
1608        certificate: &VerifiedExecutableTransaction,
1609        input_objects: InputObjects,
1610        epoch_store: &Arc<AuthorityPerEpochStore>,
1611    ) -> IotaResult<(
1612        InnerTemporaryStore,
1613        TransactionEffects,
1614        Option<ExecutionError>,
1615    )> {
1616        let lock = RwLock::new(epoch_store.epoch());
1617        let execution_guard = lock.try_read().unwrap();
1618
1619        self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1620    }
1621
1622    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1623    /// `VmChecks::Enabled` instead.
1624    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1625    #[allow(clippy::type_complexity)]
1626    pub fn dry_exec_transaction(
1627        &self,
1628        transaction: TransactionData,
1629        transaction_digest: TransactionDigest,
1630    ) -> IotaResult<(
1631        DryRunTransactionBlockResponse,
1632        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1633        TransactionEffects,
1634        Option<ObjectID>,
1635    )> {
1636        let epoch_store = self.load_epoch_store_one_call_per_task();
1637        if !self.is_fullnode(&epoch_store) {
1638            return Err(IotaError::UnsupportedFeature {
1639                error: "dry-exec is only supported on fullnodes".to_string(),
1640            });
1641        }
1642
1643        if transaction.kind().is_system_tx() {
1644            return Err(IotaError::UnsupportedFeature {
1645                error: "dry-exec does not support system transactions".to_string(),
1646            });
1647        }
1648
1649        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1650    }
1651
1652    #[allow(clippy::type_complexity)]
1653    pub fn dry_exec_transaction_for_benchmark(
1654        &self,
1655        transaction: TransactionData,
1656        transaction_digest: TransactionDigest,
1657    ) -> IotaResult<(
1658        DryRunTransactionBlockResponse,
1659        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1660        TransactionEffects,
1661        Option<ObjectID>,
1662    )> {
1663        let epoch_store = self.load_epoch_store_one_call_per_task();
1664        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1665    }
1666
    /// Shared implementation behind `dry_exec_transaction` and
    /// `dry_exec_transaction_for_benchmark`.
    ///
    /// Runs the validity checks (without the gas check) and the deny checks,
    /// loads the input and receiving objects, creates a mock gas coin when the
    /// transaction carries no gas payment, runs the transaction input checks
    /// and finally executes the transaction with a freshly created silent
    /// executor and expensive checks disabled.
    ///
    /// On success returns, in this order:
    /// - the `DryRunTransactionBlockResponse` (object and balance changes are
    ///   left empty because they are recalculated in the rpc layer),
    /// - the created, unwrapped and mutated objects keyed by object ID,
    ///   together with their object reference and `WriteKind`,
    /// - the transaction effects,
    /// - the ID of the mock gas object, if one was created.
    ///
    /// # Panics
    ///
    /// Panics if the executor cannot be created, or if an object listed as
    /// created, unwrapped or mutated in the effects is missing from the
    /// written objects of the temporary store.
    #[instrument(level = "trace", skip_all)]
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> IotaResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Run the signing-time deny checks against the configured transaction
        // deny config. No signatures are available in a dry run, hence the
        // empty slice.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Make a gas object if one was not provided.
        // `transaction` is rebound as mutable because the mock gas payment is
        // written into its gas data below.
        let mut transaction = transaction;
        let mut gas_object_refs = transaction.gas().to_vec();
        let reference_gas_price = epoch_store.reference_gas_price();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            // No gas payment: build a mock gas coin with a random ID, owned by
            // the gas owner of the transaction.
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            gas_object_refs = vec![gas_object_ref];
            // Add gas object to transaction gas payment
            transaction.gas_data_mut().payment = gas_object_refs.clone();
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // Gas payment was provided: run the regular input checks and
            // report that no mock gas object was created.
            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        // The gas coins returned by `execution_parts` are ignored; the
        // execution uses `gas_object_refs`, which holds the mock gas object
        // reference when one was created.
        let (kind, signer, _) = transaction.execution_parts();

        // A dedicated silent executor is created for the dry run instead of
        // using the executor of the epoch store.
        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        // The execution error is not returned separately by this function; the
        // caller only gets the effects.
        let (inner_temp_store, _, effects, _execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_object_refs,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        // Digest as reported by the effects; used for the response conversions
        // below.
        let tx_digest = *effects.transaction_digest();

        // Module resolver that is given both the temporary store of this
        // execution and the module cache of the epoch store.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        // Layout resolver over the temporary store with the backing package
        // store as fallback; used to convert the events below.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Collect every created, unwrapped and mutated object together with its
        // write kind, keyed by object ID.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                // Panics if the effects list an object that the temporary store
                // did not write.
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        Ok((
            DryRunTransactionBlockResponse {
                // to avoid cloning `transaction`, fields are populated in this order
                // (`suggested_gas_price` only borrows `transaction`, `input`
                // then consumes it).
                suggested_gas_price: self
                    .congestion_tracker
                    .get_prediction_suggested_gas_price(&transaction),
                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
                    .map_err(|e| IotaError::TransactionSerialization {
                        error: format!(
                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
                        ),
                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
                // `effects` is cloned because the original is also returned to
                // the caller.
                effects: effects.clone().try_into()?,
                events: IotaTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
1847
1848    pub fn simulate_transaction(
1849        &self,
1850        mut transaction: TransactionData,
1851        checks: VmChecks,
1852    ) -> IotaResult<SimulateTransactionResult> {
1853        if transaction.kind().is_system_tx() {
1854            return Err(IotaError::UnsupportedFeature {
1855                error: "simulate does not support system transactions".to_string(),
1856            });
1857        }
1858
1859        let epoch_store = self.load_epoch_store_one_call_per_task();
1860        if !self.is_fullnode(&epoch_store) {
1861            return Err(IotaError::UnsupportedFeature {
1862                error: "simulate is only supported on fullnodes".to_string(),
1863            });
1864        }
1865
1866        // Cheap validity checks for a transaction, including input size limits.
1867        // This does not check if gas objects are missing since we may create a
1868        // mock gas object. It checks for other transaction input validity.
1869        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1870
1871        let input_object_kinds = transaction.input_objects()?;
1872        let receiving_object_refs = transaction.receiving_objects();
1873
1874        // Since we need to simulate a validator signing the transaction, the first step
1875        // is to check if some transaction elements are denied.
1876        iota_transaction_checks::deny::check_transaction_for_signing(
1877            &transaction,
1878            &[],
1879            &input_object_kinds,
1880            &receiving_object_refs,
1881            &self.config.transaction_deny_config,
1882            self.get_backing_package_store().as_ref(),
1883        )?;
1884
1885        // Load input and receiving objects
1886        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1887            // We don't want to cache this transaction since it's a simulation.
1888            None,
1889            &input_object_kinds,
1890            &receiving_object_refs,
1891            epoch_store.epoch(),
1892        )?;
1893
1894        // Create a mock gas object if one was not provided
1895        let mock_gas_id = if transaction.gas().is_empty() {
1896            let mock_gas_object = Object::new_move(
1897                MoveObject::new_gas_coin(
1898                    OBJECT_START_VERSION,
1899                    ObjectID::MAX,
1900                    SIMULATION_GAS_COIN_VALUE,
1901                ),
1902                Owner::AddressOwner(transaction.gas_data().owner),
1903                TransactionDigest::genesis_marker(),
1904            );
1905            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
1906            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
1907            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
1908            Some(mock_gas_object.id())
1909        } else {
1910            None
1911        };
1912
1913        let protocol_config = epoch_store.protocol_config();
1914
1915        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
1916        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
1917        let (gas_status, checked_input_objects) = if checks.enabled() {
1918            iota_transaction_checks::check_transaction_input(
1919                protocol_config,
1920                epoch_store.reference_gas_price(),
1921                &transaction,
1922                input_objects,
1923                &receiving_objects,
1924                &self.metrics.bytecode_verifier_metrics,
1925                &self.config.verifier_signing_config,
1926            )?
1927        } else {
1928            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
1929                protocol_config,
1930                transaction.kind(),
1931                input_objects,
1932                receiving_objects,
1933            )?;
1934            let gas_status = IotaGasStatus::new(
1935                transaction.gas_budget(),
1936                transaction.gas_price(),
1937                epoch_store.reference_gas_price(),
1938                protocol_config,
1939            )?;
1940
1941            (gas_status, checked_input_objects)
1942        };
1943
1944        // Create a new executor for the simulation
1945        let executor = iota_execution::executor(
1946            protocol_config,
1947            true, // silent
1948            None,
1949        )
1950        .expect("Creating an executor should not fail here");
1951
1952        // Execute the simulation
1953        let (kind, signer, gas_coins) = transaction.execution_parts();
1954        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
1955            self.get_backing_store().as_ref(),
1956            protocol_config,
1957            self.metrics.limits_metrics.clone(),
1958            false, // expensive_checks
1959            self.config.certificate_deny_config.certificate_deny_set(),
1960            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1961            epoch_store
1962                .epoch_start_config()
1963                .epoch_data()
1964                .epoch_start_timestamp(),
1965            checked_input_objects,
1966            gas_coins,
1967            gas_status,
1968            kind,
1969            signer,
1970            transaction.digest(),
1971            checks.disabled(),
1972        );
1973
1974        // In the case of a dev inspect, the execution_result could be filled with some
1975        // values. Else, execution_result is empty in the case of a dry run.
1976        Ok(SimulateTransactionResult {
1977            input_objects: inner_temp_store.input_objects,
1978            output_objects: inner_temp_store.written,
1979            events: effects.events_digest().map(|_| inner_temp_store.events),
1980            effects,
1981            execution_result,
1982            mock_gas_id,
1983        })
1984    }
1985
    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
    /// `VmChecks::Disabled` instead.
    ///
    /// Builds a transaction from `sender`, `transaction_kind` and the optional
    /// gas parameters, executes it with a freshly created silent executor and
    /// returns the `DevInspectResults`.
    ///
    /// The object ID for gas can be any object ID, even for an uncreated
    /// object.
    ///
    /// Defaults applied to the optional parameters:
    /// - `gas_price`: the reference gas price of the current epoch,
    /// - `gas_budget`: the `max_tx_gas` of the protocol config,
    /// - `gas_sponsor`: `sender`,
    /// - `gas_objects`: empty, in which case a dummy gas object is used,
    /// - `show_raw_txn_data_and_effects`: `false`; when `true` the
    ///   BCS-serialized transaction data and effects are included in the
    ///   result,
    /// - `skip_checks`: `true`; only lightweight input checks are performed
    ///   in that case.
    ///
    /// # Errors
    ///
    /// Returns `IotaError::UnsupportedFeature` when called on a node that is
    /// not a fullnode or when `transaction_kind` is a system transaction, and
    /// `IotaError::TransactionSerialization` when the transaction data or the
    /// effects cannot be BCS-serialized. Errors from the validity, deny and
    /// input checks and from loading the input objects are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the executor cannot be created.
    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        // Resolve the optional parameters to their defaults.
        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        let transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // The raw transaction bytes are only produced on request; otherwise an
        // empty vector is reported.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Run the signing-time deny checks against the configured transaction
        // deny config; no signatures are passed.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // The dummy gas object is always constructed, but it is only used when
        // no gas object was provided.
        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
            SIMULATION_GAS_COIN_VALUE,
            transaction.gas_owner(),
        );

        // Gas object references handed to the executor: the dummy gas object
        // when no payment was provided, the provided payment otherwise.
        let gas_objects = if transaction.gas().is_empty() {
            let gas_object_ref = dummy_gas_object.compute_object_reference();
            vec![gas_object_ref]
        } else {
            transaction.gas().to_vec()
        };

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // NOTE(review): the gas status is built with `max_tx_gas` here, not
            // with the caller-supplied `gas_budget` — confirm this is intended.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            }
        };

        // A dedicated silent executor is created for the dev inspect.
        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        // Wrap the transaction in an intent message; the transaction digest is
        // the default hash of the wrapped transaction data.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: AppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_objects,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        // The raw effects bytes are only produced on request; otherwise an
        // empty vector is reported.
        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Layout resolver over the temporary store with the backing package
        // store as fallback; passed to `DevInspectResults::new` below.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2193
2194    // Only used for testing because of how epoch store is loaded.
2195    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2196        let epoch_store = self.epoch_store_for_testing();
2197        Ok(epoch_store.reference_gas_price())
2198    }
2199
2200    #[instrument(level = "trace", skip_all)]
2201    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2202        self.get_transaction_cache_reader()
2203            .try_is_tx_already_executed(digest)
2204    }
2205
2206    /// Non-fallible version of `try_is_tx_already_executed`.
2207    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2208        self.try_is_tx_already_executed(digest)
2209            .expect("storage access failed")
2210    }
2211
2212    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2213    #[instrument(level = "debug", skip_all, err)]
2214    fn index_tx(
2215        &self,
2216        indexes: &IndexStore,
2217        digest: &TransactionDigest,
2218        // TODO: index_tx really just need the transaction data here.
2219        cert: &VerifiedExecutableTransaction,
2220        effects: &TransactionEffects,
2221        events: &TransactionEvents,
2222        timestamp_ms: u64,
2223        tx_coins: Option<TxCoins>,
2224        written: &WrittenObjects,
2225        inner_temporary_store: &InnerTemporaryStore,
2226    ) -> IotaResult<u64> {
2227        let changes = self
2228            .process_object_index(effects, written, inner_temporary_store)
2229            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2230
2231        indexes.index_tx(
2232            cert.data().intent_message().value.sender(),
2233            cert.data()
2234                .intent_message()
2235                .value
2236                .input_objects()?
2237                .iter()
2238                .map(|o| o.object_id()),
2239            effects
2240                .all_changed_objects()
2241                .into_iter()
2242                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2243            cert.data()
2244                .intent_message()
2245                .value
2246                .move_calls()
2247                .into_iter()
2248                .map(|(package, module, function)| {
2249                    (*package, module.to_owned(), function.to_owned())
2250                }),
2251            events,
2252            changes,
2253            digest,
2254            timestamp_ms,
2255            tx_coins,
2256        )
2257    }
2258
2259    #[cfg(msim)]
2260    fn create_fail_state(
2261        &self,
2262        certificate: &VerifiedExecutableTransaction,
2263        epoch_store: &Arc<AuthorityPerEpochStore>,
2264        effects: &mut TransactionEffects,
2265    ) {
2266        use std::cell::RefCell;
2267        thread_local! {
2268            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2269        }
2270        if !certificate.data().intent_message().value.is_system_tx() {
2271            let committee = epoch_store.committee();
2272            let cur_stake = (**committee).weight(&self.name);
2273            if cur_stake > 0 {
2274                FAIL_STATE.with_borrow_mut(|fail_state| {
2275                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2276                    if fail_state.0 < committee.validity_threshold() {
2277                        fail_state.0 += cur_stake;
2278                        fail_state.1.insert(self.name);
2279                    }
2280
2281                    if fail_state.1.contains(&self.name) {
2282                        info!("cp_exec failing tx");
2283                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2284                    }
2285                });
2286            }
2287        }
2288    }
2289
2290    fn process_object_index(
2291        &self,
2292        effects: &TransactionEffects,
2293        written: &WrittenObjects,
2294        inner_temporary_store: &InnerTemporaryStore,
2295    ) -> IotaResult<ObjectIndexChanges> {
2296        let epoch_store = self.load_epoch_store_one_call_per_task();
2297        let mut layout_resolver =
2298            epoch_store
2299                .executor()
2300                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2301                    inner_temporary_store,
2302                    self.get_backing_package_store(),
2303                )));
2304
2305        let modified_at_version = effects
2306            .modified_at_versions()
2307            .into_iter()
2308            .collect::<HashMap<_, _>>();
2309
2310        let tx_digest = effects.transaction_digest();
2311        let mut deleted_owners = vec![];
2312        let mut deleted_dynamic_fields = vec![];
2313        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2314            let old_version = modified_at_version.get(&id).unwrap();
2315            // When we process the index, the latest object hasn't been written yet so
2316            // the old object must be present.
2317            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2318                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2319            ) {
2320                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2321                Owner::ObjectOwner(object_id) => {
2322                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2323                }
2324                _ => {}
2325            }
2326        }
2327
2328        let mut new_owners = vec![];
2329        let mut new_dynamic_fields = vec![];
2330
2331        for (oref, owner, kind) in effects.all_changed_objects() {
2332            let id = &oref.0;
2333            // For mutated objects, retrieve old owner and delete old index if there is a
2334            // owner change.
2335            if let WriteKind::Mutate = kind {
2336                let Some(old_version) = modified_at_version.get(id) else {
2337                    panic!(
2338                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2339                    );
2340                };
2341                // When we process the index, the latest object hasn't been written yet so
2342                // the old object must be present.
2343                let Some(old_object) = self
2344                    .get_object_store()
2345                    .try_get_object_by_key(id, *old_version)?
2346                else {
2347                    panic!(
2348                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2349                    );
2350                };
2351                if old_object.owner != owner {
2352                    match old_object.owner {
2353                        Owner::AddressOwner(addr) => {
2354                            deleted_owners.push((addr, *id));
2355                        }
2356                        Owner::ObjectOwner(object_id) => {
2357                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2358                        }
2359                        _ => {}
2360                    }
2361                }
2362            }
2363
2364            match owner {
2365                Owner::AddressOwner(addr) => {
2366                    // TODO: We can remove the object fetching after we added ObjectType to
2367                    // TransactionEffects
2368                    let new_object = written.get(id).unwrap_or_else(
2369                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2370                    );
2371                    assert_eq!(
2372                        new_object.version(),
2373                        oref.1,
2374                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2375                        tx_digest,
2376                        id,
2377                        new_object.version(),
2378                        oref.1
2379                    );
2380
2381                    let type_ = new_object
2382                        .type_()
2383                        .map(|type_| ObjectType::Struct(type_.clone()))
2384                        .unwrap_or(ObjectType::Package);
2385
2386                    new_owners.push((
2387                        (addr, *id),
2388                        ObjectInfo {
2389                            object_id: *id,
2390                            version: oref.1,
2391                            digest: oref.2,
2392                            type_,
2393                            owner,
2394                            previous_transaction: *effects.transaction_digest(),
2395                        },
2396                    ));
2397                }
2398                Owner::ObjectOwner(owner) => {
2399                    let new_object = written.get(id).unwrap_or_else(
2400                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2401                    );
2402                    assert_eq!(
2403                        new_object.version(),
2404                        oref.1,
2405                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2406                        tx_digest,
2407                        id,
2408                        new_object.version(),
2409                        oref.1
2410                    );
2411
2412                    let Some(df_info) = self
2413                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2414                        .unwrap_or_else(|e| {
2415                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2416                            None
2417                        }
2418                    )
2419                        else {
2420                            // Skip indexing for non dynamic field objects.
2421                            continue;
2422                        };
2423                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2424                }
2425                _ => {}
2426            }
2427        }
2428
2429        Ok(ObjectIndexChanges {
2430            deleted_owners,
2431            deleted_dynamic_fields,
2432            new_owners,
2433            new_dynamic_fields,
2434        })
2435    }
2436
2437    fn try_create_dynamic_field_info(
2438        &self,
2439        o: &Object,
2440        written: &WrittenObjects,
2441        resolver: &mut dyn LayoutResolver,
2442        get_latest_object_version: bool,
2443    ) -> IotaResult<Option<DynamicFieldInfo>> {
2444        // Skip if not a move object
2445        let Some(move_object) = o.data.try_as_move().cloned() else {
2446            return Ok(None);
2447        };
2448
2449        // We only index dynamic field objects
2450        if !move_object.type_().is_dynamic_field() {
2451            return Ok(None);
2452        }
2453
2454        let layout = resolver
2455            .get_annotated_layout(&move_object.type_().clone().into())?
2456            .into_layout();
2457
2458        let field =
2459            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2460                IotaError::ObjectDeserialization {
2461                    error: e.to_string(),
2462                }
2463            })?;
2464
2465        let type_ = field.kind;
2466        let name_type: TypeTag = field.name_layout.into();
2467        let bcs_name = field.name_bytes.to_owned();
2468
2469        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2470            .map_err(|e| {
2471                warn!("{e}");
2472                IotaError::ObjectDeserialization {
2473                    error: e.to_string(),
2474                }
2475            })?;
2476
2477        let name = DynamicFieldName {
2478            type_: name_type,
2479            value: IotaMoveValue::from(name_value).to_json_value(),
2480        };
2481
2482        let value_metadata = field.value_metadata().map_err(|e| {
2483            warn!("{e}");
2484            IotaError::ObjectDeserialization {
2485                error: e.to_string(),
2486            }
2487        })?;
2488
2489        Ok(Some(match value_metadata {
2490            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2491                name,
2492                bcs_name,
2493                type_,
2494                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2495                object_id: o.id(),
2496                version: o.version(),
2497                digest: o.digest(),
2498            },
2499
2500            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2501                // Find the actual object from storage using the object id obtained from the
2502                // wrapper.
2503
2504                // Try to find the object in the written objects first.
2505                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2506                    (
2507                        object.version(),
2508                        object.digest(),
2509                        object.data.type_().unwrap().clone(),
2510                    )
2511                } else {
2512                    // If not found, try to find it in the database.
2513                    let object = if get_latest_object_version {
2514                        // Loading genesis object could meet a condition that the version of the
2515                        // genesis object is behind the one in the snapshot.
2516                        // In this case, the object can not be found with get_object_by_key, we need
2517                        // to use get_object instead. Since get_object is a heavier operation, we
2518                        // only allow to use it for genesis.
2519                        // reference: https://github.com/iotaledger/iota/issues/7267
2520                        self.get_object_store()
2521                            .try_get_object(&object_id)?
2522                            .ok_or_else(|| UserInputError::ObjectNotFound {
2523                                object_id,
2524                                version: Some(o.version()),
2525                            })?
2526                    } else {
2527                        // Non-genesis object should be in the database with the given version.
2528                        self.get_object_store()
2529                            .try_get_object_by_key(&object_id, o.version())?
2530                            .ok_or_else(|| UserInputError::ObjectNotFound {
2531                                object_id,
2532                                version: Some(o.version()),
2533                            })?
2534                    };
2535
2536                    (
2537                        object.version(),
2538                        object.digest(),
2539                        object.data.type_().unwrap().clone(),
2540                    )
2541                };
2542
2543                DynamicFieldInfo {
2544                    name,
2545                    bcs_name,
2546                    type_,
2547                    object_type: object_type.to_string(),
2548                    object_id,
2549                    version,
2550                    digest,
2551                }
2552            }
2553        }))
2554    }
2555
2556    #[instrument(level = "trace", skip_all, err)]
2557    fn post_process_one_tx(
2558        &self,
2559        certificate: &VerifiedExecutableTransaction,
2560        effects: &TransactionEffects,
2561        inner_temporary_store: &InnerTemporaryStore,
2562        epoch_store: &Arc<AuthorityPerEpochStore>,
2563    ) -> IotaResult {
2564        if self.indexes.is_none() {
2565            return Ok(());
2566        }
2567
2568        let tx_digest = certificate.digest();
2569        let timestamp_ms = Self::unixtime_now_ms();
2570        let events = &inner_temporary_store.events;
2571        let written = &inner_temporary_store.written;
2572        let tx_coins =
2573            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2574
2575        // Index tx
2576        if let Some(indexes) = &self.indexes {
2577            let _ = self
2578                .index_tx(
2579                    indexes.as_ref(),
2580                    tx_digest,
2581                    certificate,
2582                    effects,
2583                    events,
2584                    timestamp_ms,
2585                    tx_coins,
2586                    written,
2587                    inner_temporary_store,
2588                )
2589                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2590                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2591                .expect("Indexing tx should not fail");
2592
2593            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2594            let events = self.make_transaction_block_events(
2595                events.clone(),
2596                *tx_digest,
2597                timestamp_ms,
2598                epoch_store,
2599                inner_temporary_store,
2600            )?;
2601            // Emit events
2602            self.subscription_handler
2603                .process_tx(certificate.data().transaction_data(), &effects, &events)
2604                .tap_ok(|_| {
2605                    self.metrics
2606                        .post_processing_total_tx_had_event_processed
2607                        .inc()
2608                })
2609                .tap_err(|e| {
2610                    warn!(
2611                        ?tx_digest,
2612                        "Post processing - Couldn't process events for tx: {}", e
2613                    )
2614                })?;
2615
2616            self.metrics
2617                .post_processing_total_events_emitted
2618                .inc_by(events.data.len() as u64);
2619        };
2620        Ok(())
2621    }
2622
2623    fn make_transaction_block_events(
2624        &self,
2625        transaction_events: TransactionEvents,
2626        digest: TransactionDigest,
2627        timestamp_ms: u64,
2628        epoch_store: &Arc<AuthorityPerEpochStore>,
2629        inner_temporary_store: &InnerTemporaryStore,
2630    ) -> IotaResult<IotaTransactionBlockEvents> {
2631        let mut layout_resolver =
2632            epoch_store
2633                .executor()
2634                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2635                    inner_temporary_store,
2636                    self.get_backing_package_store(),
2637                )));
2638        IotaTransactionBlockEvents::try_from(
2639            transaction_events,
2640            digest,
2641            Some(timestamp_ms),
2642            layout_resolver.as_mut(),
2643        )
2644    }
2645
2646    pub fn unixtime_now_ms() -> u64 {
2647        let now = SystemTime::now()
2648            .duration_since(UNIX_EPOCH)
2649            .expect("Time went backwards")
2650            .as_millis();
2651        u64::try_from(now).expect("Travelling in time machine")
2652    }
2653
2654    #[instrument(level = "trace", skip_all)]
2655    pub async fn handle_transaction_info_request(
2656        &self,
2657        request: TransactionInfoRequest,
2658    ) -> IotaResult<TransactionInfoResponse> {
2659        let epoch_store = self.load_epoch_store_one_call_per_task();
2660        let (transaction, status) = self
2661            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2662            .ok_or(IotaError::TransactionNotFound {
2663                digest: request.transaction_digest,
2664            })?;
2665        Ok(TransactionInfoResponse {
2666            transaction,
2667            status,
2668        })
2669    }
2670
2671    #[instrument(level = "trace", skip_all)]
2672    pub async fn handle_object_info_request(
2673        &self,
2674        request: ObjectInfoRequest,
2675    ) -> IotaResult<ObjectInfoResponse> {
2676        let epoch_store = self.load_epoch_store_one_call_per_task();
2677
2678        let requested_object_seq = match request.request_kind {
2679            ObjectInfoRequestKind::LatestObjectInfo => {
2680                let (_, seq, _) = self
2681                    .try_get_object_or_tombstone(request.object_id)
2682                    .await?
2683                    .ok_or_else(|| {
2684                        IotaError::from(UserInputError::ObjectNotFound {
2685                            object_id: request.object_id,
2686                            version: None,
2687                        })
2688                    })?;
2689                seq
2690            }
2691            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2692        };
2693
2694        let object = self
2695            .get_object_store()
2696            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2697            .ok_or_else(|| {
2698                IotaError::from(UserInputError::ObjectNotFound {
2699                    object_id: request.object_id,
2700                    version: Some(requested_object_seq),
2701                })
2702            })?;
2703
2704        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2705            (request.generate_layout, object.data.try_as_move())
2706        {
2707            Some(into_struct_layout(
2708                epoch_store
2709                    .executor()
2710                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2711                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2712            )?)
2713        } else {
2714            None
2715        };
2716
2717        let lock = if !object.is_address_owned() {
2718            // Only address owned objects have locks.
2719            None
2720        } else {
2721            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2722                .await?
2723                .map(|s| s.into_inner())
2724        };
2725
2726        Ok(ObjectInfoResponse {
2727            object,
2728            layout,
2729            lock_for_debugging: lock,
2730        })
2731    }
2732
2733    #[instrument(level = "trace", skip_all)]
2734    pub fn handle_checkpoint_request(
2735        &self,
2736        request: &CheckpointRequest,
2737    ) -> IotaResult<CheckpointResponse> {
2738        let summary = if request.certified {
2739            let summary = match request.sequence_number {
2740                Some(seq) => self
2741                    .checkpoint_store
2742                    .get_checkpoint_by_sequence_number(seq)?,
2743                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2744            }
2745            .map(|v| v.into_inner());
2746            summary.map(CheckpointSummaryResponse::Certified)
2747        } else {
2748            let summary = match request.sequence_number {
2749                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2750                None => self
2751                    .checkpoint_store
2752                    .get_latest_locally_computed_checkpoint()?,
2753            };
2754            summary.map(CheckpointSummaryResponse::Pending)
2755        };
2756        let contents = match &summary {
2757            Some(s) => self
2758                .checkpoint_store
2759                .get_checkpoint_contents(&s.content_digest())?,
2760            None => None,
2761        };
2762        Ok(CheckpointResponse {
2763            checkpoint: summary,
2764            contents,
2765        })
2766    }
2767
2768    fn check_protocol_version(
2769        supported_protocol_versions: SupportedProtocolVersions,
2770        current_version: ProtocolVersion,
2771    ) {
2772        info!("current protocol version is now {:?}", current_version);
2773        info!("supported versions are: {:?}", supported_protocol_versions);
2774        if !supported_protocol_versions.is_version_supported(current_version) {
2775            let msg = format!(
2776                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2777            );
2778
2779            error!("{}", msg);
2780            eprintln!("{msg}");
2781
2782            #[cfg(not(msim))]
2783            std::process::exit(1);
2784
2785            #[cfg(msim)]
2786            iota_simulator::task::shutdown_current_node();
2787        }
2788    }
2789
    /// Builds an `AuthorityState`, starts its certificate execution task and
    /// indexes the genesis objects.
    ///
    /// The steps are, in order:
    /// 1. `check_protocol_version` is run against the protocol version of
    ///    `epoch_store`.
    /// 2. Metrics, the `TransactionManager` (fed by an unbounded channel of
    ///    ready certificates), both pruners and the `TransactionInputLoader`
    ///    are created.
    /// 3. The state is assembled and `execution_process` is spawned with a
    ///    weak reference to it, the receiving end of the ready-certificate
    ///    channel and the receiving end of the execution shutdown channel.
    /// 4. `create_owner_index_if_empty` is called with `genesis_objects`.
    ///
    /// # Panics
    ///
    /// Panics with "Error indexing genesis objects." when
    /// `create_owner_index_if_empty` fails.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rest_index: Option<Arc<RestIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Ready certificates flow from the transaction manager to the
        // execution task through this channel.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        // The sending half is stored in the state; the receiving half is
        // handed to the execution task.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rest_index.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            rest_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // The task only receives a weak reference to the state.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
2885
2886    // TODO: Consolidate our traits to reduce the number of methods here.
2887    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2888        &self.execution_cache_trait_pointers.object_cache_reader
2889    }
2890
2891    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2892        &self.execution_cache_trait_pointers.transaction_cache_reader
2893    }
2894
2895    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2896        &self.execution_cache_trait_pointers.cache_writer
2897    }
2898
2899    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2900        &self.execution_cache_trait_pointers.backing_store
2901    }
2902
2903    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2904        &self.execution_cache_trait_pointers.backing_package_store
2905    }
2906
2907    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2908        &self.execution_cache_trait_pointers.object_store
2909    }
2910
2911    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2912        &self.execution_cache_trait_pointers.reconfig_api
2913    }
2914
2915    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2916        &self.execution_cache_trait_pointers.accumulator_store
2917    }
2918
2919    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2920        &self.execution_cache_trait_pointers.checkpoint_cache
2921    }
2922
2923    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2924        &self.execution_cache_trait_pointers.state_sync_store
2925    }
2926
2927    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2928        &self.execution_cache_trait_pointers.cache_commit
2929    }
2930
2931    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2932        self.execution_cache_trait_pointers
2933            .testing_api
2934            .database_for_testing()
2935    }
2936
2937    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2938        &self,
2939        config: NodeConfig,
2940        metrics: Arc<AuthorityStorePruningMetrics>,
2941    ) -> anyhow::Result<()> {
2942        let archive_readers =
2943            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2944        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2945            &self.database_for_testing().perpetual_tables,
2946            &self.checkpoint_store,
2947            self.rest_index.as_deref(),
2948            None,
2949            config.authority_store_pruning_config,
2950            metrics,
2951            archive_readers,
2952            EPOCH_DURATION_MS_FOR_TESTING,
2953        )
2954        .await
2955    }
2956
    /// Borrows the transaction manager stored on this authority state.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
2960
2961    /// Adds transactions / certificates to transaction manager for ordered
2962    /// execution.
2963    pub fn enqueue_transactions_for_execution(
2964        &self,
2965        txns: Vec<VerifiedExecutableTransaction>,
2966        epoch_store: &Arc<AuthorityPerEpochStore>,
2967    ) {
2968        self.transaction_manager.enqueue(txns, epoch_store)
2969    }
2970
2971    /// Adds certificates to transaction manager for ordered execution.
2972    pub fn enqueue_certificates_for_execution(
2973        &self,
2974        certs: Vec<VerifiedCertificate>,
2975        epoch_store: &Arc<AuthorityPerEpochStore>,
2976    ) {
2977        self.transaction_manager
2978            .enqueue_certificates(certs, epoch_store)
2979    }
2980
2981    pub fn enqueue_with_expected_effects_digest(
2982        &self,
2983        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2984        epoch_store: &AuthorityPerEpochStore,
2985    ) {
2986        self.transaction_manager
2987            .enqueue_with_expected_effects_digest(certs, epoch_store)
2988    }
2989
2990    fn create_owner_index_if_empty(
2991        &self,
2992        genesis_objects: &[Object],
2993        epoch_store: &Arc<AuthorityPerEpochStore>,
2994    ) -> IotaResult {
2995        let Some(index_store) = &self.indexes else {
2996            return Ok(());
2997        };
2998        if !index_store.is_empty() {
2999            return Ok(());
3000        }
3001
3002        let mut new_owners = vec![];
3003        let mut new_dynamic_fields = vec![];
3004        let mut layout_resolver = epoch_store
3005            .executor()
3006            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3007        for o in genesis_objects.iter() {
3008            match o.owner {
3009                Owner::AddressOwner(addr) => new_owners.push((
3010                    (addr, o.id()),
3011                    ObjectInfo::new(&o.compute_object_reference(), o),
3012                )),
3013                Owner::ObjectOwner(object_id) => {
3014                    let id = o.id();
3015                    let Some(info) = self.try_create_dynamic_field_info(
3016                        o,
3017                        &BTreeMap::new(),
3018                        layout_resolver.as_mut(),
3019                        true,
3020                    )?
3021                    else {
3022                        continue;
3023                    };
3024                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3025                }
3026                _ => {}
3027            }
3028        }
3029
3030        index_store.insert_genesis_objects(ObjectIndexChanges {
3031            deleted_owners: vec![],
3032            deleted_dynamic_fields: vec![],
3033            new_owners,
3034            new_dynamic_fields,
3035        })
3036    }
3037
3038    /// Attempts to acquire execution lock for an executable transaction.
3039    /// Returns the lock if the transaction is matching current executed epoch
3040    /// Returns None otherwise
3041    pub fn execution_lock_for_executable_transaction(
3042        &self,
3043        transaction: &VerifiedExecutableTransaction,
3044    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3045        let lock = self
3046            .execution_lock
3047            .try_read()
3048            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3049        if *lock == transaction.auth_sig().epoch() {
3050            Ok(lock)
3051        } else {
3052            Err(IotaError::WrongEpoch {
3053                expected_epoch: *lock,
3054                actual_epoch: transaction.auth_sig().epoch(),
3055            })
3056        }
3057    }
3058
3059    /// Acquires the execution lock for the duration of a transaction signing
3060    /// request. This prevents reconfiguration from starting until we are
3061    /// finished handling the signing request. Otherwise, in-memory lock
3062    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3063    /// while we are attempting to acquire locks for the transaction.
3064    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3065        self.execution_lock
3066            .try_read()
3067            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3068    }
3069
    /// Waits for and returns the write guard on the execution lock.
    ///
    /// `reconfigure` awaits this guard to ensure no transactions are being
    /// executed, and later writes the new epoch through it.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3073
    /// Moves this authority from the epoch of `cur_epoch_store` to the epoch
    /// of `new_committee` and returns the per-epoch store for the new epoch.
    ///
    /// The steps run in a fixed order:
    /// 1. Check the protocol version of the new epoch against
    ///    `supported_protocol_versions`, reset metrics and persist the new
    ///    committee.
    /// 2. Take the execution write lock and terminate the current epoch's
    ///    tasks.
    /// 3. Sanity-check the locally built checkpoints against
    ///    `epoch_last_checkpoint`.
    /// 4. Revert uncommitted transactions, clear end-of-epoch state and run
    ///    the consistency checks.
    /// 5. Store the epoch start configuration and, if configured, take a db
    ///    checkpoint.
    /// 6. Reconfigure the cache, reopen the epoch db, reconfigure the
    ///    transaction manager and write the new epoch into the execution lock.
    ///
    /// # Errors
    /// Propagates any error from the committee store, checkpoint store,
    /// reverting uncommitted transactions, the consistency checks, storing
    /// the epoch start configuration, db checkpointing or reopening the epoch
    /// db.
    ///
    /// # Panics
    /// Panics if `epoch_last_checkpoint` is lower than the highest locally
    /// built checkpoint sequence number, or if the reopened epoch store does
    /// not report `new_committee.epoch`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Defaults to 0 when no checkpoint has been computed locally yet.
        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optional end-of-epoch db checkpoint: requires both a configured path
        // and the `perform_db_checkpoints_at_epoch_end` flag.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                // Index checkpoints are opt-in; an unset flag means "no".
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Capture the epoch before `new_committee` is moved into
        // `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3185
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    ///
    /// # Panics
    /// Panics if reading the current epoch's last checkpoint from the
    /// checkpoint store fails.
    pub async fn reconfigure_for_testing(&self) {
        // Hold the write guard for the whole switch; the new epoch is written
        // through it at the end.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            // Last checkpoint of the current epoch, or the default sequence
            // number when none is recorded.
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        // Publish the new epoch store, then terminate the old epoch.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        *execution_lock = new_epoch;
    }
3219
3220    #[instrument(level = "error", skip_all)]
3221    fn check_system_consistency(
3222        &self,
3223        cur_epoch_store: &AuthorityPerEpochStore,
3224        accumulator: Arc<StateAccumulator>,
3225        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3226        epoch_supply_change: i64,
3227    ) -> IotaResult<()> {
3228        info!(
3229            "Performing iota conservation consistency check for epoch {}",
3230            cur_epoch_store.epoch()
3231        );
3232
3233        if cfg!(debug_assertions) {
3234            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3235        }
3236
3237        self.get_reconfig_api()
3238            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3239
3240        // check for root state hash consistency with live object set
3241        if expensive_safety_check_config.enable_state_consistency_check() {
3242            info!(
3243                "Performing state consistency check for epoch {}",
3244                cur_epoch_store.epoch()
3245            );
3246            self.expensive_check_is_consistent_state(
3247                accumulator,
3248                cur_epoch_store,
3249                cfg!(debug_assertions), // panic in debug mode only
3250            );
3251        }
3252
3253        if expensive_safety_check_config.enable_secondary_index_checks() {
3254            if let Some(indexes) = self.indexes.clone() {
3255                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3256                    .expect("secondary indexes are inconsistent");
3257            }
3258        }
3259
3260        Ok(())
3261    }
3262
3263    fn expensive_check_is_consistent_state(
3264        &self,
3265        accumulator: Arc<StateAccumulator>,
3266        cur_epoch_store: &AuthorityPerEpochStore,
3267        panic: bool,
3268    ) {
3269        let live_object_set_hash = accumulator.digest_live_object_set();
3270
3271        let root_state_hash: ECMHLiveObjectSetDigest = self
3272            .get_accumulator_store()
3273            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3274            .expect("Retrieving root state hash cannot fail")
3275            .expect("Root state hash for epoch must exist")
3276            .1
3277            .digest()
3278            .into();
3279
3280        let is_inconsistent = root_state_hash != live_object_set_hash;
3281        if is_inconsistent {
3282            if panic {
3283                panic!(
3284                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3285                );
3286            } else {
3287                error!(
3288                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3289                    root_state_hash, live_object_set_hash
3290                );
3291            }
3292        } else {
3293            info!("State consistency check passed");
3294        }
3295
3296        if !panic {
3297            accumulator.set_inconsistent_state(is_inconsistent);
3298        }
3299    }
3300
3301    pub fn current_epoch_for_testing(&self) -> EpochId {
3302        self.epoch_store_for_testing().epoch()
3303    }
3304
3305    #[instrument(level = "error", skip_all)]
3306    pub fn checkpoint_all_dbs(
3307        &self,
3308        checkpoint_path: &Path,
3309        cur_epoch_store: &AuthorityPerEpochStore,
3310        checkpoint_indexes: bool,
3311    ) -> IotaResult {
3312        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3313        let current_epoch = cur_epoch_store.epoch();
3314
3315        if checkpoint_path.exists() {
3316            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3317            return Ok(());
3318        }
3319
3320        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3321        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3322
3323        if checkpoint_path_tmp.exists() {
3324            fs::remove_dir_all(&checkpoint_path_tmp)
3325                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3326        }
3327
3328        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3329        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3330
3331        // NOTE: Do not change the order of invoking these checkpoint calls
3332        // We want to snapshot checkpoint db first to not race with state sync
3333        self.checkpoint_store
3334            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3335
3336        self.get_reconfig_api()
3337            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3338
3339        self.committee_store
3340            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3341
3342        if checkpoint_indexes {
3343            if let Some(indexes) = self.indexes.as_ref() {
3344                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3345            }
3346        }
3347
3348        fs::rename(checkpoint_path_tmp, checkpoint_path)
3349            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3350        Ok(())
3351    }
3352
    /// Loads the current epoch store.
    ///
    /// The epoch store can change during reconfiguration. To ensure that a
    /// single task never ends up accessing different epoch stores, this must
    /// be called only once per task. Each call site needs to be carefully
    /// audited to ensure that is the case, which also means the number of
    /// call sites should be kept to a minimum: only call this when there is
    /// no way to obtain the epoch store from somewhere else.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
3362
    /// Loads the current epoch store; should be used in tests only.
    ///
    /// Delegates to `load_epoch_store_one_call_per_task`.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
3367
3368    pub fn clone_committee_for_testing(&self) -> Committee {
3369        Committee::clone(self.epoch_store_for_testing().committee())
3370    }
3371
3372    #[instrument(level = "trace", skip_all)]
3373    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3374        self.get_object_store()
3375            .try_get_object(object_id)
3376            .map_err(Into::into)
3377    }
3378
3379    /// Non-fallible version of `try_get_object`.
3380    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3381        self.try_get_object(object_id)
3382            .await
3383            .expect("storage access failed")
3384    }
3385
3386    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3387        Ok(self
3388            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3389            .await?
3390            .expect("framework object should always exist")
3391            .compute_object_reference())
3392    }
3393
3394    // This function is only used for testing.
3395    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3396        self.get_object_cache_reader()
3397            .try_get_iota_system_state_object_unsafe()
3398    }
3399
3400    #[instrument(level = "trace", skip_all)]
3401    pub fn get_checkpoint_by_sequence_number(
3402        &self,
3403        sequence_number: CheckpointSequenceNumber,
3404    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3405        Ok(self
3406            .checkpoint_store
3407            .get_checkpoint_by_sequence_number(sequence_number)?)
3408    }
3409
3410    #[instrument(level = "trace", skip_all)]
3411    pub fn get_transaction_checkpoint_for_tests(
3412        &self,
3413        digest: &TransactionDigest,
3414        epoch_store: &AuthorityPerEpochStore,
3415    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3416        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3417        let Some(checkpoint) = checkpoint else {
3418            return Ok(None);
3419        };
3420        let checkpoint = self
3421            .checkpoint_store
3422            .get_checkpoint_by_sequence_number(checkpoint)?;
3423        Ok(checkpoint)
3424    }
3425
3426    #[instrument(level = "trace", skip_all)]
3427    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3428        Ok(
3429            match self
3430                .get_object_cache_reader()
3431                .try_get_latest_object_or_tombstone(*object_id)?
3432            {
3433                Some((_, ObjectOrTombstone::Object(object))) => {
3434                    let layout = self.get_object_layout(&object)?;
3435                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3436                }
3437                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3438                None => ObjectRead::NotExists(*object_id),
3439            },
3440        )
3441    }
3442
    /// Returns the chain identifier stored on this authority state.
    ///
    /// Chain Identifier is the digest of the genesis checkpoint.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3447
3448    #[instrument(level = "trace", skip_all)]
3449    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3450    where
3451        T: DeserializeOwned,
3452    {
3453        let o = self.get_object_read(object_id)?.into_object()?;
3454        if let Some(move_object) = o.data.try_as_move() {
3455            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3456                IotaError::ObjectDeserialization {
3457                    error: format!("{e}"),
3458                }
3459            })?)
3460        } else {
3461            Err(IotaError::ObjectDeserialization {
3462                error: format!("Provided object : [{object_id}] is not a Move object."),
3463            })
3464        }
3465    }
3466
3467    /// This function aims to serve rpc reads on past objects and
3468    /// we don't expect it to be called for other purposes.
3469    /// Depending on the object pruning policies that will be enforced in the
3470    /// future there is no software-level guarantee/SLA to retrieve an object
3471    /// with an old version even if it exists/existed.
3472    #[instrument(level = "trace", skip_all)]
3473    pub fn get_past_object_read(
3474        &self,
3475        object_id: &ObjectID,
3476        version: SequenceNumber,
3477    ) -> IotaResult<PastObjectRead> {
3478        // Firstly we see if the object ever existed by getting its latest data
3479        let Some(obj_ref) = self
3480            .get_object_cache_reader()
3481            .try_get_latest_object_ref_or_tombstone(*object_id)?
3482        else {
3483            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3484        };
3485
3486        if version > obj_ref.1 {
3487            return Ok(PastObjectRead::VersionTooHigh {
3488                object_id: *object_id,
3489                asked_version: version,
3490                latest_version: obj_ref.1,
3491            });
3492        }
3493
3494        if version < obj_ref.1 {
3495            // Read past objects
3496            return Ok(match self.read_object_at_version(object_id, version)? {
3497                Some((object, layout)) => {
3498                    let obj_ref = object.compute_object_reference();
3499                    PastObjectRead::VersionFound(obj_ref, object, layout)
3500                }
3501
3502                None => PastObjectRead::VersionNotFound(*object_id, version),
3503            });
3504        }
3505
3506        if !obj_ref.2.is_alive() {
3507            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3508        }
3509
3510        match self.read_object_at_version(object_id, obj_ref.1)? {
3511            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3512            None => {
3513                error!(
3514                    "Object with in parent_entry is missing from object store, datastore is \
3515                     inconsistent",
3516                );
3517                Err(UserInputError::ObjectNotFound {
3518                    object_id: *object_id,
3519                    version: Some(obj_ref.1),
3520                }
3521                .into())
3522            }
3523        }
3524    }
3525
3526    #[instrument(level = "trace", skip_all)]
3527    fn read_object_at_version(
3528        &self,
3529        object_id: &ObjectID,
3530        version: SequenceNumber,
3531    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3532        let Some(object) = self
3533            .get_object_cache_reader()
3534            .try_get_object_by_key(object_id, version)?
3535        else {
3536            return Ok(None);
3537        };
3538
3539        let layout = self.get_object_layout(&object)?;
3540        Ok(Some((object, layout)))
3541    }
3542
3543    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3544        let layout = object
3545            .data
3546            .try_as_move()
3547            .map(|object| {
3548                into_struct_layout(
3549                    self.load_epoch_store_one_call_per_task()
3550                        .executor()
3551                        // TODO(cache) - must read through cache
3552                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3553                        .get_annotated_layout(&object.type_().clone().into())?,
3554                )
3555            })
3556            .transpose()?;
3557        Ok(layout)
3558    }
3559
3560    fn get_owner_at_version(
3561        &self,
3562        object_id: &ObjectID,
3563        version: SequenceNumber,
3564    ) -> IotaResult<Owner> {
3565        self.get_object_store()
3566            .try_get_object_by_key(object_id, version)?
3567            .ok_or_else(|| {
3568                IotaError::from(UserInputError::ObjectNotFound {
3569                    object_id: *object_id,
3570                    version: Some(version),
3571                })
3572            })
3573            .map(|o| o.owner)
3574    }
3575
3576    #[instrument(level = "trace", skip_all)]
3577    pub fn get_owner_objects(
3578        &self,
3579        owner: IotaAddress,
3580        // If `Some`, the query will start from the next item after the specified cursor
3581        cursor: Option<ObjectID>,
3582        limit: usize,
3583        filter: Option<IotaObjectDataFilter>,
3584    ) -> IotaResult<Vec<ObjectInfo>> {
3585        if let Some(indexes) = &self.indexes {
3586            indexes.get_owner_objects(owner, cursor, limit, filter)
3587        } else {
3588            Err(IotaError::IndexStoreNotAvailable)
3589        }
3590    }
3591
3592    #[instrument(level = "trace", skip_all)]
3593    pub fn get_owned_coins_iterator_with_cursor(
3594        &self,
3595        owner: IotaAddress,
3596        // If `Some`, the query will start from the next item after the specified cursor
3597        cursor: (String, ObjectID),
3598        limit: usize,
3599        one_coin_type_only: bool,
3600    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3601        if let Some(indexes) = &self.indexes {
3602            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3603        } else {
3604            Err(IotaError::IndexStoreNotAvailable)
3605        }
3606    }
3607
3608    #[instrument(level = "trace", skip_all)]
3609    pub fn get_owner_objects_iterator(
3610        &self,
3611        owner: IotaAddress,
3612        // If `Some`, the query will start from the next item after the specified cursor
3613        cursor: Option<ObjectID>,
3614        filter: Option<IotaObjectDataFilter>,
3615    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3616        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3617        if let Some(indexes) = &self.indexes {
3618            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3619        } else {
3620            Err(IotaError::IndexStoreNotAvailable)
3621        }
3622    }
3623
3624    #[instrument(level = "trace", skip_all)]
3625    pub async fn get_move_objects<T>(
3626        &self,
3627        owner: IotaAddress,
3628        type_: MoveObjectType,
3629    ) -> IotaResult<Vec<T>>
3630    where
3631        T: DeserializeOwned,
3632    {
3633        let object_ids = self
3634            .get_owner_objects_iterator(owner, None, None)?
3635            .filter(|o| match &o.type_ {
3636                ObjectType::Struct(s) => &type_ == s,
3637                ObjectType::Package => false,
3638            })
3639            .map(|info| ObjectKey(info.object_id, info.version))
3640            .collect::<Vec<_>>();
3641        let mut move_objects = vec![];
3642
3643        let objects = self
3644            .get_object_store()
3645            .try_multi_get_objects_by_key(&object_ids)?;
3646
3647        for (o, id) in objects.into_iter().zip(object_ids) {
3648            let object = o.ok_or_else(|| {
3649                IotaError::from(UserInputError::ObjectNotFound {
3650                    object_id: id.0,
3651                    version: Some(id.1),
3652                })
3653            })?;
3654            let move_object = object.data.try_as_move().ok_or_else(|| {
3655                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3656            })?;
3657            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3658                IotaError::ObjectDeserialization {
3659                    error: format!("{e}"),
3660                }
3661            })?);
3662        }
3663        Ok(move_objects)
3664    }
3665
3666    #[instrument(level = "trace", skip_all)]
3667    pub fn get_dynamic_fields(
3668        &self,
3669        owner: ObjectID,
3670        // If `Some`, the query will start from the next item after the specified cursor
3671        cursor: Option<ObjectID>,
3672        limit: usize,
3673    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3674        Ok(self
3675            .get_dynamic_fields_iterator(owner, cursor)?
3676            .take(limit)
3677            .collect::<Result<Vec<_>, _>>()?)
3678    }
3679
3680    fn get_dynamic_fields_iterator(
3681        &self,
3682        owner: ObjectID,
3683        // If `Some`, the query will start from the next item after the specified cursor
3684        cursor: Option<ObjectID>,
3685    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3686    {
3687        if let Some(indexes) = &self.indexes {
3688            indexes.get_dynamic_fields_iterator(owner, cursor)
3689        } else {
3690            Err(IotaError::IndexStoreNotAvailable)
3691        }
3692    }
3693
3694    #[instrument(level = "trace", skip_all)]
3695    pub fn get_dynamic_field_object_id(
3696        &self,
3697        owner: ObjectID,
3698        name_type: TypeTag,
3699        name_bcs_bytes: &[u8],
3700    ) -> IotaResult<Option<ObjectID>> {
3701        if let Some(indexes) = &self.indexes {
3702            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3703        } else {
3704            Err(IotaError::IndexStoreNotAvailable)
3705        }
3706    }
3707
3708    #[instrument(level = "trace", skip_all)]
3709    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3710        Ok(self.get_indexes()?.next_sequence_number())
3711    }
3712
3713    #[instrument(level = "trace", skip_all)]
3714    pub async fn get_executed_transaction_and_effects(
3715        &self,
3716        digest: TransactionDigest,
3717        kv_store: Arc<TransactionKeyValueStore>,
3718    ) -> IotaResult<(Transaction, TransactionEffects)> {
3719        let transaction = kv_store.get_tx(digest).await?;
3720        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3721        Ok((transaction, effects))
3722    }
3723
3724    #[instrument(level = "trace", skip_all)]
3725    pub fn multi_get_checkpoint_by_sequence_number(
3726        &self,
3727        sequence_numbers: &[CheckpointSequenceNumber],
3728    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3729        Ok(self
3730            .checkpoint_store
3731            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3732    }
3733
3734    #[instrument(level = "trace", skip_all)]
3735    pub fn get_transaction_events(
3736        &self,
3737        digest: &TransactionEventsDigest,
3738    ) -> IotaResult<TransactionEvents> {
3739        self.get_transaction_cache_reader()
3740            .try_get_events(digest)?
3741            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3742    }
3743
3744    pub fn get_transaction_input_objects(
3745        &self,
3746        effects: &TransactionEffects,
3747    ) -> anyhow::Result<Vec<Object>> {
3748        let input_object_keys = effects
3749            .modified_at_versions()
3750            .into_iter()
3751            .map(|(object_id, version)| ObjectKey(object_id, version))
3752            .collect::<Vec<_>>();
3753
3754        let input_objects = self
3755            .get_object_store()
3756            .try_multi_get_objects_by_key(&input_object_keys)?
3757            .into_iter()
3758            .enumerate()
3759            .map(|(idx, maybe_object)| {
3760                maybe_object.ok_or_else(|| {
3761                    anyhow::anyhow!(
3762                        "missing input object key {:?} from tx {}",
3763                        input_object_keys[idx],
3764                        effects.transaction_digest()
3765                    )
3766                })
3767            })
3768            .collect::<anyhow::Result<Vec<_>>>()?;
3769        Ok(input_objects)
3770    }
3771
3772    pub fn get_transaction_output_objects(
3773        &self,
3774        effects: &TransactionEffects,
3775    ) -> anyhow::Result<Vec<Object>> {
3776        let output_object_keys = effects
3777            .all_changed_objects()
3778            .into_iter()
3779            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3780            .collect::<Vec<_>>();
3781
3782        let output_objects = self
3783            .get_object_store()
3784            .try_multi_get_objects_by_key(&output_object_keys)?
3785            .into_iter()
3786            .enumerate()
3787            .map(|(idx, maybe_object)| {
3788                maybe_object.ok_or_else(|| {
3789                    anyhow::anyhow!(
3790                        "missing output object key {:?} from tx {}",
3791                        output_object_keys[idx],
3792                        effects.transaction_digest()
3793                    )
3794                })
3795            })
3796            .collect::<anyhow::Result<Vec<_>>>()?;
3797        Ok(output_objects)
3798    }
3799
3800    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3801        match &self.indexes {
3802            Some(i) => Ok(i.clone()),
3803            None => Err(IotaError::UnsupportedFeature {
3804                error: "extended object indexing is not enabled on this server".into(),
3805            }),
3806        }
3807    }
3808
3809    pub async fn get_transactions_for_tests(
3810        self: &Arc<Self>,
3811        filter: Option<TransactionFilter>,
3812        cursor: Option<TransactionDigest>,
3813        limit: Option<usize>,
3814        reverse: bool,
3815    ) -> IotaResult<Vec<TransactionDigest>> {
3816        let metrics = KeyValueStoreMetrics::new_for_tests();
3817        let kv_store = Arc::new(TransactionKeyValueStore::new(
3818            "rocksdb",
3819            metrics,
3820            self.clone(),
3821        ));
3822        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3823            .await
3824    }
3825
3826    #[instrument(level = "trace", skip_all)]
3827    pub async fn get_transactions(
3828        &self,
3829        kv_store: &Arc<TransactionKeyValueStore>,
3830        filter: Option<TransactionFilter>,
3831        // If `Some`, the query will start from the next item after the specified cursor
3832        cursor: Option<TransactionDigest>,
3833        limit: Option<usize>,
3834        reverse: bool,
3835    ) -> IotaResult<Vec<TransactionDigest>> {
3836        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3837            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3838            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3839            if reverse {
3840                let iter = iter
3841                    .rev()
3842                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3843                    .skip(usize::from(cursor.is_some()));
3844                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3845            } else {
3846                let iter = iter
3847                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3848                    .skip(usize::from(cursor.is_some()));
3849                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3850            }
3851        }
3852        self.get_indexes()?
3853            .get_transactions(filter, cursor, limit, reverse)
3854    }
3855
    /// Returns a reference to the checkpoint store handle held by this
    /// authority state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
3859
3860    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3861        self.get_checkpoint_store()
3862            .get_highest_executed_checkpoint_seq_number()?
3863            .ok_or(IotaError::UserInput {
3864                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3865            })
3866    }
3867
3868    #[cfg(msim)]
3869    pub fn get_highest_pruned_checkpoint_for_testing(
3870        &self,
3871    ) -> IotaResult<CheckpointSequenceNumber> {
3872        self.database_for_testing()
3873            .perpetual_tables
3874            .get_highest_pruned_checkpoint()
3875    }
3876
3877    #[instrument(level = "trace", skip_all)]
3878    pub fn get_checkpoint_summary_by_sequence_number(
3879        &self,
3880        sequence_number: CheckpointSequenceNumber,
3881    ) -> IotaResult<CheckpointSummary> {
3882        let verified_checkpoint = self
3883            .get_checkpoint_store()
3884            .get_checkpoint_by_sequence_number(sequence_number)?;
3885        match verified_checkpoint {
3886            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3887            None => Err(IotaError::UserInput {
3888                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3889            }),
3890        }
3891    }
3892
3893    #[instrument(level = "trace", skip_all)]
3894    pub fn get_checkpoint_summary_by_digest(
3895        &self,
3896        digest: CheckpointDigest,
3897    ) -> IotaResult<CheckpointSummary> {
3898        let verified_checkpoint = self
3899            .get_checkpoint_store()
3900            .get_checkpoint_by_digest(&digest)?;
3901        match verified_checkpoint {
3902            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3903            None => Err(IotaError::UserInput {
3904                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3905            }),
3906        }
3907    }
3908
3909    #[instrument(level = "trace", skip_all)]
3910    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3911        if is_system_package(package_id) {
3912            return self.find_genesis_txn_digest();
3913        }
3914        Ok(self
3915            .get_object_read(&package_id)?
3916            .into_object()?
3917            .previous_transaction)
3918    }
3919
3920    #[instrument(level = "trace", skip_all)]
3921    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3922        let summary = self
3923            .get_verified_checkpoint_by_sequence_number(0)?
3924            .into_message();
3925        let content = self.get_checkpoint_contents(summary.content_digest)?;
3926        let genesis_transaction = content.enumerate_transactions(&summary).next();
3927        Ok(genesis_transaction
3928            .ok_or(IotaError::UserInput {
3929                error: UserInputError::GenesisTransactionNotFound,
3930            })?
3931            .1
3932            .transaction)
3933    }
3934
3935    #[instrument(level = "trace", skip_all)]
3936    pub fn get_verified_checkpoint_by_sequence_number(
3937        &self,
3938        sequence_number: CheckpointSequenceNumber,
3939    ) -> IotaResult<VerifiedCheckpoint> {
3940        let verified_checkpoint = self
3941            .get_checkpoint_store()
3942            .get_checkpoint_by_sequence_number(sequence_number)?;
3943        match verified_checkpoint {
3944            Some(verified_checkpoint) => Ok(verified_checkpoint),
3945            None => Err(IotaError::UserInput {
3946                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3947            }),
3948        }
3949    }
3950
3951    #[instrument(level = "trace", skip_all)]
3952    pub fn get_verified_checkpoint_summary_by_digest(
3953        &self,
3954        digest: CheckpointDigest,
3955    ) -> IotaResult<VerifiedCheckpoint> {
3956        let verified_checkpoint = self
3957            .get_checkpoint_store()
3958            .get_checkpoint_by_digest(&digest)?;
3959        match verified_checkpoint {
3960            Some(verified_checkpoint) => Ok(verified_checkpoint),
3961            None => Err(IotaError::UserInput {
3962                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3963            }),
3964        }
3965    }
3966
3967    #[instrument(level = "trace", skip_all)]
3968    pub fn get_checkpoint_contents(
3969        &self,
3970        digest: CheckpointContentsDigest,
3971    ) -> IotaResult<CheckpointContents> {
3972        self.get_checkpoint_store()
3973            .get_checkpoint_contents(&digest)?
3974            .ok_or(IotaError::UserInput {
3975                error: UserInputError::CheckpointContentsNotFound(digest),
3976            })
3977    }
3978
3979    #[instrument(level = "trace", skip_all)]
3980    pub fn get_checkpoint_contents_by_sequence_number(
3981        &self,
3982        sequence_number: CheckpointSequenceNumber,
3983    ) -> IotaResult<CheckpointContents> {
3984        let verified_checkpoint = self
3985            .get_checkpoint_store()
3986            .get_checkpoint_by_sequence_number(sequence_number)?;
3987        match verified_checkpoint {
3988            Some(verified_checkpoint) => {
3989                let content_digest = verified_checkpoint.into_inner().content_digest;
3990                self.get_checkpoint_contents(content_digest)
3991            }
3992            None => Err(IotaError::UserInput {
3993                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3994            }),
3995        }
3996    }
3997
3998    #[instrument(level = "trace", skip_all)]
3999    pub async fn query_events(
4000        &self,
4001        kv_store: &Arc<TransactionKeyValueStore>,
4002        query: EventFilter,
4003        // If `Some`, the query will start from the next item after the specified cursor
4004        cursor: Option<EventID>,
4005        limit: usize,
4006        descending: bool,
4007    ) -> IotaResult<Vec<IotaEvent>> {
4008        let index_store = self.get_indexes()?;
4009
4010        // Get the tx_num from tx_digest
4011        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4012            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4013                IotaError::TransactionNotFound {
4014                    digest: cursor.tx_digest,
4015                },
4016            )?;
4017            (tx_seq, cursor.event_seq as usize)
4018        } else if descending {
4019            (u64::MAX, usize::MAX)
4020        } else {
4021            (0, 0)
4022        };
4023
4024        let limit = limit + 1;
4025        let mut event_keys = match query {
4026            EventFilter::All(filters) => {
4027                if filters.is_empty() {
4028                    index_store.all_events(tx_num, event_num, limit, descending)?
4029                } else {
4030                    return Err(IotaError::UserInput {
4031                        error: UserInputError::Unsupported(
4032                            "This query type does not currently support filter combinations"
4033                                .to_string(),
4034                        ),
4035                    });
4036                }
4037            }
4038            EventFilter::Transaction(digest) => {
4039                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4040            }
4041            EventFilter::MoveModule { package, module } => {
4042                let module_id = ModuleId::new(package.into(), module);
4043                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4044            }
4045            EventFilter::MoveEventType(struct_name) => index_store
4046                .events_by_move_event_struct_name(
4047                    &struct_name,
4048                    tx_num,
4049                    event_num,
4050                    limit,
4051                    descending,
4052                )?,
4053            EventFilter::Sender(sender) => {
4054                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4055            }
4056            EventFilter::TimeRange {
4057                start_time,
4058                end_time,
4059            } => index_store
4060                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4061            EventFilter::MoveEventModule { package, module } => index_store
4062                .events_by_move_event_module(
4063                    &ModuleId::new(package.into(), module),
4064                    tx_num,
4065                    event_num,
4066                    limit,
4067                    descending,
4068                )?,
4069            // not using "_ =>" because we want to make sure we remember to add new variants here
4070            EventFilter::Package(_)
4071            | EventFilter::MoveEventField { .. }
4072            | EventFilter::Any(_)
4073            | EventFilter::And(_, _)
4074            | EventFilter::Or(_, _) => {
4075                return Err(IotaError::UserInput {
4076                    error: UserInputError::Unsupported(
4077                        "This query type is not supported by the full node.".to_string(),
4078                    ),
4079                });
4080            }
4081        };
4082
4083        // skip one event if exclusive cursor is provided,
4084        // otherwise truncate to the original limit.
4085        if cursor.is_some() {
4086            if !event_keys.is_empty() {
4087                event_keys.remove(0);
4088            }
4089        } else {
4090            event_keys.truncate(limit - 1);
4091        }
4092
4093        // get the unique set of digests from the event_keys
4094        let transaction_digests = event_keys
4095            .iter()
4096            .map(|(_, digest, _, _)| *digest)
4097            .collect::<HashSet<_>>()
4098            .into_iter()
4099            .collect::<Vec<_>>();
4100
4101        let events = kv_store
4102            .multi_get_events_by_tx_digests(&transaction_digests)
4103            .await?;
4104
4105        let events_map: HashMap<_, _> =
4106            transaction_digests.iter().zip(events.into_iter()).collect();
4107
4108        let stored_events = event_keys
4109            .into_iter()
4110            .map(|k| {
4111                (
4112                    k,
4113                    events_map
4114                        .get(&k.1)
4115                        .expect("fetched digest is missing")
4116                        .clone()
4117                        .and_then(|e| e.data.get(k.2).cloned()),
4118                )
4119            })
4120            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4121                event
4122                    .map(|e| (e, tx_digest, event_seq, timestamp))
4123                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4124            })
4125            .collect::<Result<Vec<_>, _>>()?;
4126
4127        let epoch_store = self.load_epoch_store_one_call_per_task();
4128        let backing_store = self.get_backing_package_store().as_ref();
4129        let mut layout_resolver = epoch_store
4130            .executor()
4131            .type_layout_resolver(Box::new(backing_store));
4132        let mut events = vec![];
4133        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4134            events.push(IotaEvent::try_from(
4135                e.clone(),
4136                tx_digest,
4137                event_seq as u64,
4138                Some(timestamp),
4139                layout_resolver.get_annotated_layout(&e.type_)?,
4140            )?)
4141        }
4142        Ok(events)
4143    }
4144
4145    pub async fn insert_genesis_object(&self, object: Object) {
4146        self.get_reconfig_api()
4147            .try_insert_genesis_object(object)
4148            .expect("Cannot insert genesis object")
4149    }
4150
4151    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4152        futures::future::join_all(
4153            objects
4154                .iter()
4155                .map(|o| self.insert_genesis_object(o.clone())),
4156        )
4157        .await;
4158    }
4159
4160    /// Make a status response for a transaction
4161    #[instrument(level = "trace", skip_all)]
4162    pub fn get_transaction_status(
4163        &self,
4164        transaction_digest: &TransactionDigest,
4165        epoch_store: &Arc<AuthorityPerEpochStore>,
4166    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4167        // TODO: In the case of read path, we should not have to re-sign the effects.
4168        if let Some(effects) =
4169            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4170        {
4171            if let Some(transaction) = self
4172                .get_transaction_cache_reader()
4173                .try_get_transaction_block(transaction_digest)?
4174            {
4175                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4176                let events = if let Some(digest) = effects.events_digest() {
4177                    self.get_transaction_events(digest)?
4178                } else {
4179                    TransactionEvents::default()
4180                };
4181                return Ok(Some((
4182                    (*transaction).clone().into_message(),
4183                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4184                )));
4185            } else {
4186                // The read of effects and read of transaction are not atomic. It's possible
4187                // that we reverted the transaction (during epoch change) in
4188                // between the above two reads, and we end up having effects but
4189                // not transaction. In this case, we just fall through.
4190                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4191            }
4192        }
4193        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4194            self.metrics.tx_already_processed.inc();
4195            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4196            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4197        } else {
4198            Ok(None)
4199        }
4200    }
4201
4202    /// Get the signed effects of the given transaction. If the effects was
4203    /// signed in a previous epoch, re-sign it so that the caller is able to
4204    /// form a cert of the effects in the current epoch.
4205    #[instrument(level = "trace", skip_all)]
4206    pub fn get_signed_effects_and_maybe_resign(
4207        &self,
4208        transaction_digest: &TransactionDigest,
4209        epoch_store: &Arc<AuthorityPerEpochStore>,
4210    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4211        let effects = self
4212            .get_transaction_cache_reader()
4213            .try_get_executed_effects(transaction_digest)?;
4214        match effects {
4215            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4216            None => Ok(None),
4217        }
4218    }
4219
4220    #[instrument(level = "trace", skip_all)]
4221    pub(crate) fn sign_effects(
4222        &self,
4223        effects: TransactionEffects,
4224        epoch_store: &Arc<AuthorityPerEpochStore>,
4225    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4226        let tx_digest = *effects.transaction_digest();
4227        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4228            Some(sig) if sig.epoch == epoch_store.epoch() => {
4229                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4230            }
4231            _ => {
4232                // If the transaction was executed in previous epochs, the validator will
4233                // re-sign the effects with new current epoch so that a client is always able to
4234                // obtain an effects certificate at the current epoch.
4235                //
4236                // Why is this necessary? Consider the following case:
4237                // - assume there are 4 validators
4238                // - Quorum driver gets 2 signed effects before reconfig halt
4239                // - The tx makes it into final checkpoint.
4240                // - 2 validators go away and are replaced in the new epoch.
4241                // - The new epoch begins.
4242                // - The quorum driver cannot complete the partial effects cert from the
4243                //   previous epoch, because it may not be able to reach either of the 2 former
4244                //   validators.
4245                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4246                //   the new epoch, the QD can make a new effects cert and return it to the
4247                //   client.
4248                //
4249                // This is a considered a short-term workaround. Eventually, Quorum Driver
4250                // should be able to return either an effects certificate, -or-
4251                // a proof of inclusion in a checkpoint. In the case above, the
4252                // Quorum Driver would return a proof of inclusion in the final
4253                // checkpoint, and this code would no longer be necessary.
4254                debug!(
4255                    ?tx_digest,
4256                    epoch=?epoch_store.epoch(),
4257                    "Re-signing the effects with the current epoch"
4258                );
4259
4260                let sig = AuthoritySignInfo::new(
4261                    epoch_store.epoch(),
4262                    &effects,
4263                    Intent::iota_app(IntentScope::TransactionEffects),
4264                    self.name,
4265                    &*self.secret,
4266                );
4267
4268                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4269
4270                epoch_store.insert_effects_digest_and_signature(
4271                    &tx_digest,
4272                    effects.digest(),
4273                    &sig,
4274                )?;
4275
4276                effects
4277            }
4278        };
4279
4280        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4281            signed_effects,
4282        ))
4283    }
4284
4285    // Returns coin objects for indexing for fullnode if indexing is enabled.
4286    #[instrument(level = "trace", skip_all)]
4287    fn fullnode_only_get_tx_coins_for_indexing(
4288        &self,
4289        inner_temporary_store: &InnerTemporaryStore,
4290        epoch_store: &Arc<AuthorityPerEpochStore>,
4291    ) -> Option<TxCoins> {
4292        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4293            return None;
4294        }
4295        let written_coin_objects = inner_temporary_store
4296            .written
4297            .iter()
4298            .filter_map(|(k, v)| {
4299                if v.is_coin() {
4300                    Some((*k, v.clone()))
4301                } else {
4302                    None
4303                }
4304            })
4305            .collect();
4306        let input_coin_objects = inner_temporary_store
4307            .input_objects
4308            .iter()
4309            .filter_map(|(k, v)| {
4310                if v.is_coin() {
4311                    Some((*k, v.clone()))
4312                } else {
4313                    None
4314                }
4315            })
4316            .collect::<ObjectMap>();
4317        Some((input_coin_objects, written_coin_objects))
4318    }
4319
4320    /// Get the TransactionEnvelope that currently locks the given object, if
4321    /// any. Since object locks are only valid for one epoch, we also need
4322    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4323    /// no lock records for the given object can be found.
4324    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4325    /// object record is at a different version.
4326    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4327    /// certain transaction. Returns None if the a lock record is
4328    /// initialized for the given ObjectRef but not yet locked by any
4329    /// transaction,     or cannot find the transaction in transaction
4330    /// table, because of data race etc.
4331    #[instrument(level = "trace", skip_all)]
4332    pub async fn get_transaction_lock(
4333        &self,
4334        object_ref: &ObjectRef,
4335        epoch_store: &AuthorityPerEpochStore,
4336    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4337        let lock_info = self
4338            .get_object_cache_reader()
4339            .try_get_lock(*object_ref, epoch_store)?;
4340        let lock_info = match lock_info {
4341            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4342                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4343                    provided_obj_ref: *object_ref,
4344                    current_version: locked_ref.1,
4345                }
4346                .into());
4347            }
4348            ObjectLockStatus::Initialized => {
4349                return Ok(None);
4350            }
4351            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4352        };
4353
4354        epoch_store.get_signed_transaction(&lock_info)
4355    }
4356
4357    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4358        self.get_object_cache_reader().try_get_objects(objects)
4359    }
4360
4361    /// Non-fallible version of `try_get_objects`.
4362    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4363        self.try_get_objects(objects)
4364            .await
4365            .expect("storage access failed")
4366    }
4367
4368    pub async fn try_get_object_or_tombstone(
4369        &self,
4370        object_id: ObjectID,
4371    ) -> IotaResult<Option<ObjectRef>> {
4372        self.get_object_cache_reader()
4373            .try_get_latest_object_ref_or_tombstone(object_id)
4374    }
4375
4376    /// Non-fallible version of `try_get_object_or_tombstone`.
4377    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4378        self.try_get_object_or_tombstone(object_id)
4379            .await
4380            .expect("storage access failed")
4381    }
4382
4383    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4384    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4385    /// upgrade.
4386    ///
4387    /// This method can be used to dynamic adjust the amount of buffer. If set
4388    /// to 0, the upgrade will go through with only 2f+1 votes.
4389    ///
4390    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4391    /// should have the same value), or you risk halting the chain.
4392    pub fn set_override_protocol_upgrade_buffer_stake(
4393        &self,
4394        expected_epoch: EpochId,
4395        buffer_stake_bps: u64,
4396    ) -> IotaResult {
4397        let epoch_store = self.load_epoch_store_one_call_per_task();
4398        let actual_epoch = epoch_store.epoch();
4399        if actual_epoch != expected_epoch {
4400            return Err(IotaError::WrongEpoch {
4401                expected_epoch,
4402                actual_epoch,
4403            });
4404        }
4405
4406        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4407    }
4408
4409    pub fn clear_override_protocol_upgrade_buffer_stake(
4410        &self,
4411        expected_epoch: EpochId,
4412    ) -> IotaResult {
4413        let epoch_store = self.load_epoch_store_one_call_per_task();
4414        let actual_epoch = epoch_store.epoch();
4415        if actual_epoch != expected_epoch {
4416            return Err(IotaError::WrongEpoch {
4417                expected_epoch,
4418                actual_epoch,
4419            });
4420        }
4421
4422        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4423    }
4424
4425    /// Get the set of system packages that are compiled in to this build, if
4426    /// those packages are compatible with the current versions of those
4427    /// packages on-chain.
4428    pub async fn get_available_system_packages(
4429        &self,
4430        binary_config: &BinaryConfig,
4431    ) -> Vec<ObjectRef> {
4432        let mut results = vec![];
4433
4434        let system_packages = BuiltInFramework::iter_system_packages();
4435
4436        // Add extra framework packages during simtest
4437        #[cfg(msim)]
4438        let extra_packages = framework_injection::get_extra_packages(self.name);
4439        #[cfg(msim)]
4440        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4441
4442        for system_package in system_packages {
4443            let modules = system_package.modules().to_vec();
4444            // In simtests, we could override the current built-in framework packages.
4445            #[cfg(msim)]
4446            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4447                .unwrap_or(modules);
4448
4449            let Some(obj_ref) = iota_framework::compare_system_package(
4450                &self.get_object_store(),
4451                &system_package.id,
4452                &modules,
4453                system_package.dependencies.to_vec(),
4454                binary_config,
4455            )
4456            .await
4457            else {
4458                return vec![];
4459            };
4460            results.push(obj_ref);
4461        }
4462
4463        results
4464    }
4465
4466    /// Return the new versions, module bytes, and dependencies for the packages
4467    /// that have been committed to for a framework upgrade, in
4468    /// `system_packages`.  Loads the module contents from the binary, and
4469    /// performs the following checks:
4470    ///
4471    /// - Whether its contents matches what is on-chain already, in which case
4472    ///   no upgrade is required, and its contents are omitted from the output.
4473    /// - Whether the contents in the binary can form a package whose digest
4474    ///   matches the input, meaning the framework will be upgraded, and this
4475    ///   authority can satisfy that upgrade, in which case the contents are
4476    ///   included in the output.
4477    ///
4478    /// If the current version of the framework can't be loaded, the binary does
4479    /// not contain the bytes for that framework ID, or the resulting
4480    /// package fails the digest check, `None` is returned indicating that
4481    /// this authority cannot run the upgrade that the network voted on.
4482    async fn get_system_package_bytes(
4483        &self,
4484        system_packages: Vec<ObjectRef>,
4485        binary_config: &BinaryConfig,
4486    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4487        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4488        let objects = self.get_objects(&ids).await;
4489
4490        let mut res = Vec::with_capacity(system_packages.len());
4491        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4492            let prev_transaction = match object {
4493                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4494                    // Skip this one because it doesn't need to be upgraded.
4495                    info!("Framework {} does not need updating", system_package_ref.0);
4496                    continue;
4497                }
4498
4499                Some(cur_object) => cur_object.previous_transaction,
4500                None => TransactionDigest::genesis_marker(),
4501            };
4502
4503            #[cfg(msim)]
4504            let SystemPackage {
4505                id: _,
4506                bytes,
4507                dependencies,
4508            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4509                .unwrap_or_else(|| {
4510                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4511                });
4512
4513            #[cfg(not(msim))]
4514            let SystemPackage {
4515                id: _,
4516                bytes,
4517                dependencies,
4518            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4519
4520            let modules: Vec<_> = bytes
4521                .iter()
4522                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4523                .collect();
4524
4525            let new_object = Object::new_system_package(
4526                &modules,
4527                system_package_ref.1,
4528                dependencies.clone(),
4529                prev_transaction,
4530            );
4531
4532            let new_ref = new_object.compute_object_reference();
4533            if new_ref != system_package_ref {
4534                error!(
4535                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4536                );
4537                return None;
4538            }
4539
4540            res.push((system_package_ref.1, bytes, dependencies));
4541        }
4542
4543        Some(res)
4544    }
4545
4546    /// Returns the new protocol version and system packages that the network
4547    /// has voted to upgrade to. If the proposed protocol version is not
4548    /// supported, None is returned.
4549    fn is_protocol_version_supported_v1(
4550        proposed_protocol_version: ProtocolVersion,
4551        committee: &Committee,
4552        capabilities: Vec<AuthorityCapabilitiesV1>,
4553        mut buffer_stake_bps: u64,
4554    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4555        if buffer_stake_bps > 10000 {
4556            warn!("clamping buffer_stake_bps to 10000");
4557            buffer_stake_bps = 10000;
4558        }
4559
4560        // For each validator, gather the protocol version and system packages that it
4561        // would like to upgrade to in the next epoch.
4562        let mut desired_upgrades: Vec<_> = capabilities
4563            .into_iter()
4564            .filter_map(|mut cap| {
4565                // A validator that lists no packages is voting against any change at all.
4566                if cap.available_system_packages.is_empty() {
4567                    return None;
4568                }
4569
4570                cap.available_system_packages.sort();
4571
4572                info!(
4573                    "validator {:?} supports {:?} with system packages: {:?}",
4574                    cap.authority.concise(),
4575                    cap.supported_protocol_versions,
4576                    cap.available_system_packages,
4577                );
4578
4579                // A validator that only supports the current protocol version is also voting
4580                // against any change, because framework upgrades always require a protocol
4581                // version bump.
4582                cap.supported_protocol_versions
4583                    .get_version_digest(proposed_protocol_version)
4584                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4585            })
4586            .collect();
4587
4588        // There can only be one set of votes that have a majority, find one if it
4589        // exists.
4590        desired_upgrades.sort();
4591        desired_upgrades
4592            .into_iter()
4593            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4594            .into_iter()
4595            .find_map(|((digest, packages), group)| {
4596                // should have been filtered out earlier.
4597                assert!(!packages.is_empty());
4598
4599                let mut stake_aggregator: StakeAggregator<(), true> =
4600                    StakeAggregator::new(Arc::new(committee.clone()));
4601
4602                for (_, _, authority) in group {
4603                    stake_aggregator.insert_generic(authority, ());
4604                }
4605
4606                let total_votes = stake_aggregator.total_votes();
4607                let quorum_threshold = committee.quorum_threshold();
4608                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4609
4610                info!(
4611                    protocol_config_digest = ?digest,
4612                    ?total_votes,
4613                    ?quorum_threshold,
4614                    ?buffer_stake_bps,
4615                    ?effective_threshold,
4616                    ?proposed_protocol_version,
4617                    ?packages,
4618                    "support for upgrade"
4619                );
4620
4621                let has_support = total_votes >= effective_threshold;
4622                has_support.then_some((proposed_protocol_version, digest, packages))
4623            })
4624    }
4625
4626    /// Selects the highest supported protocol version and system packages that
4627    /// the network has voted to upgrade to. If no upgrade is supported,
4628    /// returns the current protocol version and system packages.
4629    fn choose_protocol_version_and_system_packages_v1(
4630        current_protocol_version: ProtocolVersion,
4631        current_protocol_digest: Digest,
4632        committee: &Committee,
4633        capabilities: Vec<AuthorityCapabilitiesV1>,
4634        buffer_stake_bps: u64,
4635    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4636        let mut next_protocol_version = current_protocol_version;
4637        let mut system_packages = vec![];
4638        let mut protocol_version_digest = current_protocol_digest;
4639
4640        // Finds the highest supported protocol version and system packages by
4641        // incrementing the proposed protocol version by one until no further
4642        // upgrades are supported.
4643        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4644            next_protocol_version + 1,
4645            committee,
4646            capabilities.clone(),
4647            buffer_stake_bps,
4648        ) {
4649            next_protocol_version = version;
4650            protocol_version_digest = digest;
4651            system_packages = packages;
4652        }
4653
4654        (
4655            next_protocol_version,
4656            protocol_version_digest,
4657            system_packages,
4658        )
4659    }
4660
4661    /// Returns the indices of validators that support the given protocol
4662    /// version and digest. This includes both committee and non-committee
4663    /// validators based on their capabilities. Uses active validators
4664    /// instead of committee indices.
4665    fn get_validators_supporting_protocol_version(
4666        target_protocol_version: ProtocolVersion,
4667        target_digest: Digest,
4668        active_validators: &[AuthorityPublicKey],
4669        capabilities: &[AuthorityCapabilitiesV1],
4670    ) -> Vec<u64> {
4671        let mut eligible_validators = Vec::new();
4672
4673        for capability in capabilities {
4674            // Check if this validator supports the target protocol version and digest
4675            if let Some(digest) = capability
4676                .supported_protocol_versions
4677                .get_version_digest(target_protocol_version)
4678            {
4679                if digest == target_digest {
4680                    // Find the validator's index in the active validators list
4681                    if let Some(index) = active_validators
4682                        .iter()
4683                        .position(|name| AuthorityName::from(name) == capability.authority)
4684                    {
4685                        eligible_validators.push(index as u64);
4686                    }
4687                }
4688            }
4689        }
4690
4691        // Sort indices for deterministic behavior
4692        eligible_validators.sort();
4693        eligible_validators
4694    }
4695
4696    /// Calculates the sum of weights for eligible validators that are part of
4697    /// the committee. Takes the indices from
4698    /// get_validators_supporting_protocol_version and maps them back
4699    /// to committee members to get their weights.
4700    fn calculate_eligible_validators_weight(
4701        eligible_validator_indices: &[u64],
4702        active_validators: &[AuthorityPublicKey],
4703        committee: &Committee,
4704    ) -> u64 {
4705        let mut total_weight = 0u64;
4706
4707        for &index in eligible_validator_indices {
4708            let authority_pubkey = &active_validators[index as usize];
4709            // Check if this validator is in the committee and get their weight
4710            if let Some((_, weight)) = committee
4711                .members()
4712                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4713            {
4714                total_weight += weight;
4715            }
4716        }
4717
4718        total_weight
4719    }
4720
4721    #[instrument(level = "debug", skip_all)]
4722    fn create_authenticator_state_tx(
4723        &self,
4724        epoch_store: &Arc<AuthorityPerEpochStore>,
4725    ) -> Option<EndOfEpochTransactionKind> {
4726        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4727            info!("authenticator state transactions not enabled");
4728            return None;
4729        }
4730
4731        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4732        let tx = if authenticator_state_exists {
4733            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4734            let min_epoch =
4735                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4736            let authenticator_obj_initial_shared_version = epoch_store
4737                .epoch_start_config()
4738                .authenticator_obj_initial_shared_version()
4739                .expect("initial version must exist");
4740
4741            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4742                min_epoch,
4743                authenticator_obj_initial_shared_version,
4744            );
4745
4746            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4747
4748            tx
4749        } else {
4750            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4751            info!("Creating AuthenticatorStateCreate tx");
4752            tx
4753        };
4754        Some(tx)
4755    }
4756
    /// Creates and executes the advance epoch transaction to effects without
    /// committing it to the database. The effects of the change epoch tx
    /// are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgrade has been decided on, but the validator does
    /// not have the new versions of the packages locally, the validator
    /// cannot form the ChangeEpochTx. In this case it returns Err,
    /// indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified
    /// checkpoint (which should be ensured by the capabilities vote), it
    /// will arrive via state sync and be executed by CheckpointExecutor.
    ///
    /// On success returns the system state object written by the transaction,
    /// the `SystemEpochInfoEvent` it emitted (if any), and the transaction
    /// effects.
    ///
    /// # Errors
    ///
    /// Returns an error when the voted-on system packages are not available
    /// locally, when the change epoch tx has already been executed (e.g. via
    /// state sync), or when any of the fallible steps (lock acquisition,
    /// shared version assignment, object reads, execution, state sync store
    /// write) fails.
    ///
    /// # Panics
    ///
    /// Panics if capabilities cannot be read from the epoch store, if the
    /// executed transaction did not write the system state object, if no
    /// `SystemEpochInfoEvent` was emitted while the system is not in safe
    /// mode, or if the resulting effects status is not ok.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // All end-of-epoch transaction kinds are bundled into one transaction below.
        let mut txns = Vec::new();

        // The authenticator state tx (if enabled) goes ahead of the change epoch tx.
        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        // Decide which protocol version / system packages the next epoch will use,
        // based on the capabilities recorded during this epoch.
        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
        let authority_capabilities = epoch_store
            .get_capabilities_v1()
            .expect("read capabilities from db cannot fail");
        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                SupportedProtocolVersionsWithHashes::protocol_config_digest(
                    epoch_store.protocol_config(),
                ),
                epoch_store.committee(),
                authority_capabilities.clone(),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide
        // by the rules of the current epoch, including the current epoch's max
        // Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages
            //   locally, and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive
            //   it via state sync, and execute it. This will upgrade the framework
            //   packages, reconfigure, and most likely shut down in the new epoch (this
            //   validator likely doesn't support the new protocol version, or else it
            //   should have had the packages.)
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // Pick the change epoch transaction variant:
        // - ChangeEpochV3 when `select_committee_from_eligible_validators` is enabled,
        // - ChangeEpochV2 when `protocol_defined_base_fee` is enabled and a max
        //   committee members count is configured,
        // - the original ChangeEpoch otherwise.
        // NOTE(review): the V3 branch does not itself re-check the ChangeEpochV2
        // requirements; presumably the V3 feature flag implies them — confirm.

        if config.select_committee_from_eligible_validators() {
            // Get the list of eligible validators that support the target protocol version
            let active_validators = epoch_store.epoch_start_state().get_active_validators();

            // By default every active validator (identified by its index) is eligible.
            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

            // Use validators supporting the target protocol version as eligible validators
            // in the next version if select_committee_supporting_next_epoch_version feature
            // flag is set to true.
            if config.select_committee_supporting_next_epoch_version() {
                eligible_active_validators = Self::get_validators_supporting_protocol_version(
                    next_epoch_protocol_version,
                    next_epoch_protocol_digest,
                    &active_validators,
                    &authority_capabilities,
                );

                // Calculate the total weight of eligible validators in the committee
                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                    &eligible_active_validators,
                    &active_validators,
                    epoch_store.committee(),
                );

                // Safety check: ensure eligible validators have enough stake
                // Use the same effective threshold calculation that was used to decide the
                // protocol version
                let committee = epoch_store.committee();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                if eligible_validators_weight < effective_threshold {
                    error!(
                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                    );
                    // Pass all active validator indices as eligible validators
                    // to perform selection among all of them.
                    eligible_active_validators = (0..active_validators.len() as u64).collect();
                }
            }

            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
            ));
        } else if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            // Unlike V2/V3, the original variant is not given `computation_cost_burned`.
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        // The executable form is tied to the current epoch and the given checkpoint.
        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        // Take the per-transaction lock before checking whether it already executed.
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return
        // an error. The checkpoint builder will shortly be terminated by
        // reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        // We must manually assign the shared object versions to the transaction before
        // executing it. This is because we do not sequence end-of-epoch
        // transactions through consensus.
        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::slice::from_ref(&executable_tx),
        )?;

        let input_objects =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        // Execute to effects only; the execution error (if any) is not inspected
        // here, the effects status is asserted further below instead.
        let (temporary_store, effects, _execution_error_opt) =
            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The system epoch info event can be `None` in case if the `advance_epoch`
        // Move function call failed and was executed in the safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // We must write tx and effects to the state sync tables so that state sync is
        // able to deliver the transaction to CheckpointExecutor after it is
        // included in a certified checkpoint.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                // Convert the store error into `anyhow::Error` explicitly.
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
4997
4998    /// This function is called at the very end of the epoch.
4999    /// This step is required before updating new epoch in the db and calling
5000    /// reopen_epoch_db.
5001    #[instrument(level = "error", skip_all)]
5002    async fn revert_uncommitted_epoch_transactions(
5003        &self,
5004        epoch_store: &AuthorityPerEpochStore,
5005    ) -> IotaResult {
5006        {
5007            let state = epoch_store.get_reconfig_state_write_lock_guard();
5008            if state.should_accept_user_certs() {
5009                // Need to change this so that consensus adapter do not accept certificates from
5010                // user. This can happen if our local validator did not initiate
5011                // epoch change locally, but 2f+1 nodes already concluded the
5012                // epoch.
5013                //
5014                // This lock is essentially a barrier for
5015                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5016                // after this block
5017                epoch_store.close_user_certs(state);
5018            }
5019            // lock is dropped here
5020        }
5021        let pending_certificates = epoch_store.pending_consensus_certificates();
5022        info!(
5023            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5024            pending_certificates.len(),
5025            pending_certificates,
5026        );
5027        for digest in pending_certificates {
5028            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5029                info!(
5030                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5031                    digest
5032                );
5033                continue;
5034            }
5035            info!("Reverting {:?} at the end of epoch", digest);
5036            epoch_store.revert_executed_transaction(&digest)?;
5037            self.get_reconfig_api().try_revert_state_update(&digest)?;
5038        }
5039        info!("All uncommitted local transactions reverted");
5040        Ok(())
5041    }
5042
5043    #[instrument(level = "error", skip_all)]
5044    async fn reopen_epoch_db(
5045        &self,
5046        cur_epoch_store: &AuthorityPerEpochStore,
5047        new_committee: Committee,
5048        epoch_start_configuration: EpochStartConfiguration,
5049        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5050        epoch_last_checkpoint: CheckpointSequenceNumber,
5051    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5052        let new_epoch = new_committee.epoch;
5053        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5054        assert_eq!(
5055            epoch_start_configuration.epoch_start_state().epoch(),
5056            new_committee.epoch
5057        );
5058        fail_point!("before-open-new-epoch-store");
5059        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5060            self.name,
5061            new_committee,
5062            epoch_start_configuration,
5063            self.get_backing_package_store().clone(),
5064            self.get_object_store().clone(),
5065            expensive_safety_check_config,
5066            cur_epoch_store.get_chain_identifier(),
5067            epoch_last_checkpoint,
5068        );
5069        self.epoch_store.store(new_epoch_store.clone());
5070        Ok(new_epoch_store)
5071    }
5072
5073    #[cfg(test)]
5074    pub(crate) fn iter_live_object_set_for_testing(
5075        &self,
5076    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5077        self.get_accumulator_store()
5078            .iter_cached_live_object_set_for_testing()
5079    }
5080
5081    #[cfg(test)]
5082    pub(crate) fn shutdown_execution_for_test(&self) {
5083        self.tx_execution_shutdown
5084            .lock()
5085            .take()
5086            .unwrap()
5087            .send(())
5088            .unwrap();
5089    }
5090
5091    /// NOTE: this function is only to be used for fuzzing and testing. Never
5092    /// use in prod
5093    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5094        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5095        self.get_object_cache_reader()
5096            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5097        self.get_reconfig_api()
5098            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5099    }
5100}
5101
/// Background task state that receives randomness rounds from a channel and
/// hands each of them to the authority state (see `handle_new_randomness`).
pub struct RandomnessRoundReceiver {
    /// Authority whose current epoch store is consulted for every received
    /// round.
    authority_state: Arc<AuthorityState>,
    /// Incoming `(epoch, round, bytes)` tuples; the event loop ends once this
    /// channel yields `None`.
    /// NOTE(review): `bytes` presumably carries the BLS signature for the
    /// round — confirm against the sender.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5106
5107impl RandomnessRoundReceiver {
5108    pub fn spawn(
5109        authority_state: Arc<AuthorityState>,
5110        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5111    ) -> JoinHandle<()> {
5112        let rrr = RandomnessRoundReceiver {
5113            authority_state,
5114            randomness_rx,
5115        };
5116        spawn_monitored_task!(rrr.run())
5117    }
5118
5119    async fn run(mut self) {
5120        info!("RandomnessRoundReceiver event loop started");
5121
5122        loop {
5123            tokio::select! {
5124                maybe_recv = self.randomness_rx.recv() => {
5125                    if let Some((epoch, round, bytes)) = maybe_recv {
5126                        self.handle_new_randomness(epoch, round, bytes);
5127                    } else {
5128                        break;
5129                    }
5130                },
5131            }
5132        }
5133
5134        info!("RandomnessRoundReceiver event loop ended");
5135    }
5136
5137    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5138    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5139        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5140        if epoch_store.epoch() != epoch {
5141            warn!(
5142                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5143                epoch_store.epoch()
5144            );
5145            return;
5146        }
5147        let transaction = VerifiedTransaction::new_randomness_state_update(
5148            epoch,
5149            round,
5150            bytes,
5151            epoch_store
5152                .epoch_start_config()
5153                .randomness_obj_initial_shared_version(),
5154        );
5155        debug!(
5156            "created randomness state update transaction with digest: {:?}",
5157            transaction.digest()
5158        );
5159        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5160        let digest = *transaction.digest();
5161
5162        // Randomness state updates contain the full bls signature for the random round,
5163        // which cannot necessarily be reconstructed again later. Therefore we must
5164        // immediately persist this transaction. If we crash before its outputs
5165        // are committed, this ensures we will be able to re-execute it.
5166        self.authority_state
5167            .get_cache_commit()
5168            .persist_transaction(&transaction);
5169
5170        // Send transaction to TransactionManager for execution.
5171        self.authority_state
5172            .transaction_manager()
5173            .enqueue(vec![transaction], &epoch_store);
5174
5175        let authority_state = self.authority_state.clone();
5176        spawn_monitored_task!(async move {
5177            // Wait for transaction execution in a separate task, to avoid deadlock in case
5178            // of out-of-order randomness generation. (Each
5179            // RandomnessStateUpdate depends on the output of the
5180            // RandomnessStateUpdate from the previous round.)
5181            //
5182            // We set a very long timeout so that in case this gets stuck for some reason,
5183            // the validator will eventually crash rather than continuing in a
5184            // zombie mode.
5185            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5186            let result = tokio::time::timeout(
5187                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5188                authority_state
5189                    .get_transaction_cache_reader()
5190                    .try_notify_read_executed_effects(&[digest]),
5191            )
5192            .await;
5193            let result = match result {
5194                Ok(result) => result,
5195                Err(_) => {
5196                    if cfg!(debug_assertions) {
5197                        // Crash on randomness update execution timeout in debug builds.
5198                        panic!(
5199                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5200                        );
5201                    }
5202                    warn!(
5203                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5204                    );
5205                    // Continue waiting as long as necessary in non-debug builds.
5206                    authority_state
5207                        .get_transaction_cache_reader()
5208                        .try_notify_read_executed_effects(&[digest])
5209                        .await
5210                }
5211            };
5212
5213            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5214            let effects = effects.pop().expect("should return effects");
5215            if *effects.status() != ExecutionStatus::Success {
5216                fatal!(
5217                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5218                );
5219            }
5220            debug!(
5221                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5222            );
5223        });
5224    }
5225}
5226
5227#[async_trait]
5228impl TransactionKeyValueStoreTrait for AuthorityState {
5229    async fn multi_get(
5230        &self,
5231        transaction_keys: &[TransactionDigest],
5232        effects_keys: &[TransactionDigest],
5233    ) -> IotaResult<KVStoreTransactionData> {
5234        let txns = if !transaction_keys.is_empty() {
5235            self.get_transaction_cache_reader()
5236                .try_multi_get_transaction_blocks(transaction_keys)?
5237                .into_iter()
5238                .map(|t| t.map(|t| (*t).clone().into_inner()))
5239                .collect()
5240        } else {
5241            vec![]
5242        };
5243
5244        let fx = if !effects_keys.is_empty() {
5245            self.get_transaction_cache_reader()
5246                .try_multi_get_executed_effects(effects_keys)?
5247        } else {
5248            vec![]
5249        };
5250
5251        Ok((txns, fx))
5252    }
5253
5254    async fn multi_get_checkpoints(
5255        &self,
5256        checkpoint_summaries: &[CheckpointSequenceNumber],
5257        checkpoint_contents: &[CheckpointSequenceNumber],
5258        checkpoint_summaries_by_digest: &[CheckpointDigest],
5259    ) -> IotaResult<(
5260        Vec<Option<CertifiedCheckpointSummary>>,
5261        Vec<Option<CheckpointContents>>,
5262        Vec<Option<CertifiedCheckpointSummary>>,
5263    )> {
5264        // TODO: use multi-get methods if it ever becomes important (unlikely)
5265        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5266        let store = self.get_checkpoint_store();
5267        for seq in checkpoint_summaries {
5268            let checkpoint = store
5269                .get_checkpoint_by_sequence_number(*seq)?
5270                .map(|c| c.into_inner());
5271
5272            summaries.push(checkpoint);
5273        }
5274
5275        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5276        for seq in checkpoint_contents {
5277            let checkpoint = store
5278                .get_checkpoint_by_sequence_number(*seq)?
5279                .and_then(|summary| {
5280                    store
5281                        .get_checkpoint_contents(&summary.content_digest)
5282                        .expect("db read cannot fail")
5283                });
5284            contents.push(checkpoint);
5285        }
5286
5287        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5288        for digest in checkpoint_summaries_by_digest {
5289            let checkpoint = store
5290                .get_checkpoint_by_digest(digest)?
5291                .map(|c| c.into_inner());
5292            summaries_by_digest.push(checkpoint);
5293        }
5294
5295        Ok((summaries, contents, summaries_by_digest))
5296    }
5297
5298    async fn get_transaction_perpetual_checkpoint(
5299        &self,
5300        digest: TransactionDigest,
5301    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5302        self.get_checkpoint_cache()
5303            .try_get_transaction_perpetual_checkpoint(&digest)
5304            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5305    }
5306
5307    async fn get_object(
5308        &self,
5309        object_id: ObjectID,
5310        version: VersionNumber,
5311    ) -> IotaResult<Option<Object>> {
5312        self.get_object_cache_reader()
5313            .try_get_object_by_key(&object_id, version)
5314    }
5315
5316    async fn multi_get_transactions_perpetual_checkpoints(
5317        &self,
5318        digests: &[TransactionDigest],
5319    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5320        let res = self
5321            .get_checkpoint_cache()
5322            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5323
5324        Ok(res
5325            .into_iter()
5326            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5327            .collect())
5328    }
5329
5330    #[instrument(skip(self))]
5331    async fn multi_get_events_by_tx_digests(
5332        &self,
5333        digests: &[TransactionDigest],
5334    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5335        if digests.is_empty() {
5336            return Ok(vec![]);
5337        }
5338        let events_digests: Vec<_> = self
5339            .get_transaction_cache_reader()
5340            .try_multi_get_executed_effects(digests)?
5341            .into_iter()
5342            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5343            .collect();
5344        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5345        let mut events = self
5346            .get_transaction_cache_reader()
5347            .try_multi_get_events(&non_empty_events)?
5348            .into_iter();
5349        Ok(events_digests
5350            .into_iter()
5351            .map(|ev| ev.and_then(|_| events.next()?))
5352            .collect())
5353    }
5354}
5355
/// Simulator-only hooks for replacing or adding framework packages.
///
/// Overrides are kept in a thread-local map keyed by package id and are either
/// global (same modules for everyone) or computed per validator by a callback.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback; `None` means no override for that
        /// validator.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own version.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|m| {
                let mut buf = Vec::new();
                m.serialize_with_version(m.version, &mut buf).unwrap();
                buf
            })
            .collect()
    }

    /// Registers a global override for `package_id`, replacing any previous
    /// entry for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers a per-validator override callback for `package_id`, replacing
    /// any previous entry for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Returns the serialized override modules for `package_id` as seen by
    /// validator `name`, or `None` if there is no entry or the callback
    /// declines.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            })
        })
    }

    /// Same lookup as [`get_override_bytes`], but returns the compiled modules
    /// themselves.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds a [`SystemPackage`] from the override for `package_id`, or
    /// returns `None` if validator `name` has no override for it.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns the injected packages that are not part of the built-in
    /// framework, as seen by validator `name`.
    ///
    /// Packages whose per-validator callback yields no override for `name` are
    /// skipped rather than causing a panic. Every returned package is assumed
    /// to depend on all built-in packages.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        extra
            .into_iter()
            .filter_map(|package| {
                let bytes = get_override_bytes(&package, name)?;
                Some(SystemPackage {
                    id: package,
                    bytes,
                    dependencies: BuiltInFramework::all_package_ids(),
                })
            })
            .collect()
    }
}
5477
5478#[derive(Debug, Serialize, Deserialize, Clone)]
5479pub struct ObjDumpFormat {
5480    pub id: ObjectID,
5481    pub version: VersionNumber,
5482    pub digest: ObjectDigest,
5483    pub object: Object,
5484}
5485
5486impl ObjDumpFormat {
5487    fn new(object: Object) -> Self {
5488        let oref = object.compute_object_reference();
5489        Self {
5490            id: oref.0,
5491            version: oref.1,
5492            digest: oref.2,
5493            object,
5494        }
5495    }
5496}
5497
5498#[derive(Debug, Serialize, Deserialize, Clone)]
5499pub struct NodeStateDump {
5500    pub tx_digest: TransactionDigest,
5501    pub sender_signed_data: SenderSignedData,
5502    pub executed_epoch: u64,
5503    pub reference_gas_price: u64,
5504    pub protocol_version: u64,
5505    pub epoch_start_timestamp_ms: u64,
5506    pub computed_effects: TransactionEffects,
5507    pub expected_effects_digest: TransactionEffectsDigest,
5508    pub relevant_system_packages: Vec<ObjDumpFormat>,
5509    pub shared_objects: Vec<ObjDumpFormat>,
5510    pub loaded_child_objects: Vec<ObjDumpFormat>,
5511    pub modified_at_versions: Vec<ObjDumpFormat>,
5512    pub runtime_reads: Vec<ObjDumpFormat>,
5513    pub input_objects: Vec<ObjDumpFormat>,
5514}
5515
5516impl NodeStateDump {
5517    pub fn new(
5518        tx_digest: &TransactionDigest,
5519        effects: &TransactionEffects,
5520        expected_effects_digest: TransactionEffectsDigest,
5521        object_store: &dyn ObjectStore,
5522        epoch_store: &Arc<AuthorityPerEpochStore>,
5523        inner_temporary_store: &InnerTemporaryStore,
5524        certificate: &VerifiedExecutableTransaction,
5525    ) -> IotaResult<Self> {
5526        // Epoch info
5527        let executed_epoch = epoch_store.epoch();
5528        let reference_gas_price = epoch_store.reference_gas_price();
5529        let epoch_start_config = epoch_store.epoch_start_config();
5530        let protocol_version = epoch_store.protocol_version().as_u64();
5531        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5532
5533        // Record all system packages at this version
5534        let mut relevant_system_packages = Vec::new();
5535        for sys_package_id in BuiltInFramework::all_package_ids() {
5536            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5537                relevant_system_packages.push(ObjDumpFormat::new(w))
5538            }
5539        }
5540
5541        // Record all the shared objects
5542        let mut shared_objects = Vec::new();
5543        for kind in effects.input_shared_objects() {
5544            match kind {
5545                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5546                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5547                        shared_objects.push(ObjDumpFormat::new(w))
5548                    }
5549                }
5550                InputSharedObject::ReadDeleted(..)
5551                | InputSharedObject::MutateDeleted(..)
5552                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5553                                                           * objects. */
5554            }
5555        }
5556
5557        // Record all loaded child objects
5558        // Child objects which are read but not mutated are not tracked anywhere else
5559        let mut loaded_child_objects = Vec::new();
5560        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5561            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5562                loaded_child_objects.push(ObjDumpFormat::new(w))
5563            }
5564        }
5565
5566        // Record all modified objects
5567        let mut modified_at_versions = Vec::new();
5568        for (id, ver) in effects.modified_at_versions() {
5569            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5570                modified_at_versions.push(ObjDumpFormat::new(w))
5571            }
5572        }
5573
5574        // Packages read at runtime, which were not previously loaded into the temoorary
5575        // store Some packages may be fetched at runtime and wont show up in
5576        // input objects
5577        let mut runtime_reads = Vec::new();
5578        for obj in inner_temporary_store
5579            .runtime_packages_loaded_from_db
5580            .values()
5581        {
5582            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5583        }
5584
5585        // All other input objects should already be in `inner_temporary_store.objects`
5586
5587        Ok(Self {
5588            tx_digest: *tx_digest,
5589            executed_epoch,
5590            reference_gas_price,
5591            epoch_start_timestamp_ms,
5592            protocol_version,
5593            relevant_system_packages,
5594            shared_objects,
5595            loaded_child_objects,
5596            modified_at_versions,
5597            runtime_reads,
5598            sender_signed_data: certificate.clone().into_message(),
5599            input_objects: inner_temporary_store
5600                .input_objects
5601                .values()
5602                .map(|o| ObjDumpFormat::new(o.clone()))
5603                .collect(),
5604            computed_effects: effects.clone(),
5605            expected_effects_digest,
5606        })
5607    }
5608
5609    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5610        let mut objects = Vec::new();
5611        objects.extend(self.relevant_system_packages.clone());
5612        objects.extend(self.shared_objects.clone());
5613        objects.extend(self.loaded_child_objects.clone());
5614        objects.extend(self.modified_at_versions.clone());
5615        objects.extend(self.runtime_reads.clone());
5616        objects.extend(self.input_objects.clone());
5617        objects
5618    }
5619
5620    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5621        let file_name = format!(
5622            "{}_{}_NODE_DUMP.json",
5623            self.tx_digest,
5624            AuthorityState::unixtime_now_ms()
5625        );
5626        let mut path = path.to_path_buf();
5627        path.push(&file_name);
5628        let mut file = File::create(path.clone())?;
5629        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5630        Ok(path)
5631    }
5632
5633    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5634        let file = File::open(path)?;
5635        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5636    }
5637}