iota_core/
authority.rs

// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    authenticator_state::get_authenticator_state,
59    base_types::*,
60    committee::{Committee, EpochId, ProtocolVersion},
61    crypto::{
62        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
63        default_hash,
64    },
65    deny_list_v1::check_coin_deny_list_v1_during_signing,
66    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
67    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
68    effects::{
69        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
70        TransactionEvents, VerifiedSignedTransactionEffects,
71    },
72    error::{ExecutionError, IotaError, IotaResult, UserInputError},
73    event::{Event, EventID, SystemEpochInfoEvent},
74    executable_transaction::VerifiedExecutableTransaction,
75    execution_config_utils::to_binary_config,
76    execution_status::ExecutionStatus,
77    gas::{GasCostSummary, IotaGasStatus},
78    gas_coin::NANOS_PER_IOTA,
79    inner_temporary_store::{
80        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
81        WrittenObjects,
82    },
83    iota_system_state::{
84        IotaSystemState, IotaSystemStateTrait,
85        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
86    },
87    is_system_package,
88    layout_resolver::{LayoutResolver, into_struct_layout},
89    message_envelope::Message,
90    messages_checkpoint::{
91        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
92        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
93        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
94        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
95    },
96    messages_consensus::AuthorityCapabilitiesV1,
97    messages_grpc::{
98        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
99        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
100        TransactionStatus,
101    },
102    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
103    object::{
104        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
105        bounded_visitor::BoundedVisitor,
106    },
107    storage::{
108        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
109    },
110    supported_protocol_versions::{
111        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
112    },
113    transaction::*,
114    transaction_executor::{SimulateTransactionResult, VmChecks},
115};
116use itertools::Itertools;
117use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
118use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
119use parking_lot::Mutex;
120use prometheus::{
121    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
122    register_histogram_vec_with_registry, register_histogram_with_registry,
123    register_int_counter_vec_with_registry, register_int_counter_with_registry,
124    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
125};
126use serde::{Deserialize, Serialize, de::DeserializeOwned};
127use tap::TapFallible;
128use tokio::{
129    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130    task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143    authority::{
144        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148        authority_store_tables::AuthorityPrunerTables,
149        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150    },
151    authority_client::NetworkAuthorityClient,
152    checkpoints::CheckpointStore,
153    congestion_tracker::CongestionTracker,
154    consensus_adapter::ConsensusAdapter,
155    epoch::committee_store::CommitteeStore,
156    execution_cache::{
157        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159        TransactionCacheRead,
160    },
161    execution_driver::execution_process,
162    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163    metrics::{LatencyObserver, RateTracker},
164    module_cache_metrics::ResolverMetrics,
165    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166    rest_index::RestIndexStore,
167    stake_aggregator::StakeAggregator,
168    state_accumulator::{AccumulatorStore, StateAccumulator},
169    subscription_handler::SubscriptionHandler,
170    transaction_input_loader::TransactionInputLoader,
171    transaction_manager::TransactionManager,
172    transaction_outputs::TransactionOutputs,
173    validator_tx_finalizer::ValidatorTxFinalizer,
174    verify_indexes::verify_indexes,
175};
176
177#[cfg(test)]
178#[path = "unit_tests/authority_tests.rs"]
179pub mod authority_tests;
180
181#[cfg(test)]
182#[path = "unit_tests/transaction_tests.rs"]
183pub mod transaction_tests;
184
185#[cfg(test)]
186#[path = "unit_tests/batch_transaction_tests.rs"]
187mod batch_transaction_tests;
188
189#[cfg(test)]
190#[path = "unit_tests/move_integration_tests.rs"]
191pub mod move_integration_tests;
192
193#[cfg(test)]
194#[path = "unit_tests/gas_tests.rs"]
195mod gas_tests;
196
197#[cfg(test)]
198#[path = "unit_tests/batch_verification_tests.rs"]
199mod batch_verification_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/coin_deny_list_tests.rs"]
203mod coin_deny_list_tests;
204
205#[cfg(test)]
206#[path = "unit_tests/auth_unit_test_utils.rs"]
207pub mod auth_unit_test_utils;
208
209pub mod authority_test_utils;
210
211pub mod authority_per_epoch_store;
212pub mod authority_per_epoch_store_pruner;
213
214pub mod authority_store_pruner;
215pub mod authority_store_tables;
216pub mod authority_store_types;
217pub mod epoch_start_configuration;
218pub mod shared_object_congestion_tracker;
219pub mod shared_object_version_manager;
220pub mod suggested_gas_price_calculator;
221pub mod test_authority_builder;
222pub mod transaction_deferral;
223
224pub(crate) mod authority_store;
225pub mod backpressure;
226
227/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
228pub struct AuthorityMetrics {
229    tx_orders: IntCounter,
230    total_certs: IntCounter,
231    total_cert_attempts: IntCounter,
232    total_effects: IntCounter,
233    pub shared_obj_tx: IntCounter,
234    sponsored_tx: IntCounter,
235    tx_already_processed: IntCounter,
236    num_input_objs: Histogram,
237    num_shared_objects: Histogram,
238    batch_size: Histogram,
239
240    authority_state_handle_transaction_latency: Histogram,
241
242    execute_certificate_latency_single_writer: Histogram,
243    execute_certificate_latency_shared_object: Histogram,
244
245    internal_execution_latency: Histogram,
246    execution_load_input_objects_latency: Histogram,
247    prepare_certificate_latency: Histogram,
248    commit_certificate_latency: Histogram,
249    db_checkpoint_latency: Histogram,
250
251    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
252    pub(crate) transaction_manager_num_missing_objects: IntGauge,
253    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
254    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
255    pub(crate) transaction_manager_num_ready: IntGauge,
256    pub(crate) transaction_manager_object_cache_size: IntGauge,
257    pub(crate) transaction_manager_object_cache_hits: IntCounter,
258    pub(crate) transaction_manager_object_cache_misses: IntCounter,
259    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
260    pub(crate) transaction_manager_package_cache_size: IntGauge,
261    pub(crate) transaction_manager_package_cache_hits: IntCounter,
262    pub(crate) transaction_manager_package_cache_misses: IntCounter,
263    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
264    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
265
266    pub(crate) execution_driver_executed_transactions: IntCounter,
267    pub(crate) execution_driver_dispatch_queue: IntGauge,
268    pub(crate) execution_queueing_delay_s: Histogram,
269    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
270    pub(crate) execution_gas_latency_ratio: Histogram,
271
272    pub(crate) skipped_consensus_txns: IntCounter,
273    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
274
275    pub(crate) authority_overload_status: IntGauge,
276    pub(crate) authority_load_shedding_percentage: IntGauge,
277
278    pub(crate) transaction_overload_sources: IntCounterVec,
279
280    /// Post processing metrics
281    post_processing_total_events_emitted: IntCounter,
282    post_processing_total_tx_indexed: IntCounter,
283    post_processing_total_tx_had_event_processed: IntCounter,
284    post_processing_total_failures: IntCounter,
285
286    /// Consensus handler metrics
287    pub consensus_handler_processed: IntCounterVec,
288    pub consensus_handler_transaction_sizes: HistogramVec,
289    pub consensus_handler_num_low_scoring_authorities: IntGauge,
290    pub consensus_handler_scores: IntGaugeVec,
291    pub consensus_handler_deferred_transactions: IntCounter,
292    pub consensus_handler_congested_transactions: IntCounter,
293    pub consensus_handler_cancelled_transactions: IntCounter,
294    pub consensus_handler_max_object_costs: IntGaugeVec,
295    pub consensus_committed_subdags: IntCounterVec,
296    pub consensus_committed_messages: IntGaugeVec,
297    pub consensus_committed_user_transactions: IntGaugeVec,
298    pub consensus_handler_leader_round: IntGauge,
299    pub consensus_calculated_throughput: IntGauge,
300    pub consensus_calculated_throughput_profile: IntGauge,
301
302    pub limits_metrics: Arc<LimitsMetrics>,
303
304    /// bytecode verifier metrics for tracking timeouts
305    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
306
307    /// Count of zklogin signatures
308    pub zklogin_sig_count: IntCounter,
309    /// Count of multisig signatures
310    pub multisig_sig_count: IntCounter,
311
312    // Tracks recent average txn queueing delay between when it is ready for execution
313    // until it starts executing.
314    pub execution_queueing_latency: LatencyObserver,
315
316    // Tracks the rate of transactions become ready for execution in transaction manager.
317    // The need for the Mutex is that the tracker is updated in transaction manager and read
318    // in the overload_monitor. There should be low mutex contention because
319    // transaction manager is single threaded and the read rate in overload_monitor is
320    // low. In the case where transaction manager becomes multi-threaded, we can
321    // create one rate tracker per thread.
322    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
323
324    // Tracks the rate of transactions starts execution in execution driver.
325    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
326    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
327}
328
/// Histogram buckets for positive integer-valued samples in the 1 – 10M
/// range, overriding the default Prometheus buckets. Each decade is covered
/// by the 1-2-5-7 steps; the values are strictly increasing.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
335
/// Histogram buckets, in seconds, for general latency samples. Ranges from
/// 0.5 ms up to 90 s; the values are strictly increasing.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
340
/// Histogram buckets, in seconds, for low latency samples. Starts from 10us
/// and goes up to 100 s; the values are strictly increasing.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
346
/// Histogram buckets for the gas / latency ratio histograms
/// (`prepare_cert_gas_latency_ratio` and `execution_gas_latency_ratio`).
/// Ranges from 10 to 1M; the values are strictly increasing.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
351
352/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
353pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
354
355impl AuthorityMetrics {
356    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
357        let execute_certificate_latency = register_histogram_vec_with_registry!(
358            "authority_state_execute_certificate_latency",
359            "Latency of executing certificates, including waiting for inputs",
360            &["tx_type"],
361            LATENCY_SEC_BUCKETS.to_vec(),
362            registry,
363        )
364        .unwrap();
365
366        let execute_certificate_latency_single_writer =
367            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
368        let execute_certificate_latency_shared_object =
369            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
370
371        Self {
372            tx_orders: register_int_counter_with_registry!(
373                "total_transaction_orders",
374                "Total number of transaction orders",
375                registry,
376            )
377            .unwrap(),
378            total_certs: register_int_counter_with_registry!(
379                "total_transaction_certificates",
380                "Total number of transaction certificates handled",
381                registry,
382            )
383            .unwrap(),
384            total_cert_attempts: register_int_counter_with_registry!(
385                "total_handle_certificate_attempts",
386                "Number of calls to handle_certificate",
387                registry,
388            )
389            .unwrap(),
390            // total_effects == total transactions finished
391            total_effects: register_int_counter_with_registry!(
392                "total_transaction_effects",
393                "Total number of transaction effects produced",
394                registry,
395            )
396            .unwrap(),
397
398            shared_obj_tx: register_int_counter_with_registry!(
399                "num_shared_obj_tx",
400                "Number of transactions involving shared objects",
401                registry,
402            )
403            .unwrap(),
404
405            sponsored_tx: register_int_counter_with_registry!(
406                "num_sponsored_tx",
407                "Number of sponsored transactions",
408                registry,
409            )
410            .unwrap(),
411
412            tx_already_processed: register_int_counter_with_registry!(
413                "num_tx_already_processed",
414                "Number of transaction orders already processed previously",
415                registry,
416            )
417            .unwrap(),
418            num_input_objs: register_histogram_with_registry!(
419                "num_input_objects",
420                "Distribution of number of input TX objects per TX",
421                POSITIVE_INT_BUCKETS.to_vec(),
422                registry,
423            )
424            .unwrap(),
425            num_shared_objects: register_histogram_with_registry!(
426                "num_shared_objects",
427                "Number of shared input objects per TX",
428                POSITIVE_INT_BUCKETS.to_vec(),
429                registry,
430            )
431            .unwrap(),
432            batch_size: register_histogram_with_registry!(
433                "batch_size",
434                "Distribution of size of transaction batch",
435                POSITIVE_INT_BUCKETS.to_vec(),
436                registry,
437            )
438            .unwrap(),
439            authority_state_handle_transaction_latency: register_histogram_with_registry!(
440                "authority_state_handle_transaction_latency",
441                "Latency of handling transactions",
442                LATENCY_SEC_BUCKETS.to_vec(),
443                registry,
444            )
445            .unwrap(),
446            execute_certificate_latency_single_writer,
447            execute_certificate_latency_shared_object,
448            internal_execution_latency: register_histogram_with_registry!(
449                "authority_state_internal_execution_latency",
450                "Latency of actual certificate executions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execution_load_input_objects_latency: register_histogram_with_registry!(
456                "authority_state_execution_load_input_objects_latency",
457                "Latency of loading input objects for execution",
458                LOW_LATENCY_SEC_BUCKETS.to_vec(),
459                registry,
460            )
461            .unwrap(),
462            prepare_certificate_latency: register_histogram_with_registry!(
463                "authority_state_prepare_certificate_latency",
464                "Latency of executing certificates, before committing the results",
465                LATENCY_SEC_BUCKETS.to_vec(),
466                registry,
467            )
468            .unwrap(),
469            commit_certificate_latency: register_histogram_with_registry!(
470                "authority_state_commit_certificate_latency",
471                "Latency of committing certificate execution results",
472                LATENCY_SEC_BUCKETS.to_vec(),
473                registry,
474            )
475            .unwrap(),
476            db_checkpoint_latency: register_histogram_with_registry!(
477                "db_checkpoint_latency",
478                "Latency of checkpointing dbs",
479                LATENCY_SEC_BUCKETS.to_vec(),
480                registry,
481            ).unwrap(),
482            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
483                "transaction_manager_num_enqueued_certificates",
484                "Current number of certificates enqueued to TransactionManager",
485                &["result"],
486                registry,
487            )
488            .unwrap(),
489            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
490                "transaction_manager_num_missing_objects",
491                "Current number of missing objects in TransactionManager",
492                registry,
493            )
494            .unwrap(),
495            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
496                "transaction_manager_num_pending_certificates",
497                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
498                registry,
499            )
500            .unwrap(),
501            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
502                "transaction_manager_num_executing_certificates",
503                "Number of executing certificates, including queued and actually running certificates",
504                registry,
505            )
506            .unwrap(),
507            transaction_manager_num_ready: register_int_gauge_with_registry!(
508                "transaction_manager_num_ready",
509                "Number of ready transactions in TransactionManager",
510                registry,
511            )
512            .unwrap(),
513            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
514                "transaction_manager_object_cache_size",
515                "Current size of object-availability cache in TransactionManager",
516                registry,
517            )
518            .unwrap(),
519            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
520                "transaction_manager_object_cache_hits",
521                "Number of object-availability cache hits in TransactionManager",
522                registry,
523            )
524            .unwrap(),
525            authority_overload_status: register_int_gauge_with_registry!(
526                "authority_overload_status",
527                "Whether authority is current experiencing overload and enters load shedding mode.",
528                registry)
529            .unwrap(),
530            authority_load_shedding_percentage: register_int_gauge_with_registry!(
531                "authority_load_shedding_percentage",
532                "The percentage of transactions is shed when the authority is in load shedding mode.",
533                registry)
534            .unwrap(),
535            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
536                "transaction_manager_object_cache_misses",
537                "Number of object-availability cache misses in TransactionManager",
538                registry,
539            )
540            .unwrap(),
541            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
542                "transaction_manager_object_cache_evictions",
543                "Number of object-availability cache evictions in TransactionManager",
544                registry,
545            )
546            .unwrap(),
547            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
548                "transaction_manager_package_cache_size",
549                "Current size of package-availability cache in TransactionManager",
550                registry,
551            )
552            .unwrap(),
553            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
554                "transaction_manager_package_cache_hits",
555                "Number of package-availability cache hits in TransactionManager",
556                registry,
557            )
558            .unwrap(),
559            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
560                "transaction_manager_package_cache_misses",
561                "Number of package-availability cache misses in TransactionManager",
562                registry,
563            )
564            .unwrap(),
565            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
566                "transaction_manager_package_cache_evictions",
567                "Number of package-availability cache evictions in TransactionManager",
568                registry,
569            )
570            .unwrap(),
571            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
572                "transaction_manager_transaction_queue_age_s",
573                "Time spent in waiting for transaction in the queue",
574                LATENCY_SEC_BUCKETS.to_vec(),
575                registry,
576            )
577            .unwrap(),
578            transaction_overload_sources: register_int_counter_vec_with_registry!(
579                "transaction_overload_sources",
580                "Number of times each source indicates transaction overload.",
581                &["source"],
582                registry)
583            .unwrap(),
584            execution_driver_executed_transactions: register_int_counter_with_registry!(
585                "execution_driver_executed_transactions",
586                "Cumulative number of transaction executed by execution driver",
587                registry,
588            )
589            .unwrap(),
590            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591                "execution_driver_dispatch_queue",
592                "Number of transaction pending in execution driver dispatch queue",
593                registry,
594            )
595            .unwrap(),
596            execution_queueing_delay_s: register_histogram_with_registry!(
597                "execution_queueing_delay_s",
598                "Queueing delay between a transaction is ready for execution until it starts executing.",
599                LATENCY_SEC_BUCKETS.to_vec(),
600                registry
601            )
602            .unwrap(),
603            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604                "prepare_cert_gas_latency_ratio",
605                "The ratio of computation gas divided by VM execution latency.",
606                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607                registry
608            )
609            .unwrap(),
610            execution_gas_latency_ratio: register_histogram_with_registry!(
611                "execution_gas_latency_ratio",
612                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614                registry
615            )
616            .unwrap(),
617            skipped_consensus_txns: register_int_counter_with_registry!(
618                "skipped_consensus_txns",
619                "Total number of consensus transactions skipped",
620                registry,
621            )
622            .unwrap(),
623            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624                "skipped_consensus_txns_cache_hit",
625                "Total number of consensus transactions skipped because of local cache hit",
626                registry,
627            )
628            .unwrap(),
629            post_processing_total_events_emitted: register_int_counter_with_registry!(
630                "post_processing_total_events_emitted",
631                "Total number of events emitted in post processing",
632                registry,
633            )
634            .unwrap(),
635            post_processing_total_tx_indexed: register_int_counter_with_registry!(
636                "post_processing_total_tx_indexed",
637                "Total number of txes indexed in post processing",
638                registry,
639            )
640            .unwrap(),
641            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642                "post_processing_total_tx_had_event_processed",
643                "Total number of txes finished event processing in post processing",
644                registry,
645            )
646            .unwrap(),
647            post_processing_total_failures: register_int_counter_with_registry!(
648                "post_processing_total_failures",
649                "Total number of failure in post processing",
650                registry,
651            )
652            .unwrap(),
653            consensus_handler_processed: register_int_counter_vec_with_registry!(
654                "consensus_handler_processed",
655                "Number of transactions processed by consensus handler",
656                &["class"],
657                registry
658            ).unwrap(),
659            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660                "consensus_handler_transaction_sizes",
661                "Sizes of each type of transactions processed by consensus handler",
662                &["class"],
663                POSITIVE_INT_BUCKETS.to_vec(),
664                registry
665            ).unwrap(),
666            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667                "consensus_handler_num_low_scoring_authorities",
668                "Number of low scoring authorities based on reputation scores from consensus",
669                registry
670            ).unwrap(),
671            consensus_handler_scores: register_int_gauge_vec_with_registry!(
672                "consensus_handler_scores",
673                "scores from consensus for each authority",
674                &["authority"],
675                registry,
676            ).unwrap(),
677            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678                "consensus_handler_deferred_transactions",
679                "Number of transactions deferred by consensus handler",
680                registry,
681            ).unwrap(),
682            consensus_handler_congested_transactions: register_int_counter_with_registry!(
683                "consensus_handler_congested_transactions",
684                "Number of transactions deferred by consensus handler due to congestion",
685                registry,
686            ).unwrap(),
687            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688                "consensus_handler_cancelled_transactions",
689                "Number of transactions cancelled by consensus handler",
690                registry,
691            ).unwrap(),
692            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693                "consensus_handler_max_congestion_control_object_costs",
694                "Max object costs for congestion control in the current consensus commit",
695                &["commit_type"],
696                registry,
697            ).unwrap(),
698            consensus_committed_subdags: register_int_counter_vec_with_registry!(
699                "consensus_committed_subdags",
700                "Number of committed subdags, sliced by leader",
701                &["authority"],
702                registry,
703            ).unwrap(),
704            consensus_committed_messages: register_int_gauge_vec_with_registry!(
705                "consensus_committed_messages",
706                "Total number of committed consensus messages, sliced by author",
707                &["authority"],
708                registry,
709            ).unwrap(),
710            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711                "consensus_committed_user_transactions",
712                "Number of committed user transactions, sliced by submitter",
713                &["authority"],
714                registry,
715            ).unwrap(),
716            consensus_handler_leader_round: register_int_gauge_with_registry!(
717                "consensus_handler_leader_round",
718                "The leader round of the current consensus output being processed in the consensus handler",
719                registry,
720            ).unwrap(),
721            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
722            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
723            zklogin_sig_count: register_int_counter_with_registry!(
724                "zklogin_sig_count",
725                "Count of zkLogin signatures",
726                registry,
727            )
728            .unwrap(),
729            multisig_sig_count: register_int_counter_with_registry!(
730                "multisig_sig_count",
731                "Count of zkLogin signatures",
732                registry,
733            )
734            .unwrap(),
735            consensus_calculated_throughput: register_int_gauge_with_registry!(
736                "consensus_calculated_throughput",
737                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
738                registry,
739            ).unwrap(),
740            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
741                "consensus_calculated_throughput_profile",
742                "The current active calculated throughput profile",
743                registry
744            ).unwrap(),
745            execution_queueing_latency: LatencyObserver::new(),
746            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
747            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
748        }
749    }
750
751    /// Reset metrics that contain `hostname` as one of the labels. This is
752    /// needed to avoid retaining metrics for long-gone committee members and
753    /// only exposing metrics for the committee in the current epoch.
754    pub fn reset_on_reconfigure(&self) {
755        self.consensus_committed_messages.reset();
756        self.consensus_handler_scores.reset();
757        self.consensus_committed_user_transactions.reset();
758    }
759}
760
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private
///   keys).
/// - Send + Sync, i.e. can be safely moved to and shared between threads.
///
/// NOTE(review): the alias is `Pin<Arc<..>>`, so it is presumably
/// instantiated with `Arc::pin(keypair)` where `keypair` is a `KeyPair` —
/// confirm against the call sites.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
768
/// Node-wide state of a single authority.
///
/// Bundles the authority's identity (`name`, `secret`), the handles to its
/// stores and caches, the per-epoch store, the transaction manager, metrics,
/// pruners and the node configuration.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the objects a transaction reads, both for signing and for
    /// execution.
    input_loader: TransactionInputLoader,
    /// NOTE(review): presumably the execution-cache handles behind accessors
    /// such as `get_cache_writer()` — confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store, held in an `ArcSwap`.
    /// NOTE(review): presumably swapped on reconfiguration — confirm.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and reverts all transactions from the previous epoch
    /// that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    /// NOTE(review): presumably only present when indexing is enabled — confirm.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store.
    /// NOTE(review): presumably only present when the REST index is enabled —
    /// confirm.
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Handler for subscriptions (usage is outside the visible code).
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; also serves the epoch state commitments lookup.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee store, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    // The field is unused outside of test builds, hence the lint expectation.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics recorded by this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the two pruner fields are underscore-prefixed; presumably
    // they are stored only to keep the pruners alive as long as the state —
    // confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Configuration for taking db checkpoints of the different dbs.
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When set, signed transactions are handed to this finalizer for
    /// tracking (see `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    /// NOTE(review): presumably tracks per-object congestion — confirm against
    /// `CongestionTracker`.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
824
825/// The authority state encapsulates all state, drives execution, and ensures
826/// safety.
827///
828/// Note the authority operations can be accessed through a read ref (&) and do
829/// not require &mut. Internally a database is synchronized through a mutex
830/// lock.
831///
832/// Repeating valid commands should produce no changes and return no error.
833impl AuthorityState {
834    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
835        epoch_store.committee().authority_exists(&self.name)
836    }
837
838    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
839        epoch_store
840            .active_validators()
841            .iter()
842            .any(|a| AuthorityName::from(a) == self.name)
843    }
844
845    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
846        !self.is_committee_validator(epoch_store)
847    }
848
849    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
850        &self.committee_store
851    }
852
853    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
854        self.committee_store.clone()
855    }
856
857    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
858        &self.config.authority_overload_config
859    }
860
861    pub fn get_epoch_state_commitments(
862        &self,
863        epoch: EpochId,
864    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
865        self.checkpoint_store.get_epoch_state_commitments(epoch)
866    }
867
868    /// This is a private method and should be kept that way. It doesn't check
869    /// whether the provided transaction is a system transaction, and hence
870    /// can only be called internally.
871    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
872    async fn handle_transaction_impl(
873        &self,
874        transaction: VerifiedTransaction,
875        epoch_store: &Arc<AuthorityPerEpochStore>,
876    ) -> IotaResult<VerifiedSignedTransaction> {
877        // Ensure that validator cannot reconfigure while we are signing the tx
878        let _execution_lock = self.execution_lock_for_signing();
879
880        let tx_digest = transaction.digest();
881        let tx_data = transaction.data().transaction_data();
882
883        let input_object_kinds = tx_data.input_objects()?;
884        let receiving_objects_refs = tx_data.receiving_objects();
885
886        // Note: the deny checks may do redundant package loads but:
887        // - they only load packages when there is an active package deny map
888        // - the loads are cached anyway
889        iota_transaction_checks::deny::check_transaction_for_signing(
890            tx_data,
891            transaction.tx_signatures(),
892            &input_object_kinds,
893            &receiving_objects_refs,
894            &self.config.transaction_deny_config,
895            self.get_backing_package_store().as_ref(),
896        )?;
897
898        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
899            Some(tx_digest),
900            &input_object_kinds,
901            &receiving_objects_refs,
902            epoch_store.epoch(),
903        )?;
904
905        let (_gas_status, checked_input_objects) =
906            iota_transaction_checks::check_transaction_input(
907                epoch_store.protocol_config(),
908                epoch_store.reference_gas_price(),
909                tx_data,
910                input_objects,
911                &receiving_objects,
912                &self.metrics.bytecode_verifier_metrics,
913                &self.config.verifier_signing_config,
914            )?;
915
916        check_coin_deny_list_v1_during_signing(
917            tx_data.sender(),
918            &checked_input_objects,
919            &receiving_objects,
920            &self.get_object_store(),
921        )?;
922
923        let owned_objects = checked_input_objects.inner().filter_owned_objects();
924
925        let signed_transaction = VerifiedSignedTransaction::new(
926            epoch_store.epoch(),
927            transaction,
928            self.name,
929            &*self.secret,
930        );
931
932        // Check and write locks, to signed transaction, into the database
933        // The call to self.set_transaction_lock checks the lock is not conflicting,
934        // and returns ConflictingTransaction error in case there is a lock on a
935        // different existing transaction.
936        self.get_cache_writer().try_acquire_transaction_locks(
937            epoch_store,
938            &owned_objects,
939            signed_transaction.clone(),
940        )?;
941
942        Ok(signed_transaction)
943    }
944
945    /// Initiate a new transaction.
946    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
947    pub async fn handle_transaction(
948        &self,
949        epoch_store: &Arc<AuthorityPerEpochStore>,
950        transaction: VerifiedTransaction,
951    ) -> IotaResult<HandleTransactionResponse> {
952        let tx_digest = *transaction.digest();
953        debug!("handle_transaction");
954
955        // Ensure an idempotent answer.
956        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
957            return Ok(HandleTransactionResponse { status });
958        }
959
960        let _metrics_guard = self
961            .metrics
962            .authority_state_handle_transaction_latency
963            .start_timer();
964        self.metrics.tx_orders.inc();
965
966        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
967        match signed {
968            Ok(s) => {
969                if self.is_committee_validator(epoch_store) {
970                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
971                        let tx = s.clone();
972                        let validator_tx_finalizer = validator_tx_finalizer.clone();
973                        let cache_reader = self.get_transaction_cache_reader().clone();
974                        let epoch_store = epoch_store.clone();
975                        spawn_monitored_task!(epoch_store.within_alive_epoch(
976                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
977                        ));
978                    }
979                }
980                Ok(HandleTransactionResponse {
981                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
982                })
983            }
984            // It happens frequently that while we are checking the validity of the transaction, it
985            // has just been executed.
986            // In that case, we could still return Ok to avoid showing confusing errors.
987            Err(err) => Ok(HandleTransactionResponse {
988                status: self
989                    .get_transaction_status(&tx_digest, epoch_store)?
990                    .ok_or(err)?
991                    .1,
992            }),
993        }
994    }
995
996    pub fn check_system_overload_at_signing(&self) -> bool {
997        self.config
998            .authority_overload_config
999            .check_system_overload_at_signing
1000    }
1001
1002    pub fn check_system_overload_at_execution(&self) -> bool {
1003        self.config
1004            .authority_overload_config
1005            .check_system_overload_at_execution
1006    }
1007
1008    pub(crate) fn check_system_overload(
1009        &self,
1010        consensus_adapter: &Arc<ConsensusAdapter>,
1011        tx_data: &SenderSignedData,
1012        do_authority_overload_check: bool,
1013    ) -> IotaResult {
1014        if do_authority_overload_check {
1015            self.check_authority_overload(tx_data).tap_err(|_| {
1016                self.update_overload_metrics("execution_queue");
1017            })?;
1018        }
1019        self.transaction_manager
1020            .check_execution_overload(self.overload_config(), tx_data)
1021            .tap_err(|_| {
1022                self.update_overload_metrics("execution_pending");
1023            })?;
1024        consensus_adapter.check_consensus_overload().tap_err(|_| {
1025            self.update_overload_metrics("consensus");
1026        })?;
1027
1028        let pending_tx_count = self
1029            .get_cache_commit()
1030            .approximate_pending_transaction_count();
1031        if pending_tx_count
1032            > self
1033                .config
1034                .execution_cache_config
1035                .writeback_cache
1036                .backpressure_threshold_for_rpc()
1037        {
1038            return Err(IotaError::ValidatorOverloadedRetryAfter {
1039                retry_after_secs: 10,
1040            });
1041        }
1042
1043        Ok(())
1044    }
1045
1046    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1047        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1048            return Ok(());
1049        }
1050
1051        let load_shedding_percentage = self
1052            .overload_info
1053            .load_shedding_percentage
1054            .load(Ordering::Relaxed);
1055        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1056    }
1057
1058    fn update_overload_metrics(&self, source: &str) {
1059        self.metrics
1060            .transaction_overload_sources
1061            .with_label_values(&[source])
1062            .inc();
1063    }
1064
1065    /// Executes a certificate for its effects.
1066    #[instrument(level = "trace", skip_all)]
1067    pub async fn execute_certificate(
1068        &self,
1069        certificate: &VerifiedCertificate,
1070        epoch_store: &Arc<AuthorityPerEpochStore>,
1071    ) -> IotaResult<TransactionEffects> {
1072        let _metrics_guard = if certificate.contains_shared_object() {
1073            self.metrics
1074                .execute_certificate_latency_shared_object
1075                .start_timer()
1076        } else {
1077            self.metrics
1078                .execute_certificate_latency_single_writer
1079                .start_timer()
1080        };
1081        trace!("execute_certificate");
1082
1083        self.metrics.total_cert_attempts.inc();
1084
1085        if !certificate.contains_shared_object() {
1086            // Shared object transactions need to be sequenced by the consensus before
1087            // enqueueing for execution, done in
1088            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1089            // object transactions, they can be enqueued for execution immediately.
1090            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1091        }
1092
1093        // tx could be reverted when epoch ends, so we must be careful not to return a
1094        // result here after the epoch ends.
1095        epoch_store
1096            .within_alive_epoch(self.notify_read_effects(certificate))
1097            .await
1098            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1099            .and_then(|r| r)
1100    }
1101
1102    /// Internal logic to execute a certificate.
1103    ///
1104    /// Guarantees that
1105    /// - If input objects are available, return no permanent failure.
1106    /// - Execution and output commit are atomic. i.e. outputs are only written
1107    ///   to storage,
1108    /// on successful execution; crashed execution has no observable effect and
1109    /// can be retried.
1110    ///
1111    /// It is caller's responsibility to ensure input objects are available and
1112    /// locks are set. If this cannot be satisfied by the caller,
1113    /// execute_certificate() should be called instead.
1114    ///
1115    /// Should only be called within iota-core.
1116    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1117    pub fn try_execute_immediately(
1118        &self,
1119        certificate: &VerifiedExecutableTransaction,
1120        expected_effects_digest: Option<TransactionEffectsDigest>,
1121        epoch_store: &Arc<AuthorityPerEpochStore>,
1122    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1123        let _scope = monitored_scope("Execution::try_execute_immediately");
1124        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1125
1126        let tx_digest = certificate.digest();
1127
1128        // Acquire a lock to prevent concurrent executions of the same transaction.
1129        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1130
1131        // The cert could have been processed by a concurrent attempt of the same cert,
1132        // so check if the effects have already been written.
1133        if let Some(effects) = self
1134            .get_transaction_cache_reader()
1135            .try_get_executed_effects(tx_digest)?
1136        {
1137            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1138                assert_eq!(
1139                    effects.digest(),
1140                    expected_effects_digest_inner,
1141                    "Unexpected effects digest for transaction {tx_digest:?}"
1142                );
1143            }
1144            tx_guard.release();
1145            return Ok((effects, None));
1146        }
1147        let input_objects =
1148            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1149
1150        // If no expected_effects_digest was provided, try to get it from storage.
1151        // We could be re-executing a previously executed but uncommitted transaction,
1152        // perhaps after restarting with a new binary. In this situation, if
1153        // we have published an effects signature, we must be sure not to
1154        // equivocate. TODO: read from cache instead of DB
1155        let expected_effects_digest =
1156            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1157
1158        self.process_certificate(
1159            tx_guard,
1160            certificate,
1161            input_objects,
1162            expected_effects_digest,
1163            epoch_store,
1164        )
1165        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1166        .tap_ok(
1167            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1168        )
1169    }
1170
1171    pub fn read_objects_for_execution(
1172        &self,
1173        tx_lock: &CertLockGuard,
1174        certificate: &VerifiedExecutableTransaction,
1175        epoch_store: &Arc<AuthorityPerEpochStore>,
1176    ) -> IotaResult<InputObjects> {
1177        let _scope = monitored_scope("Execution::load_input_objects");
1178        let _metrics_guard = self
1179            .metrics
1180            .execution_load_input_objects_latency
1181            .start_timer();
1182        let input_objects = &certificate.data().transaction_data().input_objects()?;
1183        self.input_loader.read_objects_for_execution(
1184            epoch_store,
1185            &certificate.key(),
1186            tx_lock,
1187            input_objects,
1188            epoch_store.epoch(),
1189        )
1190    }
1191
1192    /// Test only wrapper for `try_execute_immediately()` above, useful for
1193    /// checking errors if the pre-conditions are not satisfied, and
1194    /// executing change epoch transactions.
1195    pub fn try_execute_for_test(
1196        &self,
1197        certificate: &VerifiedCertificate,
1198    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1199        let epoch_store = self.epoch_store_for_testing();
1200        let (effects, execution_error_opt) = self.try_execute_immediately(
1201            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1202            None,
1203            &epoch_store,
1204        )?;
1205        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1206        Ok((signed_effects, execution_error_opt))
1207    }
1208
1209    /// Non-fallible version of `try_execute_for_test()`.
1210    pub fn execute_for_test(
1211        &self,
1212        certificate: &VerifiedCertificate,
1213    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1214        self.try_execute_for_test(certificate)
1215            .expect("try_execute_for_test should not fail")
1216    }
1217
1218    pub async fn notify_read_effects(
1219        &self,
1220        certificate: &VerifiedCertificate,
1221    ) -> IotaResult<TransactionEffects> {
1222        self.get_transaction_cache_reader()
1223            .try_notify_read_executed_effects(&[*certificate.digest()])
1224            .await
1225            .map(|mut r| r.pop().expect("must return correct number of effects"))
1226    }
1227
1228    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1229        self.get_object_cache_reader()
1230            .try_check_owned_objects_are_live(owned_object_refs)
1231    }
1232
1233    /// This function captures the required state to debug a forked transaction.
1234    /// The dump is written to a file in dir `path`, with name prefixed by the
1235    /// transaction digest. NOTE: Since this info escapes the validator
1236    /// context, make sure not to leak any private info here
1237    pub(crate) fn debug_dump_transaction_state(
1238        &self,
1239        tx_digest: &TransactionDigest,
1240        effects: &TransactionEffects,
1241        expected_effects_digest: TransactionEffectsDigest,
1242        inner_temporary_store: &InnerTemporaryStore,
1243        certificate: &VerifiedExecutableTransaction,
1244        debug_dump_config: &StateDebugDumpConfig,
1245    ) -> IotaResult<PathBuf> {
1246        let dump_dir = debug_dump_config
1247            .dump_file_directory
1248            .as_ref()
1249            .cloned()
1250            .unwrap_or(std::env::temp_dir());
1251        let epoch_store = self.load_epoch_store_one_call_per_task();
1252
1253        NodeStateDump::new(
1254            tx_digest,
1255            effects,
1256            expected_effects_digest,
1257            self.get_object_store().as_ref(),
1258            &epoch_store,
1259            inner_temporary_store,
1260            certificate,
1261        )?
1262        .write_to_file(&dump_dir)
1263        .map_err(|e| IotaError::FileIO(e.to_string()))
1264    }
1265
    /// Executes `certificate` against `input_objects` and commits the result.
    ///
    /// In order, this: takes the execution lock for the certificate, checks
    /// that the locked epoch matches `epoch_store`, runs
    /// `prepare_certificate`, compares the computed effects digest with
    /// `expected_effects_digest` (when provided), and calls
    /// `commit_certificate`. On every early error return visible here the
    /// `tx_guard` is released first.
    ///
    /// Returns the effects and the execution error, if any.
    ///
    /// # Panics
    ///
    /// Panics when `expected_effects_digest` is provided and differs from the
    /// digest of the computed effects (a fork). Before panicking, a debug dump
    /// of the transaction state is attempted.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn process_certificate(
        &self,
        tx_guard: CertTxGuard,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        // Start of the measurement used for the gas/latency ratio metric below.
        let process_certificate_start_time = tokio::time::Instant::now();
        let digest = *certificate.digest();

        fail_point_if!("correlated-crash-process-certificate", || {
            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
        // Any caller that verifies the signatures on the certificate will have already
        // checked the epoch. But paths that don't verify sigs (e.g. execution
        // from checkpoint, reading from db) present the possibility of an epoch
        // mismatch. If this cert is not finalized in previous epoch, then it's
        // invalid.
        let execution_guard = match execution_guard {
            Ok(execution_guard) => execution_guard,
            Err(err) => {
                tx_guard.release();
                return Err(err);
            }
        };
        // Since we obtain a reference to the epoch store before taking the execution
        // lock, it's possible that reconfiguration has happened and they no
        // longer match.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return Err(IotaError::WrongEpoch {
                expected_epoch: epoch_store.epoch(),
                actual_epoch: *execution_guard,
            });
        }

        // Errors originating from prepare_certificate may be transient (failure to read
        // locks) or non-transient (transaction input is invalid, move vm
        // errors). However, all errors from this function occur before we have
        // written anything to the db, so we release the tx guard and rely on the
        // client to retry the tx (if it was transient).
        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
            &execution_guard,
            certificate,
            input_objects,
            epoch_store,
        ) {
            Err(e) => {
                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
                tx_guard.release();
                return Err(e);
            }
            Ok(res) => res,
        };

        // Fork detection: the computed effects must match the expected digest.
        if let Some(expected_effects_digest) = expected_effects_digest {
            if effects.digest() != expected_effects_digest {
                // We dont want to mask the original error, so we log it and continue.
                match self.debug_dump_transaction_state(
                    &digest,
                    &effects,
                    expected_effects_digest,
                    &inner_temporary_store,
                    certificate,
                    &self.config.state_debug_dump_config,
                ) {
                    Ok(out_path) => {
                        info!(
                            "Dumped node state for transaction {} to {}",
                            digest,
                            out_path.as_path().display().to_string()
                        );
                    }
                    Err(e) => {
                        error!("Error dumping state for transaction {}: {e}", digest);
                    }
                }
                error!(
                    tx_digest = ?digest,
                    ?expected_effects_digest,
                    actual_effects = ?effects,
                    "fork detected!"
                );
                panic!(
                    "Transaction {} is expected to have effects digest {}, but got {}!",
                    digest,
                    expected_effects_digest,
                    effects.digest(),
                );
            }
        }

        fail_point!("crash");

        // Both guards are handed over to commit_certificate.
        self.commit_certificate(
            certificate,
            inner_temporary_store,
            &effects,
            tx_guard,
            execution_guard,
            epoch_store,
        )?;

        // For authenticator state updates, propagate the new state to the epoch
        // store after the commit.
        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator
            // state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                // Sort both sides so the comparison does not depend on order.
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // Record computation cost per elapsed microsecond; the guard on
        // `elapsed` avoids a division by zero.
        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        };
        Ok((effects, execution_error_opt))
    }
1416
1417    #[instrument(level = "trace", skip_all)]
1418    fn commit_certificate(
1419        &self,
1420        certificate: &VerifiedExecutableTransaction,
1421        inner_temporary_store: InnerTemporaryStore,
1422        effects: &TransactionEffects,
1423        tx_guard: CertTxGuard,
1424        _execution_guard: ExecutionLockReadGuard<'_>,
1425        epoch_store: &Arc<AuthorityPerEpochStore>,
1426    ) -> IotaResult {
1427        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1428            monitored_scope("Execution::commit_certificate");
1429        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1430
1431        let tx_key = certificate.key();
1432        let tx_digest = certificate.digest();
1433        let input_object_count = inner_temporary_store.input_objects.len();
1434        let shared_object_count = effects.input_shared_objects().len();
1435
1436        let output_keys = inner_temporary_store.get_output_keys(effects);
1437
1438        // index certificate
1439        let _ = self
1440            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1441            .tap_err(|e| {
1442                self.metrics.post_processing_total_failures.inc();
1443                error!(?tx_digest, "tx post processing failed: {e}");
1444            });
1445
1446        // The insertion to epoch_store is not atomic with the insertion to the
1447        // perpetual store. This is OK because we insert to the epoch store
1448        // first. And during lookups we always look up in the perpetual store first.
1449        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1450
1451        // Allow testing what happens if we crash here.
1452        fail_point!("crash");
1453
1454        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1455            certificate.clone().into_unsigned(),
1456            effects.clone(),
1457            inner_temporary_store,
1458        );
1459        self.get_cache_writer()
1460            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1461
1462        if certificate.transaction_data().is_end_of_epoch_tx() {
1463            // At the end of epoch, since system packages may have been upgraded, force
1464            // reload them in the cache.
1465            self.get_object_cache_reader()
1466                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1467        }
1468
1469        // commit_certificate finished, the tx is fully committed to the store.
1470        tx_guard.commit_tx();
1471
1472        // Notifies transaction manager about transaction and output objects committed.
1473        // This provides necessary information to transaction manager to start executing
1474        // additional ready transactions.
1475        self.transaction_manager
1476            .notify_commit(tx_digest, output_keys, epoch_store);
1477
1478        self.update_metrics(certificate, input_object_count, shared_object_count);
1479
1480        Ok(())
1481    }
1482
1483    fn update_metrics(
1484        &self,
1485        certificate: &VerifiedExecutableTransaction,
1486        input_object_count: usize,
1487        shared_object_count: usize,
1488    ) {
1489        // count signature by scheme, for zklogin and multisig
1490        if certificate.has_zklogin_sig() {
1491            self.metrics.zklogin_sig_count.inc();
1492        } else if certificate.has_upgraded_multisig() {
1493            self.metrics.multisig_sig_count.inc();
1494        }
1495
1496        self.metrics.total_effects.inc();
1497        self.metrics.total_certs.inc();
1498
1499        if shared_object_count > 0 {
1500            self.metrics.shared_obj_tx.inc();
1501        }
1502
1503        if certificate.is_sponsored_tx() {
1504            self.metrics.sponsored_tx.inc();
1505        }
1506
1507        self.metrics
1508            .num_input_objs
1509            .observe(input_object_count as f64);
1510        self.metrics
1511            .num_shared_objects
1512            .observe(shared_object_count as f64);
1513        self.metrics.batch_size.observe(
1514            certificate
1515                .data()
1516                .intent_message()
1517                .value
1518                .kind()
1519                .num_commands() as f64,
1520        );
1521    }
1522
1523    /// prepare_certificate validates the transaction input, and executes the
1524    /// certificate, returning effects, output objects, events, etc.
1525    ///
1526    /// It reads state from the db (both owned and shared locks), but it has no
1527    /// side effects.
1528    ///
1529    /// It can be generally understood that a failure of prepare_certificate
1530    /// indicates a non-transient error, e.g. the transaction input is
1531    /// somehow invalid, the correct locks are not held, etc. However, this
1532    /// is not entirely true, as a transient db read error may also cause
1533    /// this function to fail.
1534    #[instrument(level = "trace", skip_all)]
1535    fn prepare_certificate(
1536        &self,
1537        _execution_guard: &ExecutionLockReadGuard<'_>,
1538        certificate: &VerifiedExecutableTransaction,
1539        input_objects: InputObjects,
1540        epoch_store: &Arc<AuthorityPerEpochStore>,
1541    ) -> IotaResult<(
1542        InnerTemporaryStore,
1543        TransactionEffects,
1544        Option<ExecutionError>,
1545    )> {
1546        let _scope = monitored_scope("Execution::prepare_certificate");
1547        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1548        let prepare_certificate_start_time = tokio::time::Instant::now();
1549
1550        // TODO: We need to move this to a more appropriate place to avoid redundant
1551        // checks.
1552        let tx_data = certificate.data().transaction_data();
1553        tx_data.validity_check(epoch_store.protocol_config())?;
1554
1555        // The cost of partially re-auditing a transaction before execution is
1556        // tolerated.
1557        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1558            certificate,
1559            input_objects,
1560            epoch_store.protocol_config(),
1561            epoch_store.reference_gas_price(),
1562        )?;
1563
1564        let owned_object_refs = input_objects.inner().filter_owned_objects();
1565        self.check_owned_locks(&owned_object_refs)?;
1566        let tx_digest = *certificate.digest();
1567        let protocol_config = epoch_store.protocol_config();
1568        let transaction_data = &certificate.data().intent_message().value;
1569        let (kind, signer, gas) = transaction_data.execution_parts();
1570
1571        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1572        let (inner_temp_store, _, mut effects, execution_error_opt) =
1573            epoch_store.executor().execute_transaction_to_effects(
1574                self.get_backing_store().as_ref(),
1575                protocol_config,
1576                self.metrics.limits_metrics.clone(),
1577                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1578                // cyclic dependency w/ iota-adapter
1579                self.config
1580                    .expensive_safety_check_config
1581                    .enable_deep_per_tx_iota_conservation_check(),
1582                self.config.certificate_deny_config.certificate_deny_set(),
1583                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1584                epoch_store
1585                    .epoch_start_config()
1586                    .epoch_data()
1587                    .epoch_start_timestamp(),
1588                input_objects,
1589                gas,
1590                gas_status,
1591                kind,
1592                signer,
1593                tx_digest,
1594                &mut None,
1595            );
1596
1597        fail_point_if!("cp_execution_nondeterminism", || {
1598            #[cfg(msim)]
1599            self.create_fail_state(certificate, epoch_store, &mut effects);
1600        });
1601
1602        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1603        if elapsed > 0.0 {
1604            self.metrics
1605                .prepare_cert_gas_latency_ratio
1606                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1607        }
1608
1609        Ok((inner_temp_store, effects, execution_error_opt.err()))
1610    }
1611
1612    pub fn prepare_certificate_for_benchmark(
1613        &self,
1614        certificate: &VerifiedExecutableTransaction,
1615        input_objects: InputObjects,
1616        epoch_store: &Arc<AuthorityPerEpochStore>,
1617    ) -> IotaResult<(
1618        InnerTemporaryStore,
1619        TransactionEffects,
1620        Option<ExecutionError>,
1621    )> {
1622        let lock = RwLock::new(epoch_store.epoch());
1623        let execution_guard = lock.try_read().unwrap();
1624
1625        self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1626    }
1627
1628    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1629    /// `VmChecks::Enabled` instead.
1630    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1631    #[allow(clippy::type_complexity)]
1632    pub fn dry_exec_transaction(
1633        &self,
1634        transaction: TransactionData,
1635        transaction_digest: TransactionDigest,
1636    ) -> IotaResult<(
1637        DryRunTransactionBlockResponse,
1638        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1639        TransactionEffects,
1640        Option<ObjectID>,
1641    )> {
1642        let epoch_store = self.load_epoch_store_one_call_per_task();
1643        if !self.is_fullnode(&epoch_store) {
1644            return Err(IotaError::UnsupportedFeature {
1645                error: "dry-exec is only supported on fullnodes".to_string(),
1646            });
1647        }
1648
1649        if transaction.kind().is_system_tx() {
1650            return Err(IotaError::UnsupportedFeature {
1651                error: "dry-exec does not support system transactions".to_string(),
1652            });
1653        }
1654
1655        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1656    }
1657
1658    #[allow(clippy::type_complexity)]
1659    pub fn dry_exec_transaction_for_benchmark(
1660        &self,
1661        transaction: TransactionData,
1662        transaction_digest: TransactionDigest,
1663    ) -> IotaResult<(
1664        DryRunTransactionBlockResponse,
1665        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1666        TransactionEffects,
1667        Option<ObjectID>,
1668    )> {
1669        let epoch_store = self.load_epoch_store_one_call_per_task();
1670        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1671    }
1672
1673    #[instrument(level = "trace", skip_all)]
1674    #[allow(clippy::type_complexity)]
1675    fn dry_exec_transaction_impl(
1676        &self,
1677        epoch_store: &AuthorityPerEpochStore,
1678        transaction: TransactionData,
1679        transaction_digest: TransactionDigest,
1680    ) -> IotaResult<(
1681        DryRunTransactionBlockResponse,
1682        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1683        TransactionEffects,
1684        Option<ObjectID>,
1685    )> {
1686        // Cheap validity checks for a transaction, including input size limits.
1687        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1688
1689        let input_object_kinds = transaction.input_objects()?;
1690        let receiving_object_refs = transaction.receiving_objects();
1691
1692        iota_transaction_checks::deny::check_transaction_for_signing(
1693            &transaction,
1694            &[],
1695            &input_object_kinds,
1696            &receiving_object_refs,
1697            &self.config.transaction_deny_config,
1698            self.get_backing_package_store().as_ref(),
1699        )?;
1700
1701        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1702            // We don't want to cache this transaction since it's a dry run.
1703            None,
1704            &input_object_kinds,
1705            &receiving_object_refs,
1706            epoch_store.epoch(),
1707        )?;
1708
1709        // make a gas object if one was not provided
1710        let mut transaction = transaction;
1711        let mut gas_object_refs = transaction.gas().to_vec();
1712        let reference_gas_price = epoch_store.reference_gas_price();
1713        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1714            let sender = transaction.gas_owner();
1715            let gas_object_id = ObjectID::random();
1716            let gas_object = Object::new_move(
1717                MoveObject::new_gas_coin(
1718                    OBJECT_START_VERSION,
1719                    gas_object_id,
1720                    SIMULATION_GAS_COIN_VALUE,
1721                ),
1722                Owner::AddressOwner(sender),
1723                TransactionDigest::genesis_marker(),
1724            );
1725            let gas_object_ref = gas_object.compute_object_reference();
1726            gas_object_refs = vec![gas_object_ref];
1727            // Add gas object to transaction gas payment
1728            transaction.gas_data_mut().payment = gas_object_refs.clone();
1729            (
1730                iota_transaction_checks::check_transaction_input_with_given_gas(
1731                    epoch_store.protocol_config(),
1732                    reference_gas_price,
1733                    &transaction,
1734                    input_objects,
1735                    receiving_objects,
1736                    gas_object,
1737                    &self.metrics.bytecode_verifier_metrics,
1738                    &self.config.verifier_signing_config,
1739                )?,
1740                Some(gas_object_id),
1741            )
1742        } else {
1743            (
1744                iota_transaction_checks::check_transaction_input(
1745                    epoch_store.protocol_config(),
1746                    reference_gas_price,
1747                    &transaction,
1748                    input_objects,
1749                    &receiving_objects,
1750                    &self.metrics.bytecode_verifier_metrics,
1751                    &self.config.verifier_signing_config,
1752                )?,
1753                None,
1754            )
1755        };
1756
1757        let protocol_config = epoch_store.protocol_config();
1758        let (kind, signer, _) = transaction.execution_parts();
1759
1760        let silent = true;
1761        let executor = iota_execution::executor(protocol_config, silent, None)
1762            .expect("Creating an executor should not fail here");
1763
1764        let expensive_checks = false;
1765        let (inner_temp_store, _, effects, _execution_error) = executor
1766            .execute_transaction_to_effects(
1767                self.get_backing_store().as_ref(),
1768                protocol_config,
1769                self.metrics.limits_metrics.clone(),
1770                expensive_checks,
1771                self.config.certificate_deny_config.certificate_deny_set(),
1772                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1773                epoch_store
1774                    .epoch_start_config()
1775                    .epoch_data()
1776                    .epoch_start_timestamp(),
1777                checked_input_objects,
1778                gas_object_refs,
1779                gas_status,
1780                kind,
1781                signer,
1782                transaction_digest,
1783                &mut None,
1784            );
1785        let tx_digest = *effects.transaction_digest();
1786
1787        let module_cache =
1788            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1789
1790        let mut layout_resolver =
1791            epoch_store
1792                .executor()
1793                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1794                    &inner_temp_store,
1795                    self.get_backing_package_store(),
1796                )));
1797        // Returning empty vector here because we recalculate changes in the rpc layer.
1798        let object_changes = Vec::new();
1799
1800        // Returning empty vector here because we recalculate changes in the rpc layer.
1801        let balance_changes = Vec::new();
1802
1803        let written_with_kind = effects
1804            .created()
1805            .into_iter()
1806            .map(|(oref, _)| (oref, WriteKind::Create))
1807            .chain(
1808                effects
1809                    .unwrapped()
1810                    .into_iter()
1811                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1812            )
1813            .chain(
1814                effects
1815                    .mutated()
1816                    .into_iter()
1817                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
1818            )
1819            .map(|(oref, kind)| {
1820                let obj = inner_temp_store.written.get(&oref.0).unwrap();
1821                // TODO: Avoid clones.
1822                (oref.0, (oref, obj.clone(), kind))
1823            })
1824            .collect();
1825
1826        Ok((
1827            DryRunTransactionBlockResponse {
1828                // to avoid cloning `transaction`, fields are populated in this order
1829                suggested_gas_price: self
1830                    .congestion_tracker
1831                    .get_prediction_suggested_gas_price(&transaction),
1832                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1833                    .map_err(|e| IotaError::TransactionSerialization {
1834                        error: format!(
1835                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
1836                        ),
1837                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
1838                effects: effects.clone().try_into()?,
1839                events: IotaTransactionBlockEvents::try_from(
1840                    inner_temp_store.events.clone(),
1841                    tx_digest,
1842                    None,
1843                    layout_resolver.as_mut(),
1844                )?,
1845                object_changes,
1846                balance_changes,
1847            },
1848            written_with_kind,
1849            effects,
1850            mock_gas,
1851        ))
1852    }
1853
1854    pub fn simulate_transaction(
1855        &self,
1856        mut transaction: TransactionData,
1857        checks: VmChecks,
1858    ) -> IotaResult<SimulateTransactionResult> {
1859        if transaction.kind().is_system_tx() {
1860            return Err(IotaError::UnsupportedFeature {
1861                error: "simulate does not support system transactions".to_string(),
1862            });
1863        }
1864
1865        let epoch_store = self.load_epoch_store_one_call_per_task();
1866        if !self.is_fullnode(&epoch_store) {
1867            return Err(IotaError::UnsupportedFeature {
1868                error: "simulate is only supported on fullnodes".to_string(),
1869            });
1870        }
1871
1872        // Cheap validity checks for a transaction, including input size limits.
1873        // This does not check if gas objects are missing since we may create a
1874        // mock gas object. It checks for other transaction input validity.
1875        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1876
1877        let input_object_kinds = transaction.input_objects()?;
1878        let receiving_object_refs = transaction.receiving_objects();
1879
1880        // Since we need to simulate a validator signing the transaction, the first step
1881        // is to check if some transaction elements are denied.
1882        iota_transaction_checks::deny::check_transaction_for_signing(
1883            &transaction,
1884            &[],
1885            &input_object_kinds,
1886            &receiving_object_refs,
1887            &self.config.transaction_deny_config,
1888            self.get_backing_package_store().as_ref(),
1889        )?;
1890
1891        // Load input and receiving objects
1892        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1893            // We don't want to cache this transaction since it's a simulation.
1894            None,
1895            &input_object_kinds,
1896            &receiving_object_refs,
1897            epoch_store.epoch(),
1898        )?;
1899
1900        // Create a mock gas object if one was not provided
1901        let mock_gas_id = if transaction.gas().is_empty() {
1902            let mock_gas_object = Object::new_move(
1903                MoveObject::new_gas_coin(
1904                    OBJECT_START_VERSION,
1905                    ObjectID::MAX,
1906                    SIMULATION_GAS_COIN_VALUE,
1907                ),
1908                Owner::AddressOwner(transaction.gas_data().owner),
1909                TransactionDigest::genesis_marker(),
1910            );
1911            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
1912            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
1913            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
1914            Some(mock_gas_object.id())
1915        } else {
1916            None
1917        };
1918
1919        let protocol_config = epoch_store.protocol_config();
1920
1921        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
1922        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
1923        let (gas_status, checked_input_objects) = if checks.enabled() {
1924            iota_transaction_checks::check_transaction_input(
1925                protocol_config,
1926                epoch_store.reference_gas_price(),
1927                &transaction,
1928                input_objects,
1929                &receiving_objects,
1930                &self.metrics.bytecode_verifier_metrics,
1931                &self.config.verifier_signing_config,
1932            )?
1933        } else {
1934            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
1935                protocol_config,
1936                transaction.kind(),
1937                input_objects,
1938                receiving_objects,
1939            )?;
1940            let gas_status = IotaGasStatus::new(
1941                transaction.gas_budget(),
1942                transaction.gas_price(),
1943                epoch_store.reference_gas_price(),
1944                protocol_config,
1945            )?;
1946
1947            (gas_status, checked_input_objects)
1948        };
1949
1950        // Create a new executor for the simulation
1951        let executor = iota_execution::executor(
1952            protocol_config,
1953            true, // silent
1954            None,
1955        )
1956        .expect("Creating an executor should not fail here");
1957
1958        // Execute the simulation
1959        let (kind, signer, gas_coins) = transaction.execution_parts();
1960        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
1961            self.get_backing_store().as_ref(),
1962            protocol_config,
1963            self.metrics.limits_metrics.clone(),
1964            false, // expensive_checks
1965            self.config.certificate_deny_config.certificate_deny_set(),
1966            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1967            epoch_store
1968                .epoch_start_config()
1969                .epoch_data()
1970                .epoch_start_timestamp(),
1971            checked_input_objects,
1972            gas_coins,
1973            gas_status,
1974            kind,
1975            signer,
1976            transaction.digest(),
1977            checks.disabled(),
1978        );
1979
1980        // In the case of a dev inspect, the execution_result could be filled with some
1981        // values. Else, execution_result is empty in the case of a dry run.
1982        Ok(SimulateTransactionResult {
1983            input_objects: inner_temp_store.input_objects,
1984            output_objects: inner_temp_store.written,
1985            events: effects.events_digest().map(|_| inner_temp_store.events),
1986            effects,
1987            execution_result,
1988            mock_gas_id,
1989        })
1990    }
1991
1992    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1993    /// `VmChecks::DISABLED` instead.
1994    /// The object ID for gas can be any
1995    /// object ID, even for an uncreated object
1996    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
1997    pub async fn dev_inspect_transaction_block(
1998        &self,
1999        sender: IotaAddress,
2000        transaction_kind: TransactionKind,
2001        gas_price: Option<u64>,
2002        gas_budget: Option<u64>,
2003        gas_sponsor: Option<IotaAddress>,
2004        gas_objects: Option<Vec<ObjectRef>>,
2005        show_raw_txn_data_and_effects: Option<bool>,
2006        skip_checks: Option<bool>,
2007    ) -> IotaResult<DevInspectResults> {
2008        let epoch_store = self.load_epoch_store_one_call_per_task();
2009
2010        if !self.is_fullnode(&epoch_store) {
2011            return Err(IotaError::UnsupportedFeature {
2012                error: "dev-inspect is only supported on fullnodes".to_string(),
2013            });
2014        }
2015
2016        if transaction_kind.is_system_tx() {
2017            return Err(IotaError::UnsupportedFeature {
2018                error: "system transactions are not supported".to_string(),
2019            });
2020        }
2021
2022        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2023        let skip_checks = skip_checks.unwrap_or(true);
2024        let reference_gas_price = epoch_store.reference_gas_price();
2025        let protocol_config = epoch_store.protocol_config();
2026        let max_tx_gas = protocol_config.max_tx_gas();
2027
2028        let price = gas_price.unwrap_or(reference_gas_price);
2029        let budget = gas_budget.unwrap_or(max_tx_gas);
2030        let owner = gas_sponsor.unwrap_or(sender);
2031        // Payment might be empty here, but it's fine we'll have to deal with it later
2032        // after reading all the input objects.
2033        let payment = gas_objects.unwrap_or_default();
2034        let transaction = TransactionData::V1(TransactionDataV1 {
2035            kind: transaction_kind.clone(),
2036            sender,
2037            gas_data: GasData {
2038                payment,
2039                owner,
2040                price,
2041                budget,
2042            },
2043            expiration: TransactionExpiration::None,
2044        });
2045
2046        let raw_txn_data = if show_raw_txn_data_and_effects {
2047            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2048                error: "Failed to serialize transaction during dev inspect".to_string(),
2049            })?
2050        } else {
2051            vec![]
2052        };
2053
2054        transaction.validity_check_no_gas_check(protocol_config)?;
2055
2056        let input_object_kinds = transaction.input_objects()?;
2057        let receiving_object_refs = transaction.receiving_objects();
2058
2059        iota_transaction_checks::deny::check_transaction_for_signing(
2060            &transaction,
2061            &[],
2062            &input_object_kinds,
2063            &receiving_object_refs,
2064            &self.config.transaction_deny_config,
2065            self.get_backing_package_store().as_ref(),
2066        )?;
2067
2068        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2069            // We don't want to cache this transaction since it's a dev inspect.
2070            None,
2071            &input_object_kinds,
2072            &receiving_object_refs,
2073            epoch_store.epoch(),
2074        )?;
2075
2076        // Create and use a dummy gas object if there is no gas object provided.
2077        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2078            SIMULATION_GAS_COIN_VALUE,
2079            transaction.gas_owner(),
2080        );
2081
2082        let gas_objects = if transaction.gas().is_empty() {
2083            let gas_object_ref = dummy_gas_object.compute_object_reference();
2084            vec![gas_object_ref]
2085        } else {
2086            transaction.gas().to_vec()
2087        };
2088
2089        let (gas_status, checked_input_objects) = if skip_checks {
2090            // If we are skipping checks, then we call the check_dev_inspect_input function
2091            // which will perform only lightweight checks on the transaction
2092            // input. And if the gas field is empty, that means we will
2093            // use the dummy gas object so we need to add it to the input objects vector.
2094            if transaction.gas().is_empty() {
2095                input_objects.push(ObjectReadResult::new(
2096                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2097                    dummy_gas_object.into(),
2098                ));
2099            }
2100            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2101                protocol_config,
2102                &transaction_kind,
2103                input_objects,
2104                receiving_objects,
2105            )?;
2106            let gas_status = IotaGasStatus::new(
2107                max_tx_gas,
2108                transaction.gas_price(),
2109                reference_gas_price,
2110                protocol_config,
2111            )?;
2112
2113            (gas_status, checked_input_objects)
2114        } else {
2115            // If we are not skipping checks, then we call the check_transaction_input
2116            // function and its dummy gas variant which will perform full
2117            // fledged checks just like a real transaction execution.
2118            if transaction.gas().is_empty() {
2119                iota_transaction_checks::check_transaction_input_with_given_gas(
2120                    epoch_store.protocol_config(),
2121                    reference_gas_price,
2122                    &transaction,
2123                    input_objects,
2124                    receiving_objects,
2125                    dummy_gas_object,
2126                    &self.metrics.bytecode_verifier_metrics,
2127                    &self.config.verifier_signing_config,
2128                )?
2129            } else {
2130                iota_transaction_checks::check_transaction_input(
2131                    epoch_store.protocol_config(),
2132                    reference_gas_price,
2133                    &transaction,
2134                    input_objects,
2135                    &receiving_objects,
2136                    &self.metrics.bytecode_verifier_metrics,
2137                    &self.config.verifier_signing_config,
2138                )?
2139            }
2140        };
2141
2142        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2143            .expect("Creating an executor should not fail here");
2144        let intent_msg = IntentMessage::new(
2145            Intent {
2146                version: IntentVersion::V0,
2147                scope: IntentScope::TransactionData,
2148                app_id: IntentAppId::Iota,
2149            },
2150            transaction,
2151        );
2152        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2153        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2154            self.get_backing_store().as_ref(),
2155            protocol_config,
2156            self.metrics.limits_metrics.clone(),
2157            // expensive checks
2158            false,
2159            self.config.certificate_deny_config.certificate_deny_set(),
2160            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2161            epoch_store
2162                .epoch_start_config()
2163                .epoch_data()
2164                .epoch_start_timestamp(),
2165            checked_input_objects,
2166            gas_objects,
2167            gas_status,
2168            transaction_kind,
2169            sender,
2170            transaction_digest,
2171            skip_checks,
2172        );
2173
2174        let raw_effects = if show_raw_txn_data_and_effects {
2175            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2176                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2177            })?
2178        } else {
2179            vec![]
2180        };
2181
2182        let mut layout_resolver =
2183            epoch_store
2184                .executor()
2185                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2186                    &inner_temp_store,
2187                    self.get_backing_package_store(),
2188                )));
2189
2190        DevInspectResults::new(
2191            effects,
2192            inner_temp_store.events.clone(),
2193            execution_result,
2194            raw_txn_data,
2195            raw_effects,
2196            layout_resolver.as_mut(),
2197        )
2198    }
2199
2200    // Only used for testing because of how epoch store is loaded.
2201    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2202        let epoch_store = self.epoch_store_for_testing();
2203        Ok(epoch_store.reference_gas_price())
2204    }
2205
2206    #[instrument(level = "trace", skip_all)]
2207    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2208        self.get_transaction_cache_reader()
2209            .try_is_tx_already_executed(digest)
2210    }
2211
2212    /// Non-fallible version of `try_is_tx_already_executed`.
2213    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2214        self.try_is_tx_already_executed(digest)
2215            .expect("storage access failed")
2216    }
2217
2218    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2219    #[instrument(level = "debug", skip_all, err)]
2220    fn index_tx(
2221        &self,
2222        indexes: &IndexStore,
2223        digest: &TransactionDigest,
2224        // TODO: index_tx really just need the transaction data here.
2225        cert: &VerifiedExecutableTransaction,
2226        effects: &TransactionEffects,
2227        events: &TransactionEvents,
2228        timestamp_ms: u64,
2229        tx_coins: Option<TxCoins>,
2230        written: &WrittenObjects,
2231        inner_temporary_store: &InnerTemporaryStore,
2232    ) -> IotaResult<u64> {
2233        let changes = self
2234            .process_object_index(effects, written, inner_temporary_store)
2235            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2236
2237        indexes.index_tx(
2238            cert.data().intent_message().value.sender(),
2239            cert.data()
2240                .intent_message()
2241                .value
2242                .input_objects()?
2243                .iter()
2244                .map(|o| o.object_id()),
2245            effects
2246                .all_changed_objects()
2247                .into_iter()
2248                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2249            cert.data()
2250                .intent_message()
2251                .value
2252                .move_calls()
2253                .into_iter()
2254                .map(|(package, module, function)| {
2255                    (*package, module.to_owned(), function.to_owned())
2256                }),
2257            events,
2258            changes,
2259            digest,
2260            timestamp_ms,
2261            tx_coins,
2262        )
2263    }
2264
2265    #[cfg(msim)]
2266    fn create_fail_state(
2267        &self,
2268        certificate: &VerifiedExecutableTransaction,
2269        epoch_store: &Arc<AuthorityPerEpochStore>,
2270        effects: &mut TransactionEffects,
2271    ) {
2272        use std::cell::RefCell;
2273        thread_local! {
2274            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2275        }
2276        if !certificate.data().intent_message().value.is_system_tx() {
2277            let committee = epoch_store.committee();
2278            let cur_stake = (**committee).weight(&self.name);
2279            if cur_stake > 0 {
2280                FAIL_STATE.with_borrow_mut(|fail_state| {
2281                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2282                    if fail_state.0 < committee.validity_threshold() {
2283                        fail_state.0 += cur_stake;
2284                        fail_state.1.insert(self.name);
2285                    }
2286
2287                    if fail_state.1.contains(&self.name) {
2288                        info!("cp_exec failing tx");
2289                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2290                    }
2291                });
2292            }
2293        }
2294    }
2295
2296    fn process_object_index(
2297        &self,
2298        effects: &TransactionEffects,
2299        written: &WrittenObjects,
2300        inner_temporary_store: &InnerTemporaryStore,
2301    ) -> IotaResult<ObjectIndexChanges> {
2302        let epoch_store = self.load_epoch_store_one_call_per_task();
2303        let mut layout_resolver =
2304            epoch_store
2305                .executor()
2306                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2307                    inner_temporary_store,
2308                    self.get_backing_package_store(),
2309                )));
2310
2311        let modified_at_version = effects
2312            .modified_at_versions()
2313            .into_iter()
2314            .collect::<HashMap<_, _>>();
2315
2316        let tx_digest = effects.transaction_digest();
2317        let mut deleted_owners = vec![];
2318        let mut deleted_dynamic_fields = vec![];
2319        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2320            let old_version = modified_at_version.get(&id).unwrap();
2321            // When we process the index, the latest object hasn't been written yet so
2322            // the old object must be present.
2323            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2324                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2325            ) {
2326                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2327                Owner::ObjectOwner(object_id) => {
2328                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2329                }
2330                _ => {}
2331            }
2332        }
2333
2334        let mut new_owners = vec![];
2335        let mut new_dynamic_fields = vec![];
2336
2337        for (oref, owner, kind) in effects.all_changed_objects() {
2338            let id = &oref.0;
2339            // For mutated objects, retrieve old owner and delete old index if there is a
2340            // owner change.
2341            if let WriteKind::Mutate = kind {
2342                let Some(old_version) = modified_at_version.get(id) else {
2343                    panic!(
2344                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2345                    );
2346                };
2347                // When we process the index, the latest object hasn't been written yet so
2348                // the old object must be present.
2349                let Some(old_object) = self
2350                    .get_object_store()
2351                    .try_get_object_by_key(id, *old_version)?
2352                else {
2353                    panic!(
2354                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2355                    );
2356                };
2357                if old_object.owner != owner {
2358                    match old_object.owner {
2359                        Owner::AddressOwner(addr) => {
2360                            deleted_owners.push((addr, *id));
2361                        }
2362                        Owner::ObjectOwner(object_id) => {
2363                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2364                        }
2365                        _ => {}
2366                    }
2367                }
2368            }
2369
2370            match owner {
2371                Owner::AddressOwner(addr) => {
2372                    // TODO: We can remove the object fetching after we added ObjectType to
2373                    // TransactionEffects
2374                    let new_object = written.get(id).unwrap_or_else(
2375                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2376                    );
2377                    assert_eq!(
2378                        new_object.version(),
2379                        oref.1,
2380                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2381                        tx_digest,
2382                        id,
2383                        new_object.version(),
2384                        oref.1
2385                    );
2386
2387                    let type_ = new_object
2388                        .type_()
2389                        .map(|type_| ObjectType::Struct(type_.clone()))
2390                        .unwrap_or(ObjectType::Package);
2391
2392                    new_owners.push((
2393                        (addr, *id),
2394                        ObjectInfo {
2395                            object_id: *id,
2396                            version: oref.1,
2397                            digest: oref.2,
2398                            type_,
2399                            owner,
2400                            previous_transaction: *effects.transaction_digest(),
2401                        },
2402                    ));
2403                }
2404                Owner::ObjectOwner(owner) => {
2405                    let new_object = written.get(id).unwrap_or_else(
2406                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2407                    );
2408                    assert_eq!(
2409                        new_object.version(),
2410                        oref.1,
2411                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2412                        tx_digest,
2413                        id,
2414                        new_object.version(),
2415                        oref.1
2416                    );
2417
2418                    let Some(df_info) = self
2419                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2420                        .unwrap_or_else(|e| {
2421                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2422                            None
2423                        }
2424                    )
2425                        else {
2426                            // Skip indexing for non dynamic field objects.
2427                            continue;
2428                        };
2429                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2430                }
2431                _ => {}
2432            }
2433        }
2434
2435        Ok(ObjectIndexChanges {
2436            deleted_owners,
2437            deleted_dynamic_fields,
2438            new_owners,
2439            new_dynamic_fields,
2440        })
2441    }
2442
2443    fn try_create_dynamic_field_info(
2444        &self,
2445        o: &Object,
2446        written: &WrittenObjects,
2447        resolver: &mut dyn LayoutResolver,
2448        get_latest_object_version: bool,
2449    ) -> IotaResult<Option<DynamicFieldInfo>> {
2450        // Skip if not a move object
2451        let Some(move_object) = o.data.try_as_move().cloned() else {
2452            return Ok(None);
2453        };
2454
2455        // We only index dynamic field objects
2456        if !move_object.type_().is_dynamic_field() {
2457            return Ok(None);
2458        }
2459
2460        let layout = resolver
2461            .get_annotated_layout(&move_object.type_().clone().into())?
2462            .into_layout();
2463
2464        let field =
2465            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2466                IotaError::ObjectDeserialization {
2467                    error: e.to_string(),
2468                }
2469            })?;
2470
2471        let type_ = field.kind;
2472        let name_type: TypeTag = field.name_layout.into();
2473        let bcs_name = field.name_bytes.to_owned();
2474
2475        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2476            .map_err(|e| {
2477                warn!("{e}");
2478                IotaError::ObjectDeserialization {
2479                    error: e.to_string(),
2480                }
2481            })?;
2482
2483        let name = DynamicFieldName {
2484            type_: name_type,
2485            value: IotaMoveValue::from(name_value).to_json_value(),
2486        };
2487
2488        let value_metadata = field.value_metadata().map_err(|e| {
2489            warn!("{e}");
2490            IotaError::ObjectDeserialization {
2491                error: e.to_string(),
2492            }
2493        })?;
2494
2495        Ok(Some(match value_metadata {
2496            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2497                name,
2498                bcs_name,
2499                type_,
2500                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2501                object_id: o.id(),
2502                version: o.version(),
2503                digest: o.digest(),
2504            },
2505
2506            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2507                // Find the actual object from storage using the object id obtained from the
2508                // wrapper.
2509
2510                // Try to find the object in the written objects first.
2511                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2512                    (
2513                        object.version(),
2514                        object.digest(),
2515                        object.data.type_().unwrap().clone(),
2516                    )
2517                } else {
2518                    // If not found, try to find it in the database.
2519                    let object = if get_latest_object_version {
2520                        // Loading genesis object could meet a condition that the version of the
2521                        // genesis object is behind the one in the snapshot.
2522                        // In this case, the object can not be found with get_object_by_key, we need
2523                        // to use get_object instead. Since get_object is a heavier operation, we
2524                        // only allow to use it for genesis.
2525                        // reference: https://github.com/iotaledger/iota/issues/7267
2526                        self.get_object_store()
2527                            .try_get_object(&object_id)?
2528                            .ok_or_else(|| UserInputError::ObjectNotFound {
2529                                object_id,
2530                                version: Some(o.version()),
2531                            })?
2532                    } else {
2533                        // Non-genesis object should be in the database with the given version.
2534                        self.get_object_store()
2535                            .try_get_object_by_key(&object_id, o.version())?
2536                            .ok_or_else(|| UserInputError::ObjectNotFound {
2537                                object_id,
2538                                version: Some(o.version()),
2539                            })?
2540                    };
2541
2542                    (
2543                        object.version(),
2544                        object.digest(),
2545                        object.data.type_().unwrap().clone(),
2546                    )
2547                };
2548
2549                DynamicFieldInfo {
2550                    name,
2551                    bcs_name,
2552                    type_,
2553                    object_type: object_type.to_string(),
2554                    object_id,
2555                    version,
2556                    digest,
2557                }
2558            }
2559        }))
2560    }
2561
2562    #[instrument(level = "trace", skip_all, err)]
2563    fn post_process_one_tx(
2564        &self,
2565        certificate: &VerifiedExecutableTransaction,
2566        effects: &TransactionEffects,
2567        inner_temporary_store: &InnerTemporaryStore,
2568        epoch_store: &Arc<AuthorityPerEpochStore>,
2569    ) -> IotaResult {
2570        if self.indexes.is_none() {
2571            return Ok(());
2572        }
2573
2574        let tx_digest = certificate.digest();
2575        let timestamp_ms = Self::unixtime_now_ms();
2576        let events = &inner_temporary_store.events;
2577        let written = &inner_temporary_store.written;
2578        let tx_coins =
2579            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2580
2581        // Index tx
2582        if let Some(indexes) = &self.indexes {
2583            let _ = self
2584                .index_tx(
2585                    indexes.as_ref(),
2586                    tx_digest,
2587                    certificate,
2588                    effects,
2589                    events,
2590                    timestamp_ms,
2591                    tx_coins,
2592                    written,
2593                    inner_temporary_store,
2594                )
2595                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2596                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2597                .expect("Indexing tx should not fail");
2598
2599            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2600            let events = self.make_transaction_block_events(
2601                events.clone(),
2602                *tx_digest,
2603                timestamp_ms,
2604                epoch_store,
2605                inner_temporary_store,
2606            )?;
2607            // Emit events
2608            self.subscription_handler
2609                .process_tx(certificate.data().transaction_data(), &effects, &events)
2610                .tap_ok(|_| {
2611                    self.metrics
2612                        .post_processing_total_tx_had_event_processed
2613                        .inc()
2614                })
2615                .tap_err(|e| {
2616                    warn!(
2617                        ?tx_digest,
2618                        "Post processing - Couldn't process events for tx: {}", e
2619                    )
2620                })?;
2621
2622            self.metrics
2623                .post_processing_total_events_emitted
2624                .inc_by(events.data.len() as u64);
2625        };
2626        Ok(())
2627    }
2628
2629    fn make_transaction_block_events(
2630        &self,
2631        transaction_events: TransactionEvents,
2632        digest: TransactionDigest,
2633        timestamp_ms: u64,
2634        epoch_store: &Arc<AuthorityPerEpochStore>,
2635        inner_temporary_store: &InnerTemporaryStore,
2636    ) -> IotaResult<IotaTransactionBlockEvents> {
2637        let mut layout_resolver =
2638            epoch_store
2639                .executor()
2640                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2641                    inner_temporary_store,
2642                    self.get_backing_package_store(),
2643                )));
2644        IotaTransactionBlockEvents::try_from(
2645            transaction_events,
2646            digest,
2647            Some(timestamp_ms),
2648            layout_resolver.as_mut(),
2649        )
2650    }
2651
2652    pub fn unixtime_now_ms() -> u64 {
2653        let now = SystemTime::now()
2654            .duration_since(UNIX_EPOCH)
2655            .expect("Time went backwards")
2656            .as_millis();
2657        u64::try_from(now).expect("Travelling in time machine")
2658    }
2659
2660    #[instrument(level = "trace", skip_all)]
2661    pub async fn handle_transaction_info_request(
2662        &self,
2663        request: TransactionInfoRequest,
2664    ) -> IotaResult<TransactionInfoResponse> {
2665        let epoch_store = self.load_epoch_store_one_call_per_task();
2666        let (transaction, status) = self
2667            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2668            .ok_or(IotaError::TransactionNotFound {
2669                digest: request.transaction_digest,
2670            })?;
2671        Ok(TransactionInfoResponse {
2672            transaction,
2673            status,
2674        })
2675    }
2676
2677    #[instrument(level = "trace", skip_all)]
2678    pub async fn handle_object_info_request(
2679        &self,
2680        request: ObjectInfoRequest,
2681    ) -> IotaResult<ObjectInfoResponse> {
2682        let epoch_store = self.load_epoch_store_one_call_per_task();
2683
2684        let requested_object_seq = match request.request_kind {
2685            ObjectInfoRequestKind::LatestObjectInfo => {
2686                let (_, seq, _) = self
2687                    .try_get_object_or_tombstone(request.object_id)
2688                    .await?
2689                    .ok_or_else(|| {
2690                        IotaError::from(UserInputError::ObjectNotFound {
2691                            object_id: request.object_id,
2692                            version: None,
2693                        })
2694                    })?;
2695                seq
2696            }
2697            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2698        };
2699
2700        let object = self
2701            .get_object_store()
2702            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2703            .ok_or_else(|| {
2704                IotaError::from(UserInputError::ObjectNotFound {
2705                    object_id: request.object_id,
2706                    version: Some(requested_object_seq),
2707                })
2708            })?;
2709
2710        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2711            (request.generate_layout, object.data.try_as_move())
2712        {
2713            Some(into_struct_layout(
2714                epoch_store
2715                    .executor()
2716                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2717                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2718            )?)
2719        } else {
2720            None
2721        };
2722
2723        let lock = if !object.is_address_owned() {
2724            // Only address owned objects have locks.
2725            None
2726        } else {
2727            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2728                .await?
2729                .map(|s| s.into_inner())
2730        };
2731
2732        Ok(ObjectInfoResponse {
2733            object,
2734            layout,
2735            lock_for_debugging: lock,
2736        })
2737    }
2738
2739    #[instrument(level = "trace", skip_all)]
2740    pub fn handle_checkpoint_request(
2741        &self,
2742        request: &CheckpointRequest,
2743    ) -> IotaResult<CheckpointResponse> {
2744        let summary = if request.certified {
2745            let summary = match request.sequence_number {
2746                Some(seq) => self
2747                    .checkpoint_store
2748                    .get_checkpoint_by_sequence_number(seq)?,
2749                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2750            }
2751            .map(|v| v.into_inner());
2752            summary.map(CheckpointSummaryResponse::Certified)
2753        } else {
2754            let summary = match request.sequence_number {
2755                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2756                None => self
2757                    .checkpoint_store
2758                    .get_latest_locally_computed_checkpoint()?,
2759            };
2760            summary.map(CheckpointSummaryResponse::Pending)
2761        };
2762        let contents = match &summary {
2763            Some(s) => self
2764                .checkpoint_store
2765                .get_checkpoint_contents(&s.content_digest())?,
2766            None => None,
2767        };
2768        Ok(CheckpointResponse {
2769            checkpoint: summary,
2770            contents,
2771        })
2772    }
2773
2774    fn check_protocol_version(
2775        supported_protocol_versions: SupportedProtocolVersions,
2776        current_version: ProtocolVersion,
2777    ) {
2778        info!("current protocol version is now {:?}", current_version);
2779        info!("supported versions are: {:?}", supported_protocol_versions);
2780        if !supported_protocol_versions.is_version_supported(current_version) {
2781            let msg = format!(
2782                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2783            );
2784
2785            error!("{}", msg);
2786            eprintln!("{msg}");
2787
2788            #[cfg(not(msim))]
2789            std::process::exit(1);
2790
2791            #[cfg(msim)]
2792            iota_simulator::task::shutdown_current_node();
2793        }
2794    }
2795
2796    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
2797    pub async fn new(
2798        name: AuthorityName,
2799        secret: StableSyncAuthoritySigner,
2800        supported_protocol_versions: SupportedProtocolVersions,
2801        store: Arc<AuthorityStore>,
2802        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2803        epoch_store: Arc<AuthorityPerEpochStore>,
2804        committee_store: Arc<CommitteeStore>,
2805        indexes: Option<Arc<IndexStore>>,
2806        rest_index: Option<Arc<RestIndexStore>>,
2807        checkpoint_store: Arc<CheckpointStore>,
2808        prometheus_registry: &Registry,
2809        genesis_objects: &[Object],
2810        db_checkpoint_config: &DBCheckpointConfig,
2811        config: NodeConfig,
2812        archive_readers: ArchiveReaderBalancer,
2813        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2814        chain_identifier: ChainIdentifier,
2815        pruner_db: Option<Arc<AuthorityPrunerTables>>,
2816    ) -> Arc<Self> {
2817        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2818
2819        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2820        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2821        let transaction_manager = Arc::new(TransactionManager::new(
2822            execution_cache_trait_pointers.object_cache_reader.clone(),
2823            execution_cache_trait_pointers
2824                .transaction_cache_reader
2825                .clone(),
2826            &epoch_store,
2827            tx_ready_certificates,
2828            metrics.clone(),
2829        ));
2830        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2831
2832        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2833            epoch_store.get_parent_path(),
2834            &config.authority_store_pruning_config,
2835        );
2836        let _pruner = AuthorityStorePruner::new(
2837            store.perpetual_tables.clone(),
2838            checkpoint_store.clone(),
2839            rest_index.clone(),
2840            config.authority_store_pruning_config.clone(),
2841            epoch_store.committee().authority_exists(&name),
2842            epoch_store.epoch_start_state().epoch_duration_ms(),
2843            prometheus_registry,
2844            archive_readers,
2845            pruner_db,
2846        );
2847        let input_loader =
2848            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2849        let epoch = epoch_store.epoch();
2850        let rgp = epoch_store.reference_gas_price();
2851        let state = Arc::new(AuthorityState {
2852            name,
2853            secret,
2854            execution_lock: RwLock::new(epoch),
2855            epoch_store: ArcSwap::new(epoch_store.clone()),
2856            input_loader,
2857            execution_cache_trait_pointers,
2858            indexes,
2859            rest_index,
2860            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2861            checkpoint_store,
2862            committee_store,
2863            transaction_manager,
2864            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2865            metrics,
2866            _pruner,
2867            _authority_per_epoch_pruner,
2868            db_checkpoint_config: db_checkpoint_config.clone(),
2869            config,
2870            overload_info: AuthorityOverloadInfo::default(),
2871            validator_tx_finalizer,
2872            chain_identifier,
2873            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
2874        });
2875
2876        // Start a task to execute ready certificates.
2877        let authority_state = Arc::downgrade(&state);
2878        spawn_monitored_task!(execution_process(
2879            authority_state,
2880            rx_ready_certificates,
2881            rx_execution_shutdown,
2882        ));
2883
2884        // TODO: This doesn't belong to the constructor of AuthorityState.
2885        state
2886            .create_owner_index_if_empty(genesis_objects, &epoch_store)
2887            .expect("Error indexing genesis objects.");
2888
2889        state
2890    }
2891
2892    // TODO: Consolidate our traits to reduce the number of methods here.
2893    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2894        &self.execution_cache_trait_pointers.object_cache_reader
2895    }
2896
2897    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2898        &self.execution_cache_trait_pointers.transaction_cache_reader
2899    }
2900
2901    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2902        &self.execution_cache_trait_pointers.cache_writer
2903    }
2904
2905    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2906        &self.execution_cache_trait_pointers.backing_store
2907    }
2908
2909    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2910        &self.execution_cache_trait_pointers.backing_package_store
2911    }
2912
2913    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2914        &self.execution_cache_trait_pointers.object_store
2915    }
2916
2917    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2918        &self.execution_cache_trait_pointers.reconfig_api
2919    }
2920
2921    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2922        &self.execution_cache_trait_pointers.accumulator_store
2923    }
2924
2925    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2926        &self.execution_cache_trait_pointers.checkpoint_cache
2927    }
2928
2929    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2930        &self.execution_cache_trait_pointers.state_sync_store
2931    }
2932
2933    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2934        &self.execution_cache_trait_pointers.cache_commit
2935    }
2936
2937    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2938        self.execution_cache_trait_pointers
2939            .testing_api
2940            .database_for_testing()
2941    }
2942
2943    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2944        &self,
2945        config: NodeConfig,
2946        metrics: Arc<AuthorityStorePruningMetrics>,
2947    ) -> anyhow::Result<()> {
2948        let archive_readers =
2949            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2950        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2951            &self.database_for_testing().perpetual_tables,
2952            &self.checkpoint_store,
2953            self.rest_index.as_deref(),
2954            None,
2955            config.authority_store_pruning_config,
2956            metrics,
2957            archive_readers,
2958            EPOCH_DURATION_MS_FOR_TESTING,
2959        )
2960        .await
2961    }
2962
2963    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2964        &self.transaction_manager
2965    }
2966
2967    /// Adds transactions / certificates to transaction manager for ordered
2968    /// execution.
2969    pub fn enqueue_transactions_for_execution(
2970        &self,
2971        txns: Vec<VerifiedExecutableTransaction>,
2972        epoch_store: &Arc<AuthorityPerEpochStore>,
2973    ) {
2974        self.transaction_manager.enqueue(txns, epoch_store)
2975    }
2976
2977    /// Adds certificates to transaction manager for ordered execution.
2978    pub fn enqueue_certificates_for_execution(
2979        &self,
2980        certs: Vec<VerifiedCertificate>,
2981        epoch_store: &Arc<AuthorityPerEpochStore>,
2982    ) {
2983        self.transaction_manager
2984            .enqueue_certificates(certs, epoch_store)
2985    }
2986
2987    pub fn enqueue_with_expected_effects_digest(
2988        &self,
2989        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2990        epoch_store: &AuthorityPerEpochStore,
2991    ) {
2992        self.transaction_manager
2993            .enqueue_with_expected_effects_digest(certs, epoch_store)
2994    }
2995
2996    fn create_owner_index_if_empty(
2997        &self,
2998        genesis_objects: &[Object],
2999        epoch_store: &Arc<AuthorityPerEpochStore>,
3000    ) -> IotaResult {
3001        let Some(index_store) = &self.indexes else {
3002            return Ok(());
3003        };
3004        if !index_store.is_empty() {
3005            return Ok(());
3006        }
3007
3008        let mut new_owners = vec![];
3009        let mut new_dynamic_fields = vec![];
3010        let mut layout_resolver = epoch_store
3011            .executor()
3012            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3013        for o in genesis_objects.iter() {
3014            match o.owner {
3015                Owner::AddressOwner(addr) => new_owners.push((
3016                    (addr, o.id()),
3017                    ObjectInfo::new(&o.compute_object_reference(), o),
3018                )),
3019                Owner::ObjectOwner(object_id) => {
3020                    let id = o.id();
3021                    let Some(info) = self.try_create_dynamic_field_info(
3022                        o,
3023                        &BTreeMap::new(),
3024                        layout_resolver.as_mut(),
3025                        true,
3026                    )?
3027                    else {
3028                        continue;
3029                    };
3030                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3031                }
3032                _ => {}
3033            }
3034        }
3035
3036        index_store.insert_genesis_objects(ObjectIndexChanges {
3037            deleted_owners: vec![],
3038            deleted_dynamic_fields: vec![],
3039            new_owners,
3040            new_dynamic_fields,
3041        })
3042    }
3043
3044    /// Attempts to acquire execution lock for an executable transaction.
3045    /// Returns the lock if the transaction is matching current executed epoch
3046    /// Returns None otherwise
3047    pub fn execution_lock_for_executable_transaction(
3048        &self,
3049        transaction: &VerifiedExecutableTransaction,
3050    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3051        let lock = self
3052            .execution_lock
3053            .try_read()
3054            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3055        if *lock == transaction.auth_sig().epoch() {
3056            Ok(lock)
3057        } else {
3058            Err(IotaError::WrongEpoch {
3059                expected_epoch: *lock,
3060                actual_epoch: transaction.auth_sig().epoch(),
3061            })
3062        }
3063    }
3064
3065    /// Acquires the execution lock for the duration of a transaction signing
3066    /// request. This prevents reconfiguration from starting until we are
3067    /// finished handling the signing request. Otherwise, in-memory lock
3068    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3069    /// while we are attempting to acquire locks for the transaction.
3070    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3071        self.execution_lock
3072            .try_read()
3073            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3074    }
3075
3076    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3077        self.execution_lock.write().await
3078    }
3079
    /// Moves this authority from the epoch of `cur_epoch_store` into the epoch
    /// described by `new_committee` and `epoch_start_configuration`.
    ///
    /// In order, this:
    /// 1. checks the new epoch's protocol version against
    ///    `supported_protocol_versions`, resets metrics and records
    ///    `new_committee` in the committee store;
    /// 2. takes the execution write lock and signals epoch termination on
    ///    `cur_epoch_store`;
    /// 3. reverts uncommitted transactions, clears end-of-epoch state and runs
    ///    the system consistency checks;
    /// 4. stores the new epoch start configuration and, if configured, takes a
    ///    database checkpoint in an `epoch_<current epoch>` directory;
    /// 5. reconfigures the cache, reopens the epoch database, reconfigures the
    ///    transaction manager and writes the new epoch into the execution
    ///    lock.
    ///
    /// Returns the epoch store produced by `reopen_epoch_db`.
    ///
    /// # Errors
    /// Propagates the first error returned by any of the fallible steps above.
    ///
    /// # Panics
    /// Panics if `epoch_last_checkpoint` is lower than the sequence number of
    /// the latest locally computed checkpoint, or if the reopened epoch store
    /// does not report `new_committee.epoch` as its epoch.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Defaults to 0 when no checkpoint has been computed locally yet.
        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optional end-of-epoch database checkpoint: requires both a configured
        // path and the `perform_db_checkpoints_at_epoch_end` flag.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                // Index checkpoints are opt-in; an unset option counts as disabled.
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Read the epoch before `new_committee` is moved into `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3191
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    ///
    /// # Panics
    /// Panics if the last checkpoint of the current epoch cannot be read from
    /// the checkpoint store.
    pub async fn reconfigure_for_testing(&self) {
        // Hold the execution write lock for the whole switch-over.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly want to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        // The last checkpoint of the current epoch falls back to the default
        // sequence number when the checkpoint store has none recorded.
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        // Publish the new epoch store, then signal termination on the old one.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        *execution_lock = new_epoch;
    }
3225
3226    #[instrument(level = "error", skip_all)]
3227    fn check_system_consistency(
3228        &self,
3229        cur_epoch_store: &AuthorityPerEpochStore,
3230        accumulator: Arc<StateAccumulator>,
3231        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3232        epoch_supply_change: i64,
3233    ) -> IotaResult<()> {
3234        info!(
3235            "Performing iota conservation consistency check for epoch {}",
3236            cur_epoch_store.epoch()
3237        );
3238
3239        if cfg!(debug_assertions) {
3240            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3241        }
3242
3243        self.get_reconfig_api()
3244            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3245
3246        // check for root state hash consistency with live object set
3247        if expensive_safety_check_config.enable_state_consistency_check() {
3248            info!(
3249                "Performing state consistency check for epoch {}",
3250                cur_epoch_store.epoch()
3251            );
3252            self.expensive_check_is_consistent_state(
3253                accumulator,
3254                cur_epoch_store,
3255                cfg!(debug_assertions), // panic in debug mode only
3256            );
3257        }
3258
3259        if expensive_safety_check_config.enable_secondary_index_checks() {
3260            if let Some(indexes) = self.indexes.clone() {
3261                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3262                    .expect("secondary indexes are inconsistent");
3263            }
3264        }
3265
3266        Ok(())
3267    }
3268
3269    fn expensive_check_is_consistent_state(
3270        &self,
3271        accumulator: Arc<StateAccumulator>,
3272        cur_epoch_store: &AuthorityPerEpochStore,
3273        panic: bool,
3274    ) {
3275        let live_object_set_hash = accumulator.digest_live_object_set();
3276
3277        let root_state_hash: ECMHLiveObjectSetDigest = self
3278            .get_accumulator_store()
3279            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3280            .expect("Retrieving root state hash cannot fail")
3281            .expect("Root state hash for epoch must exist")
3282            .1
3283            .digest()
3284            .into();
3285
3286        let is_inconsistent = root_state_hash != live_object_set_hash;
3287        if is_inconsistent {
3288            if panic {
3289                panic!(
3290                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3291                );
3292            } else {
3293                error!(
3294                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3295                    root_state_hash, live_object_set_hash
3296                );
3297            }
3298        } else {
3299            info!("State consistency check passed");
3300        }
3301
3302        if !panic {
3303            accumulator.set_inconsistent_state(is_inconsistent);
3304        }
3305    }
3306
3307    pub fn current_epoch_for_testing(&self) -> EpochId {
3308        self.epoch_store_for_testing().epoch()
3309    }
3310
3311    #[instrument(level = "error", skip_all)]
3312    pub fn checkpoint_all_dbs(
3313        &self,
3314        checkpoint_path: &Path,
3315        cur_epoch_store: &AuthorityPerEpochStore,
3316        checkpoint_indexes: bool,
3317    ) -> IotaResult {
3318        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3319        let current_epoch = cur_epoch_store.epoch();
3320
3321        if checkpoint_path.exists() {
3322            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3323            return Ok(());
3324        }
3325
3326        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3327        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3328
3329        if checkpoint_path_tmp.exists() {
3330            fs::remove_dir_all(&checkpoint_path_tmp)
3331                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3332        }
3333
3334        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3335        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3336
3337        // NOTE: Do not change the order of invoking these checkpoint calls
3338        // We want to snapshot checkpoint db first to not race with state sync
3339        self.checkpoint_store
3340            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3341
3342        self.get_reconfig_api()
3343            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3344
3345        self.committee_store
3346            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3347
3348        if checkpoint_indexes {
3349            if let Some(indexes) = self.indexes.as_ref() {
3350                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3351            }
3352        }
3353
3354        fs::rename(checkpoint_path_tmp, checkpoint_path)
3355            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3356        Ok(())
3357    }
3358
3359    /// Load the current epoch store. This can change during reconfiguration. To
3360    /// ensure that we never end up accessing different epoch stores in a
3361    /// single task, we need to make sure that this is called once per task.
3362    /// Each call needs to be carefully audited to ensure it is
3363    /// the case. This also means we should minimize the number of call-sites.
3364    /// Only call it when there is no way to obtain it from somewhere else.
3365    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3366        self.epoch_store.load()
3367    }
3368
3369    // Load the epoch store, should be used in tests only.
3370    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3371        self.load_epoch_store_one_call_per_task()
3372    }
3373
3374    pub fn clone_committee_for_testing(&self) -> Committee {
3375        Committee::clone(self.epoch_store_for_testing().committee())
3376    }
3377
3378    #[instrument(level = "trace", skip_all)]
3379    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3380        self.get_object_store()
3381            .try_get_object(object_id)
3382            .map_err(Into::into)
3383    }
3384
3385    /// Non-fallible version of `try_get_object`.
3386    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3387        self.try_get_object(object_id)
3388            .await
3389            .expect("storage access failed")
3390    }
3391
3392    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3393        Ok(self
3394            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3395            .await?
3396            .expect("framework object should always exist")
3397            .compute_object_reference())
3398    }
3399
3400    // This function is only used for testing.
3401    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3402        self.get_object_cache_reader()
3403            .try_get_iota_system_state_object_unsafe()
3404    }
3405
3406    #[instrument(level = "trace", skip_all)]
3407    pub fn get_checkpoint_by_sequence_number(
3408        &self,
3409        sequence_number: CheckpointSequenceNumber,
3410    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3411        Ok(self
3412            .checkpoint_store
3413            .get_checkpoint_by_sequence_number(sequence_number)?)
3414    }
3415
3416    #[instrument(level = "trace", skip_all)]
3417    pub fn get_transaction_checkpoint_for_tests(
3418        &self,
3419        digest: &TransactionDigest,
3420        epoch_store: &AuthorityPerEpochStore,
3421    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3422        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3423        let Some(checkpoint) = checkpoint else {
3424            return Ok(None);
3425        };
3426        let checkpoint = self
3427            .checkpoint_store
3428            .get_checkpoint_by_sequence_number(checkpoint)?;
3429        Ok(checkpoint)
3430    }
3431
3432    #[instrument(level = "trace", skip_all)]
3433    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3434        Ok(
3435            match self
3436                .get_object_cache_reader()
3437                .try_get_latest_object_or_tombstone(*object_id)?
3438            {
3439                Some((_, ObjectOrTombstone::Object(object))) => {
3440                    let layout = self.get_object_layout(&object)?;
3441                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3442                }
3443                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3444                None => ObjectRead::NotExists(*object_id),
3445            },
3446        )
3447    }
3448
3449    /// Chain Identifier is the digest of the genesis checkpoint.
3450    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3451        self.chain_identifier
3452    }
3453
3454    #[instrument(level = "trace", skip_all)]
3455    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3456    where
3457        T: DeserializeOwned,
3458    {
3459        let o = self.get_object_read(object_id)?.into_object()?;
3460        if let Some(move_object) = o.data.try_as_move() {
3461            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3462                IotaError::ObjectDeserialization {
3463                    error: format!("{e}"),
3464                }
3465            })?)
3466        } else {
3467            Err(IotaError::ObjectDeserialization {
3468                error: format!("Provided object : [{object_id}] is not a Move object."),
3469            })
3470        }
3471    }
3472
3473    /// This function aims to serve rpc reads on past objects and
3474    /// we don't expect it to be called for other purposes.
3475    /// Depending on the object pruning policies that will be enforced in the
3476    /// future there is no software-level guarantee/SLA to retrieve an object
3477    /// with an old version even if it exists/existed.
3478    #[instrument(level = "trace", skip_all)]
3479    pub fn get_past_object_read(
3480        &self,
3481        object_id: &ObjectID,
3482        version: SequenceNumber,
3483    ) -> IotaResult<PastObjectRead> {
3484        // Firstly we see if the object ever existed by getting its latest data
3485        let Some(obj_ref) = self
3486            .get_object_cache_reader()
3487            .try_get_latest_object_ref_or_tombstone(*object_id)?
3488        else {
3489            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3490        };
3491
3492        if version > obj_ref.1 {
3493            return Ok(PastObjectRead::VersionTooHigh {
3494                object_id: *object_id,
3495                asked_version: version,
3496                latest_version: obj_ref.1,
3497            });
3498        }
3499
3500        if version < obj_ref.1 {
3501            // Read past objects
3502            return Ok(match self.read_object_at_version(object_id, version)? {
3503                Some((object, layout)) => {
3504                    let obj_ref = object.compute_object_reference();
3505                    PastObjectRead::VersionFound(obj_ref, object, layout)
3506                }
3507
3508                None => PastObjectRead::VersionNotFound(*object_id, version),
3509            });
3510        }
3511
3512        if !obj_ref.2.is_alive() {
3513            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3514        }
3515
3516        match self.read_object_at_version(object_id, obj_ref.1)? {
3517            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3518            None => {
3519                error!(
3520                    "Object with in parent_entry is missing from object store, datastore is \
3521                     inconsistent",
3522                );
3523                Err(UserInputError::ObjectNotFound {
3524                    object_id: *object_id,
3525                    version: Some(obj_ref.1),
3526                }
3527                .into())
3528            }
3529        }
3530    }
3531
3532    #[instrument(level = "trace", skip_all)]
3533    fn read_object_at_version(
3534        &self,
3535        object_id: &ObjectID,
3536        version: SequenceNumber,
3537    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3538        let Some(object) = self
3539            .get_object_cache_reader()
3540            .try_get_object_by_key(object_id, version)?
3541        else {
3542            return Ok(None);
3543        };
3544
3545        let layout = self.get_object_layout(&object)?;
3546        Ok(Some((object, layout)))
3547    }
3548
3549    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3550        let layout = object
3551            .data
3552            .try_as_move()
3553            .map(|object| {
3554                into_struct_layout(
3555                    self.load_epoch_store_one_call_per_task()
3556                        .executor()
3557                        // TODO(cache) - must read through cache
3558                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3559                        .get_annotated_layout(&object.type_().clone().into())?,
3560                )
3561            })
3562            .transpose()?;
3563        Ok(layout)
3564    }
3565
3566    fn get_owner_at_version(
3567        &self,
3568        object_id: &ObjectID,
3569        version: SequenceNumber,
3570    ) -> IotaResult<Owner> {
3571        self.get_object_store()
3572            .try_get_object_by_key(object_id, version)?
3573            .ok_or_else(|| {
3574                IotaError::from(UserInputError::ObjectNotFound {
3575                    object_id: *object_id,
3576                    version: Some(version),
3577                })
3578            })
3579            .map(|o| o.owner)
3580    }
3581
3582    #[instrument(level = "trace", skip_all)]
3583    pub fn get_owner_objects(
3584        &self,
3585        owner: IotaAddress,
3586        // If `Some`, the query will start from the next item after the specified cursor
3587        cursor: Option<ObjectID>,
3588        limit: usize,
3589        filter: Option<IotaObjectDataFilter>,
3590    ) -> IotaResult<Vec<ObjectInfo>> {
3591        if let Some(indexes) = &self.indexes {
3592            indexes.get_owner_objects(owner, cursor, limit, filter)
3593        } else {
3594            Err(IotaError::IndexStoreNotAvailable)
3595        }
3596    }
3597
3598    #[instrument(level = "trace", skip_all)]
3599    pub fn get_owned_coins_iterator_with_cursor(
3600        &self,
3601        owner: IotaAddress,
3602        // If `Some`, the query will start from the next item after the specified cursor
3603        cursor: (String, ObjectID),
3604        limit: usize,
3605        one_coin_type_only: bool,
3606    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3607        if let Some(indexes) = &self.indexes {
3608            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3609        } else {
3610            Err(IotaError::IndexStoreNotAvailable)
3611        }
3612    }
3613
3614    #[instrument(level = "trace", skip_all)]
3615    pub fn get_owner_objects_iterator(
3616        &self,
3617        owner: IotaAddress,
3618        // If `Some`, the query will start from the next item after the specified cursor
3619        cursor: Option<ObjectID>,
3620        filter: Option<IotaObjectDataFilter>,
3621    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3622        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3623        if let Some(indexes) = &self.indexes {
3624            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3625        } else {
3626            Err(IotaError::IndexStoreNotAvailable)
3627        }
3628    }
3629
3630    #[instrument(level = "trace", skip_all)]
3631    pub async fn get_move_objects<T>(
3632        &self,
3633        owner: IotaAddress,
3634        type_: MoveObjectType,
3635    ) -> IotaResult<Vec<T>>
3636    where
3637        T: DeserializeOwned,
3638    {
3639        let object_ids = self
3640            .get_owner_objects_iterator(owner, None, None)?
3641            .filter(|o| match &o.type_ {
3642                ObjectType::Struct(s) => &type_ == s,
3643                ObjectType::Package => false,
3644            })
3645            .map(|info| ObjectKey(info.object_id, info.version))
3646            .collect::<Vec<_>>();
3647        let mut move_objects = vec![];
3648
3649        let objects = self
3650            .get_object_store()
3651            .try_multi_get_objects_by_key(&object_ids)?;
3652
3653        for (o, id) in objects.into_iter().zip(object_ids) {
3654            let object = o.ok_or_else(|| {
3655                IotaError::from(UserInputError::ObjectNotFound {
3656                    object_id: id.0,
3657                    version: Some(id.1),
3658                })
3659            })?;
3660            let move_object = object.data.try_as_move().ok_or_else(|| {
3661                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3662            })?;
3663            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3664                IotaError::ObjectDeserialization {
3665                    error: format!("{e}"),
3666                }
3667            })?);
3668        }
3669        Ok(move_objects)
3670    }
3671
3672    #[instrument(level = "trace", skip_all)]
3673    pub fn get_dynamic_fields(
3674        &self,
3675        owner: ObjectID,
3676        // If `Some`, the query will start from the next item after the specified cursor
3677        cursor: Option<ObjectID>,
3678        limit: usize,
3679    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3680        Ok(self
3681            .get_dynamic_fields_iterator(owner, cursor)?
3682            .take(limit)
3683            .collect::<Result<Vec<_>, _>>()?)
3684    }
3685
3686    fn get_dynamic_fields_iterator(
3687        &self,
3688        owner: ObjectID,
3689        // If `Some`, the query will start from the next item after the specified cursor
3690        cursor: Option<ObjectID>,
3691    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3692    {
3693        if let Some(indexes) = &self.indexes {
3694            indexes.get_dynamic_fields_iterator(owner, cursor)
3695        } else {
3696            Err(IotaError::IndexStoreNotAvailable)
3697        }
3698    }
3699
3700    #[instrument(level = "trace", skip_all)]
3701    pub fn get_dynamic_field_object_id(
3702        &self,
3703        owner: ObjectID,
3704        name_type: TypeTag,
3705        name_bcs_bytes: &[u8],
3706    ) -> IotaResult<Option<ObjectID>> {
3707        if let Some(indexes) = &self.indexes {
3708            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3709        } else {
3710            Err(IotaError::IndexStoreNotAvailable)
3711        }
3712    }
3713
3714    #[instrument(level = "trace", skip_all)]
3715    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3716        Ok(self.get_indexes()?.next_sequence_number())
3717    }
3718
3719    #[instrument(level = "trace", skip_all)]
3720    pub async fn get_executed_transaction_and_effects(
3721        &self,
3722        digest: TransactionDigest,
3723        kv_store: Arc<TransactionKeyValueStore>,
3724    ) -> IotaResult<(Transaction, TransactionEffects)> {
3725        let transaction = kv_store.get_tx(digest).await?;
3726        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3727        Ok((transaction, effects))
3728    }
3729
3730    #[instrument(level = "trace", skip_all)]
3731    pub fn multi_get_checkpoint_by_sequence_number(
3732        &self,
3733        sequence_numbers: &[CheckpointSequenceNumber],
3734    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3735        Ok(self
3736            .checkpoint_store
3737            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3738    }
3739
3740    #[instrument(level = "trace", skip_all)]
3741    pub fn get_transaction_events(
3742        &self,
3743        digest: &TransactionEventsDigest,
3744    ) -> IotaResult<TransactionEvents> {
3745        self.get_transaction_cache_reader()
3746            .try_get_events(digest)?
3747            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3748    }
3749
3750    pub fn get_transaction_input_objects(
3751        &self,
3752        effects: &TransactionEffects,
3753    ) -> anyhow::Result<Vec<Object>> {
3754        let input_object_keys = effects
3755            .modified_at_versions()
3756            .into_iter()
3757            .map(|(object_id, version)| ObjectKey(object_id, version))
3758            .collect::<Vec<_>>();
3759
3760        let input_objects = self
3761            .get_object_store()
3762            .try_multi_get_objects_by_key(&input_object_keys)?
3763            .into_iter()
3764            .enumerate()
3765            .map(|(idx, maybe_object)| {
3766                maybe_object.ok_or_else(|| {
3767                    anyhow::anyhow!(
3768                        "missing input object key {:?} from tx {}",
3769                        input_object_keys[idx],
3770                        effects.transaction_digest()
3771                    )
3772                })
3773            })
3774            .collect::<anyhow::Result<Vec<_>>>()?;
3775        Ok(input_objects)
3776    }
3777
3778    pub fn get_transaction_output_objects(
3779        &self,
3780        effects: &TransactionEffects,
3781    ) -> anyhow::Result<Vec<Object>> {
3782        let output_object_keys = effects
3783            .all_changed_objects()
3784            .into_iter()
3785            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3786            .collect::<Vec<_>>();
3787
3788        let output_objects = self
3789            .get_object_store()
3790            .try_multi_get_objects_by_key(&output_object_keys)?
3791            .into_iter()
3792            .enumerate()
3793            .map(|(idx, maybe_object)| {
3794                maybe_object.ok_or_else(|| {
3795                    anyhow::anyhow!(
3796                        "missing output object key {:?} from tx {}",
3797                        output_object_keys[idx],
3798                        effects.transaction_digest()
3799                    )
3800                })
3801            })
3802            .collect::<anyhow::Result<Vec<_>>>()?;
3803        Ok(output_objects)
3804    }
3805
3806    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3807        match &self.indexes {
3808            Some(i) => Ok(i.clone()),
3809            None => Err(IotaError::UnsupportedFeature {
3810                error: "extended object indexing is not enabled on this server".into(),
3811            }),
3812        }
3813    }
3814
3815    pub async fn get_transactions_for_tests(
3816        self: &Arc<Self>,
3817        filter: Option<TransactionFilter>,
3818        cursor: Option<TransactionDigest>,
3819        limit: Option<usize>,
3820        reverse: bool,
3821    ) -> IotaResult<Vec<TransactionDigest>> {
3822        let metrics = KeyValueStoreMetrics::new_for_tests();
3823        let kv_store = Arc::new(TransactionKeyValueStore::new(
3824            "rocksdb",
3825            metrics,
3826            self.clone(),
3827        ));
3828        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3829            .await
3830    }
3831
3832    #[instrument(level = "trace", skip_all)]
3833    pub async fn get_transactions(
3834        &self,
3835        kv_store: &Arc<TransactionKeyValueStore>,
3836        filter: Option<TransactionFilter>,
3837        // If `Some`, the query will start from the next item after the specified cursor
3838        cursor: Option<TransactionDigest>,
3839        limit: Option<usize>,
3840        reverse: bool,
3841    ) -> IotaResult<Vec<TransactionDigest>> {
3842        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3843            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3844            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3845            if reverse {
3846                let iter = iter
3847                    .rev()
3848                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3849                    .skip(usize::from(cursor.is_some()));
3850                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3851            } else {
3852                let iter = iter
3853                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3854                    .skip(usize::from(cursor.is_some()));
3855                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3856            }
3857        }
3858        self.get_indexes()?
3859            .get_transactions(filter, cursor, limit, reverse)
3860    }
3861
3862    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3863        &self.checkpoint_store
3864    }
3865
3866    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3867        self.get_checkpoint_store()
3868            .get_highest_executed_checkpoint_seq_number()?
3869            .ok_or(IotaError::UserInput {
3870                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3871            })
3872    }
3873
3874    #[cfg(msim)]
3875    pub fn get_highest_pruned_checkpoint_for_testing(
3876        &self,
3877    ) -> IotaResult<CheckpointSequenceNumber> {
3878        self.database_for_testing()
3879            .perpetual_tables
3880            .get_highest_pruned_checkpoint()
3881    }
3882
3883    #[instrument(level = "trace", skip_all)]
3884    pub fn get_checkpoint_summary_by_sequence_number(
3885        &self,
3886        sequence_number: CheckpointSequenceNumber,
3887    ) -> IotaResult<CheckpointSummary> {
3888        let verified_checkpoint = self
3889            .get_checkpoint_store()
3890            .get_checkpoint_by_sequence_number(sequence_number)?;
3891        match verified_checkpoint {
3892            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3893            None => Err(IotaError::UserInput {
3894                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3895            }),
3896        }
3897    }
3898
3899    #[instrument(level = "trace", skip_all)]
3900    pub fn get_checkpoint_summary_by_digest(
3901        &self,
3902        digest: CheckpointDigest,
3903    ) -> IotaResult<CheckpointSummary> {
3904        let verified_checkpoint = self
3905            .get_checkpoint_store()
3906            .get_checkpoint_by_digest(&digest)?;
3907        match verified_checkpoint {
3908            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3909            None => Err(IotaError::UserInput {
3910                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3911            }),
3912        }
3913    }
3914
3915    #[instrument(level = "trace", skip_all)]
3916    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3917        if is_system_package(package_id) {
3918            return self.find_genesis_txn_digest();
3919        }
3920        Ok(self
3921            .get_object_read(&package_id)?
3922            .into_object()?
3923            .previous_transaction)
3924    }
3925
3926    #[instrument(level = "trace", skip_all)]
3927    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3928        let summary = self
3929            .get_verified_checkpoint_by_sequence_number(0)?
3930            .into_message();
3931        let content = self.get_checkpoint_contents(summary.content_digest)?;
3932        let genesis_transaction = content.enumerate_transactions(&summary).next();
3933        Ok(genesis_transaction
3934            .ok_or(IotaError::UserInput {
3935                error: UserInputError::GenesisTransactionNotFound,
3936            })?
3937            .1
3938            .transaction)
3939    }
3940
3941    #[instrument(level = "trace", skip_all)]
3942    pub fn get_verified_checkpoint_by_sequence_number(
3943        &self,
3944        sequence_number: CheckpointSequenceNumber,
3945    ) -> IotaResult<VerifiedCheckpoint> {
3946        let verified_checkpoint = self
3947            .get_checkpoint_store()
3948            .get_checkpoint_by_sequence_number(sequence_number)?;
3949        match verified_checkpoint {
3950            Some(verified_checkpoint) => Ok(verified_checkpoint),
3951            None => Err(IotaError::UserInput {
3952                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3953            }),
3954        }
3955    }
3956
3957    #[instrument(level = "trace", skip_all)]
3958    pub fn get_verified_checkpoint_summary_by_digest(
3959        &self,
3960        digest: CheckpointDigest,
3961    ) -> IotaResult<VerifiedCheckpoint> {
3962        let verified_checkpoint = self
3963            .get_checkpoint_store()
3964            .get_checkpoint_by_digest(&digest)?;
3965        match verified_checkpoint {
3966            Some(verified_checkpoint) => Ok(verified_checkpoint),
3967            None => Err(IotaError::UserInput {
3968                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3969            }),
3970        }
3971    }
3972
3973    #[instrument(level = "trace", skip_all)]
3974    pub fn get_checkpoint_contents(
3975        &self,
3976        digest: CheckpointContentsDigest,
3977    ) -> IotaResult<CheckpointContents> {
3978        self.get_checkpoint_store()
3979            .get_checkpoint_contents(&digest)?
3980            .ok_or(IotaError::UserInput {
3981                error: UserInputError::CheckpointContentsNotFound(digest),
3982            })
3983    }
3984
3985    #[instrument(level = "trace", skip_all)]
3986    pub fn get_checkpoint_contents_by_sequence_number(
3987        &self,
3988        sequence_number: CheckpointSequenceNumber,
3989    ) -> IotaResult<CheckpointContents> {
3990        let verified_checkpoint = self
3991            .get_checkpoint_store()
3992            .get_checkpoint_by_sequence_number(sequence_number)?;
3993        match verified_checkpoint {
3994            Some(verified_checkpoint) => {
3995                let content_digest = verified_checkpoint.into_inner().content_digest;
3996                self.get_checkpoint_contents(content_digest)
3997            }
3998            None => Err(IotaError::UserInput {
3999                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4000            }),
4001        }
4002    }
4003
4004    #[instrument(level = "trace", skip_all)]
4005    pub async fn query_events(
4006        &self,
4007        kv_store: &Arc<TransactionKeyValueStore>,
4008        query: EventFilter,
4009        // If `Some`, the query will start from the next item after the specified cursor
4010        cursor: Option<EventID>,
4011        limit: usize,
4012        descending: bool,
4013    ) -> IotaResult<Vec<IotaEvent>> {
4014        let index_store = self.get_indexes()?;
4015
4016        // Get the tx_num from tx_digest
4017        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4018            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4019                IotaError::TransactionNotFound {
4020                    digest: cursor.tx_digest,
4021                },
4022            )?;
4023            (tx_seq, cursor.event_seq as usize)
4024        } else if descending {
4025            (u64::MAX, usize::MAX)
4026        } else {
4027            (0, 0)
4028        };
4029
4030        let limit = limit + 1;
4031        let mut event_keys = match query {
4032            EventFilter::All(filters) => {
4033                if filters.is_empty() {
4034                    index_store.all_events(tx_num, event_num, limit, descending)?
4035                } else {
4036                    return Err(IotaError::UserInput {
4037                        error: UserInputError::Unsupported(
4038                            "This query type does not currently support filter combinations"
4039                                .to_string(),
4040                        ),
4041                    });
4042                }
4043            }
4044            EventFilter::Transaction(digest) => {
4045                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4046            }
4047            EventFilter::MoveModule { package, module } => {
4048                let module_id = ModuleId::new(package.into(), module);
4049                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4050            }
4051            EventFilter::MoveEventType(struct_name) => index_store
4052                .events_by_move_event_struct_name(
4053                    &struct_name,
4054                    tx_num,
4055                    event_num,
4056                    limit,
4057                    descending,
4058                )?,
4059            EventFilter::Sender(sender) => {
4060                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4061            }
4062            EventFilter::TimeRange {
4063                start_time,
4064                end_time,
4065            } => index_store
4066                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4067            EventFilter::MoveEventModule { package, module } => index_store
4068                .events_by_move_event_module(
4069                    &ModuleId::new(package.into(), module),
4070                    tx_num,
4071                    event_num,
4072                    limit,
4073                    descending,
4074                )?,
4075            // not using "_ =>" because we want to make sure we remember to add new variants here
4076            EventFilter::Package(_)
4077            | EventFilter::MoveEventField { .. }
4078            | EventFilter::Any(_)
4079            | EventFilter::And(_, _)
4080            | EventFilter::Or(_, _) => {
4081                return Err(IotaError::UserInput {
4082                    error: UserInputError::Unsupported(
4083                        "This query type is not supported by the full node.".to_string(),
4084                    ),
4085                });
4086            }
4087        };
4088
4089        // skip one event if exclusive cursor is provided,
4090        // otherwise truncate to the original limit.
4091        if cursor.is_some() {
4092            if !event_keys.is_empty() {
4093                event_keys.remove(0);
4094            }
4095        } else {
4096            event_keys.truncate(limit - 1);
4097        }
4098
4099        // get the unique set of digests from the event_keys
4100        let transaction_digests = event_keys
4101            .iter()
4102            .map(|(_, digest, _, _)| *digest)
4103            .collect::<HashSet<_>>()
4104            .into_iter()
4105            .collect::<Vec<_>>();
4106
4107        let events = kv_store
4108            .multi_get_events_by_tx_digests(&transaction_digests)
4109            .await?;
4110
4111        let events_map: HashMap<_, _> =
4112            transaction_digests.iter().zip(events.into_iter()).collect();
4113
4114        let stored_events = event_keys
4115            .into_iter()
4116            .map(|k| {
4117                (
4118                    k,
4119                    events_map
4120                        .get(&k.1)
4121                        .expect("fetched digest is missing")
4122                        .clone()
4123                        .and_then(|e| e.data.get(k.2).cloned()),
4124                )
4125            })
4126            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4127                event
4128                    .map(|e| (e, tx_digest, event_seq, timestamp))
4129                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4130            })
4131            .collect::<Result<Vec<_>, _>>()?;
4132
4133        let epoch_store = self.load_epoch_store_one_call_per_task();
4134        let backing_store = self.get_backing_package_store().as_ref();
4135        let mut layout_resolver = epoch_store
4136            .executor()
4137            .type_layout_resolver(Box::new(backing_store));
4138        let mut events = vec![];
4139        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4140            events.push(IotaEvent::try_from(
4141                e.clone(),
4142                tx_digest,
4143                event_seq as u64,
4144                Some(timestamp),
4145                layout_resolver.get_annotated_layout(&e.type_)?,
4146            )?)
4147        }
4148        Ok(events)
4149    }
4150
4151    pub async fn insert_genesis_object(&self, object: Object) {
4152        self.get_reconfig_api()
4153            .try_insert_genesis_object(object)
4154            .expect("Cannot insert genesis object")
4155    }
4156
4157    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4158        futures::future::join_all(
4159            objects
4160                .iter()
4161                .map(|o| self.insert_genesis_object(o.clone())),
4162        )
4163        .await;
4164    }
4165
4166    /// Make a status response for a transaction
4167    #[instrument(level = "trace", skip_all)]
4168    pub fn get_transaction_status(
4169        &self,
4170        transaction_digest: &TransactionDigest,
4171        epoch_store: &Arc<AuthorityPerEpochStore>,
4172    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4173        // TODO: In the case of read path, we should not have to re-sign the effects.
4174        if let Some(effects) =
4175            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4176        {
4177            if let Some(transaction) = self
4178                .get_transaction_cache_reader()
4179                .try_get_transaction_block(transaction_digest)?
4180            {
4181                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4182                let events = if let Some(digest) = effects.events_digest() {
4183                    self.get_transaction_events(digest)?
4184                } else {
4185                    TransactionEvents::default()
4186                };
4187                return Ok(Some((
4188                    (*transaction).clone().into_message(),
4189                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4190                )));
4191            } else {
4192                // The read of effects and read of transaction are not atomic. It's possible
4193                // that we reverted the transaction (during epoch change) in
4194                // between the above two reads, and we end up having effects but
4195                // not transaction. In this case, we just fall through.
4196                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4197            }
4198        }
4199        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4200            self.metrics.tx_already_processed.inc();
4201            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4202            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4203        } else {
4204            Ok(None)
4205        }
4206    }
4207
4208    /// Get the signed effects of the given transaction. If the effects was
4209    /// signed in a previous epoch, re-sign it so that the caller is able to
4210    /// form a cert of the effects in the current epoch.
4211    #[instrument(level = "trace", skip_all)]
4212    pub fn get_signed_effects_and_maybe_resign(
4213        &self,
4214        transaction_digest: &TransactionDigest,
4215        epoch_store: &Arc<AuthorityPerEpochStore>,
4216    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4217        let effects = self
4218            .get_transaction_cache_reader()
4219            .try_get_executed_effects(transaction_digest)?;
4220        match effects {
4221            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4222            None => Ok(None),
4223        }
4224    }
4225
4226    #[instrument(level = "trace", skip_all)]
4227    pub(crate) fn sign_effects(
4228        &self,
4229        effects: TransactionEffects,
4230        epoch_store: &Arc<AuthorityPerEpochStore>,
4231    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4232        let tx_digest = *effects.transaction_digest();
4233        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4234            Some(sig) if sig.epoch == epoch_store.epoch() => {
4235                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4236            }
4237            _ => {
4238                // If the transaction was executed in previous epochs, the validator will
4239                // re-sign the effects with new current epoch so that a client is always able to
4240                // obtain an effects certificate at the current epoch.
4241                //
4242                // Why is this necessary? Consider the following case:
4243                // - assume there are 4 validators
4244                // - Quorum driver gets 2 signed effects before reconfig halt
4245                // - The tx makes it into final checkpoint.
4246                // - 2 validators go away and are replaced in the new epoch.
4247                // - The new epoch begins.
4248                // - The quorum driver cannot complete the partial effects cert from the
4249                //   previous epoch, because it may not be able to reach either of the 2 former
4250                //   validators.
4251                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4252                //   the new epoch, the QD can make a new effects cert and return it to the
4253                //   client.
4254                //
4255                // This is a considered a short-term workaround. Eventually, Quorum Driver
4256                // should be able to return either an effects certificate, -or-
4257                // a proof of inclusion in a checkpoint. In the case above, the
4258                // Quorum Driver would return a proof of inclusion in the final
4259                // checkpoint, and this code would no longer be necessary.
4260                debug!(
4261                    ?tx_digest,
4262                    epoch=?epoch_store.epoch(),
4263                    "Re-signing the effects with the current epoch"
4264                );
4265
4266                let sig = AuthoritySignInfo::new(
4267                    epoch_store.epoch(),
4268                    &effects,
4269                    Intent::iota_app(IntentScope::TransactionEffects),
4270                    self.name,
4271                    &*self.secret,
4272                );
4273
4274                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4275
4276                epoch_store.insert_effects_digest_and_signature(
4277                    &tx_digest,
4278                    effects.digest(),
4279                    &sig,
4280                )?;
4281
4282                effects
4283            }
4284        };
4285
4286        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4287            signed_effects,
4288        ))
4289    }
4290
4291    // Returns coin objects for indexing for fullnode if indexing is enabled.
4292    #[instrument(level = "trace", skip_all)]
4293    fn fullnode_only_get_tx_coins_for_indexing(
4294        &self,
4295        inner_temporary_store: &InnerTemporaryStore,
4296        epoch_store: &Arc<AuthorityPerEpochStore>,
4297    ) -> Option<TxCoins> {
4298        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4299            return None;
4300        }
4301        let written_coin_objects = inner_temporary_store
4302            .written
4303            .iter()
4304            .filter_map(|(k, v)| {
4305                if v.is_coin() {
4306                    Some((*k, v.clone()))
4307                } else {
4308                    None
4309                }
4310            })
4311            .collect();
4312        let input_coin_objects = inner_temporary_store
4313            .input_objects
4314            .iter()
4315            .filter_map(|(k, v)| {
4316                if v.is_coin() {
4317                    Some((*k, v.clone()))
4318                } else {
4319                    None
4320                }
4321            })
4322            .collect::<ObjectMap>();
4323        Some((input_coin_objects, written_coin_objects))
4324    }
4325
4326    /// Get the TransactionEnvelope that currently locks the given object, if
4327    /// any. Since object locks are only valid for one epoch, we also need
4328    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4329    /// no lock records for the given object can be found.
4330    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4331    /// object record is at a different version.
4332    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4333    /// certain transaction. Returns None if the a lock record is
4334    /// initialized for the given ObjectRef but not yet locked by any
4335    /// transaction,     or cannot find the transaction in transaction
4336    /// table, because of data race etc.
4337    #[instrument(level = "trace", skip_all)]
4338    pub async fn get_transaction_lock(
4339        &self,
4340        object_ref: &ObjectRef,
4341        epoch_store: &AuthorityPerEpochStore,
4342    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4343        let lock_info = self
4344            .get_object_cache_reader()
4345            .try_get_lock(*object_ref, epoch_store)?;
4346        let lock_info = match lock_info {
4347            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4348                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4349                    provided_obj_ref: *object_ref,
4350                    current_version: locked_ref.1,
4351                }
4352                .into());
4353            }
4354            ObjectLockStatus::Initialized => {
4355                return Ok(None);
4356            }
4357            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4358        };
4359
4360        epoch_store.get_signed_transaction(&lock_info)
4361    }
4362
4363    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4364        self.get_object_cache_reader().try_get_objects(objects)
4365    }
4366
4367    /// Non-fallible version of `try_get_objects`.
4368    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4369        self.try_get_objects(objects)
4370            .await
4371            .expect("storage access failed")
4372    }
4373
4374    pub async fn try_get_object_or_tombstone(
4375        &self,
4376        object_id: ObjectID,
4377    ) -> IotaResult<Option<ObjectRef>> {
4378        self.get_object_cache_reader()
4379            .try_get_latest_object_ref_or_tombstone(object_id)
4380    }
4381
4382    /// Non-fallible version of `try_get_object_or_tombstone`.
4383    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4384        self.try_get_object_or_tombstone(object_id)
4385            .await
4386            .expect("storage access failed")
4387    }
4388
4389    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4390    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4391    /// upgrade.
4392    ///
4393    /// This method can be used to dynamic adjust the amount of buffer. If set
4394    /// to 0, the upgrade will go through with only 2f+1 votes.
4395    ///
4396    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4397    /// should have the same value), or you risk halting the chain.
4398    pub fn set_override_protocol_upgrade_buffer_stake(
4399        &self,
4400        expected_epoch: EpochId,
4401        buffer_stake_bps: u64,
4402    ) -> IotaResult {
4403        let epoch_store = self.load_epoch_store_one_call_per_task();
4404        let actual_epoch = epoch_store.epoch();
4405        if actual_epoch != expected_epoch {
4406            return Err(IotaError::WrongEpoch {
4407                expected_epoch,
4408                actual_epoch,
4409            });
4410        }
4411
4412        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4413    }
4414
4415    pub fn clear_override_protocol_upgrade_buffer_stake(
4416        &self,
4417        expected_epoch: EpochId,
4418    ) -> IotaResult {
4419        let epoch_store = self.load_epoch_store_one_call_per_task();
4420        let actual_epoch = epoch_store.epoch();
4421        if actual_epoch != expected_epoch {
4422            return Err(IotaError::WrongEpoch {
4423                expected_epoch,
4424                actual_epoch,
4425            });
4426        }
4427
4428        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4429    }
4430
4431    /// Get the set of system packages that are compiled in to this build, if
4432    /// those packages are compatible with the current versions of those
4433    /// packages on-chain.
4434    pub async fn get_available_system_packages(
4435        &self,
4436        binary_config: &BinaryConfig,
4437    ) -> Vec<ObjectRef> {
4438        let mut results = vec![];
4439
4440        let system_packages = BuiltInFramework::iter_system_packages();
4441
4442        // Add extra framework packages during simtest
4443        #[cfg(msim)]
4444        let extra_packages = framework_injection::get_extra_packages(self.name);
4445        #[cfg(msim)]
4446        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4447
4448        for system_package in system_packages {
4449            let modules = system_package.modules().to_vec();
4450            // In simtests, we could override the current built-in framework packages.
4451            #[cfg(msim)]
4452            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4453                .unwrap_or(modules);
4454
4455            let Some(obj_ref) = iota_framework::compare_system_package(
4456                &self.get_object_store(),
4457                &system_package.id,
4458                &modules,
4459                system_package.dependencies.to_vec(),
4460                binary_config,
4461            )
4462            .await
4463            else {
4464                return vec![];
4465            };
4466            results.push(obj_ref);
4467        }
4468
4469        results
4470    }
4471
4472    /// Return the new versions, module bytes, and dependencies for the packages
4473    /// that have been committed to for a framework upgrade, in
4474    /// `system_packages`.  Loads the module contents from the binary, and
4475    /// performs the following checks:
4476    ///
4477    /// - Whether its contents matches what is on-chain already, in which case
4478    ///   no upgrade is required, and its contents are omitted from the output.
4479    /// - Whether the contents in the binary can form a package whose digest
4480    ///   matches the input, meaning the framework will be upgraded, and this
4481    ///   authority can satisfy that upgrade, in which case the contents are
4482    ///   included in the output.
4483    ///
4484    /// If the current version of the framework can't be loaded, the binary does
4485    /// not contain the bytes for that framework ID, or the resulting
4486    /// package fails the digest check, `None` is returned indicating that
4487    /// this authority cannot run the upgrade that the network voted on.
4488    async fn get_system_package_bytes(
4489        &self,
4490        system_packages: Vec<ObjectRef>,
4491        binary_config: &BinaryConfig,
4492    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4493        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4494        let objects = self.get_objects(&ids).await;
4495
4496        let mut res = Vec::with_capacity(system_packages.len());
4497        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4498            let prev_transaction = match object {
4499                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4500                    // Skip this one because it doesn't need to be upgraded.
4501                    info!("Framework {} does not need updating", system_package_ref.0);
4502                    continue;
4503                }
4504
4505                Some(cur_object) => cur_object.previous_transaction,
4506                None => TransactionDigest::genesis_marker(),
4507            };
4508
4509            #[cfg(msim)]
4510            let SystemPackage {
4511                id: _,
4512                bytes,
4513                dependencies,
4514            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4515                .unwrap_or_else(|| {
4516                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4517                });
4518
4519            #[cfg(not(msim))]
4520            let SystemPackage {
4521                id: _,
4522                bytes,
4523                dependencies,
4524            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4525
4526            let modules: Vec<_> = bytes
4527                .iter()
4528                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4529                .collect();
4530
4531            let new_object = Object::new_system_package(
4532                &modules,
4533                system_package_ref.1,
4534                dependencies.clone(),
4535                prev_transaction,
4536            );
4537
4538            let new_ref = new_object.compute_object_reference();
4539            if new_ref != system_package_ref {
4540                error!(
4541                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4542                );
4543                return None;
4544            }
4545
4546            res.push((system_package_ref.1, bytes, dependencies));
4547        }
4548
4549        Some(res)
4550    }
4551
4552    /// Returns the new protocol version and system packages that the network
4553    /// has voted to upgrade to. If the proposed protocol version is not
4554    /// supported, None is returned.
4555    fn is_protocol_version_supported_v1(
4556        proposed_protocol_version: ProtocolVersion,
4557        committee: &Committee,
4558        capabilities: Vec<AuthorityCapabilitiesV1>,
4559        mut buffer_stake_bps: u64,
4560    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4561        if buffer_stake_bps > 10000 {
4562            warn!("clamping buffer_stake_bps to 10000");
4563            buffer_stake_bps = 10000;
4564        }
4565
4566        // For each validator, gather the protocol version and system packages that it
4567        // would like to upgrade to in the next epoch.
4568        let mut desired_upgrades: Vec<_> = capabilities
4569            .into_iter()
4570            .filter_map(|mut cap| {
4571                // A validator that lists no packages is voting against any change at all.
4572                if cap.available_system_packages.is_empty() {
4573                    return None;
4574                }
4575
4576                cap.available_system_packages.sort();
4577
4578                info!(
4579                    "validator {:?} supports {:?} with system packages: {:?}",
4580                    cap.authority.concise(),
4581                    cap.supported_protocol_versions,
4582                    cap.available_system_packages,
4583                );
4584
4585                // A validator that only supports the current protocol version is also voting
4586                // against any change, because framework upgrades always require a protocol
4587                // version bump.
4588                cap.supported_protocol_versions
4589                    .get_version_digest(proposed_protocol_version)
4590                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4591            })
4592            .collect();
4593
4594        // There can only be one set of votes that have a majority, find one if it
4595        // exists.
4596        desired_upgrades.sort();
4597        desired_upgrades
4598            .into_iter()
4599            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4600            .into_iter()
4601            .find_map(|((digest, packages), group)| {
4602                // should have been filtered out earlier.
4603                assert!(!packages.is_empty());
4604
4605                let mut stake_aggregator: StakeAggregator<(), true> =
4606                    StakeAggregator::new(Arc::new(committee.clone()));
4607
4608                for (_, _, authority) in group {
4609                    stake_aggregator.insert_generic(authority, ());
4610                }
4611
4612                let total_votes = stake_aggregator.total_votes();
4613                let quorum_threshold = committee.quorum_threshold();
4614                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4615
4616                info!(
4617                    protocol_config_digest = ?digest,
4618                    ?total_votes,
4619                    ?quorum_threshold,
4620                    ?buffer_stake_bps,
4621                    ?effective_threshold,
4622                    ?proposed_protocol_version,
4623                    ?packages,
4624                    "support for upgrade"
4625                );
4626
4627                let has_support = total_votes >= effective_threshold;
4628                has_support.then_some((proposed_protocol_version, digest, packages))
4629            })
4630    }
4631
4632    /// Selects the highest supported protocol version and system packages that
4633    /// the network has voted to upgrade to. If no upgrade is supported,
4634    /// returns the current protocol version and system packages.
4635    fn choose_protocol_version_and_system_packages_v1(
4636        current_protocol_version: ProtocolVersion,
4637        current_protocol_digest: Digest,
4638        committee: &Committee,
4639        capabilities: Vec<AuthorityCapabilitiesV1>,
4640        buffer_stake_bps: u64,
4641    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4642        let mut next_protocol_version = current_protocol_version;
4643        let mut system_packages = vec![];
4644        let mut protocol_version_digest = current_protocol_digest;
4645
4646        // Finds the highest supported protocol version and system packages by
4647        // incrementing the proposed protocol version by one until no further
4648        // upgrades are supported.
4649        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4650            next_protocol_version + 1,
4651            committee,
4652            capabilities.clone(),
4653            buffer_stake_bps,
4654        ) {
4655            next_protocol_version = version;
4656            protocol_version_digest = digest;
4657            system_packages = packages;
4658        }
4659
4660        (
4661            next_protocol_version,
4662            protocol_version_digest,
4663            system_packages,
4664        )
4665    }
4666
4667    /// Returns the indices of validators that support the given protocol
4668    /// version and digest. This includes both committee and non-committee
4669    /// validators based on their capabilities. Uses active validators
4670    /// instead of committee indices.
4671    fn get_validators_supporting_protocol_version(
4672        target_protocol_version: ProtocolVersion,
4673        target_digest: Digest,
4674        active_validators: &[AuthorityPublicKey],
4675        capabilities: &[AuthorityCapabilitiesV1],
4676    ) -> Vec<u64> {
4677        let mut eligible_validators = Vec::new();
4678
4679        for capability in capabilities {
4680            // Check if this validator supports the target protocol version and digest
4681            if let Some(digest) = capability
4682                .supported_protocol_versions
4683                .get_version_digest(target_protocol_version)
4684            {
4685                if digest == target_digest {
4686                    // Find the validator's index in the active validators list
4687                    if let Some(index) = active_validators
4688                        .iter()
4689                        .position(|name| AuthorityName::from(name) == capability.authority)
4690                    {
4691                        eligible_validators.push(index as u64);
4692                    }
4693                }
4694            }
4695        }
4696
4697        // Sort indices for deterministic behavior
4698        eligible_validators.sort();
4699        eligible_validators
4700    }
4701
4702    /// Calculates the sum of weights for eligible validators that are part of
4703    /// the committee. Takes the indices from
4704    /// get_validators_supporting_protocol_version and maps them back
4705    /// to committee members to get their weights.
4706    fn calculate_eligible_validators_weight(
4707        eligible_validator_indices: &[u64],
4708        active_validators: &[AuthorityPublicKey],
4709        committee: &Committee,
4710    ) -> u64 {
4711        let mut total_weight = 0u64;
4712
4713        for &index in eligible_validator_indices {
4714            let authority_pubkey = &active_validators[index as usize];
4715            // Check if this validator is in the committee and get their weight
4716            if let Some((_, weight)) = committee
4717                .members()
4718                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4719            {
4720                total_weight += weight;
4721            }
4722        }
4723
4724        total_weight
4725    }
4726
4727    #[instrument(level = "debug", skip_all)]
4728    fn create_authenticator_state_tx(
4729        &self,
4730        epoch_store: &Arc<AuthorityPerEpochStore>,
4731    ) -> Option<EndOfEpochTransactionKind> {
4732        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4733            info!("authenticator state transactions not enabled");
4734            return None;
4735        }
4736
4737        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4738        let tx = if authenticator_state_exists {
4739            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4740            let min_epoch =
4741                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4742            let authenticator_obj_initial_shared_version = epoch_store
4743                .epoch_start_config()
4744                .authenticator_obj_initial_shared_version()
4745                .expect("initial version must exist");
4746
4747            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4748                min_epoch,
4749                authenticator_obj_initial_shared_version,
4750            );
4751
4752            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4753
4754            tx
4755        } else {
4756            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4757            info!("Creating AuthenticatorStateCreate tx");
4758            tx
4759        };
4760        Some(tx)
4761    }
4762
    /// Creates and executes the advance epoch transaction to effects without
    /// committing it to the database. The effects of the change epoch tx
    /// are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgrade has been decided on, but the validator does
    /// not have the new versions of the packages locally, the validator
    /// cannot form the ChangeEpochTx. In this case it returns Err,
    /// indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified
    /// checkpoint (which should be ensured by the capabilities vote), it
    /// will arrive via state sync and be executed by CheckpointExecutor.
    ///
    /// # Returns
    ///
    /// On success, a tuple of the system state read from the objects written
    /// by the transaction, the `SystemEpochInfoEvent` emitted by the
    /// transaction (if one was emitted), and the transaction effects.
    ///
    /// # Errors
    ///
    /// Returns an error if the chosen system packages are not available
    /// locally, if the transaction has already been executed, or if any of
    /// the lookup, locking, version assignment, object read, execution or
    /// state sync store steps fails.
    ///
    /// # Panics
    ///
    /// Panics if the capabilities cannot be read, if the transaction did not
    /// write the system state object, if no `SystemEpochInfoEvent` was emitted
    /// while the system state is not in safe mode, or if the resulting
    /// effects status is not ok.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // All end-of-epoch transaction kinds are collected here and bundled into a
        // single end-of-epoch transaction below.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        // Decide which protocol version (and which system packages) the next epoch
        // will run, based on the capabilities reported by validators.
        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
        let authority_capabilities = epoch_store
            .get_capabilities_v1()
            .expect("read capabilities from db cannot fail");
        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                SupportedProtocolVersionsWithHashes::protocol_config_digest(
                    epoch_store.protocol_config(),
                ),
                epoch_store.committee(),
                authority_capabilities.clone(),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide
        // by the rules of the current epoch, including the current epoch's max
        // Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages
            //   locally, and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive
            //   it via state sync, and execute it. This will upgrade the framework
            //   packages, reconfigure, and most likely shut down in the new epoch (this
            //   validator likely doesn't support the new protocol version, or else it
            //   should have had the packages.)
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // Pick the change epoch variant from the current protocol config:
        // - ChangeEpochV3 when `select_committee_from_eligible_validators` is enabled,
        // - ChangeEpochV2 when the protocol-defined base fee is enabled and a maximum
        //   committee members count is configured,
        // - the original ChangeEpoch otherwise.

        if config.select_committee_from_eligible_validators() {
            // Get the list of eligible validators that support the target protocol version
            let active_validators = epoch_store.epoch_start_state().get_active_validators();

            // By default every active validator (identified by its index) is eligible.
            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

            // Use validators supporting the target protocol version as eligible validators
            // in the next version if select_committee_supporting_next_epoch_version feature
            // flag is set to true.
            if config.select_committee_supporting_next_epoch_version() {
                eligible_active_validators = Self::get_validators_supporting_protocol_version(
                    next_epoch_protocol_version,
                    next_epoch_protocol_digest,
                    &active_validators,
                    &authority_capabilities,
                );

                // Calculate the total weight of eligible validators in the committee
                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                    &eligible_active_validators,
                    &active_validators,
                    epoch_store.committee(),
                );

                // Safety check: ensure eligible validators have enough stake
                // Use the same effective threshold calculation that was used to decide the
                // protocol version
                let committee = epoch_store.committee();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                if eligible_validators_weight < effective_threshold {
                    error!(
                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                    );
                    // Pass all active validator indices as eligible validators
                    // to perform selection among all of them.
                    eligible_active_validators = (0..active_validators.len() as u64).collect();
                }
            }

            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
            ));
        } else if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        // The executable transaction is tied to the current epoch and to the
        // checkpoint passed in by the caller.
        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return
        // an error. The checkpoint builder will shortly be terminated by
        // reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        // We must manually assign the shared object versions to the transaction before
        // executing it. This is because we do not sequence end-of-epoch
        // transactions through consensus.
        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::slice::from_ref(&executable_tx),
        )?;

        let input_objects =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        // Execute the transaction; only the temporary store and the effects are
        // used here, the execution error (if any) is ignored.
        let (temporary_store, effects, _execution_error_opt) =
            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The system epoch info event can be `None` if the `advance_epoch`
        // Move function call failed and was executed in safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // We must write tx and effects to the state sync tables so that state sync is
        // able to deliver the transaction to CheckpointExecutor after it is
        // included in a certified checkpoint.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
5003
5004    /// This function is called at the very end of the epoch.
5005    /// This step is required before updating new epoch in the db and calling
5006    /// reopen_epoch_db.
5007    #[instrument(level = "error", skip_all)]
5008    async fn revert_uncommitted_epoch_transactions(
5009        &self,
5010        epoch_store: &AuthorityPerEpochStore,
5011    ) -> IotaResult {
5012        {
5013            let state = epoch_store.get_reconfig_state_write_lock_guard();
5014            if state.should_accept_user_certs() {
5015                // Need to change this so that consensus adapter do not accept certificates from
5016                // user. This can happen if our local validator did not initiate
5017                // epoch change locally, but 2f+1 nodes already concluded the
5018                // epoch.
5019                //
5020                // This lock is essentially a barrier for
5021                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5022                // after this block
5023                epoch_store.close_user_certs(state);
5024            }
5025            // lock is dropped here
5026        }
5027        let pending_certificates = epoch_store.pending_consensus_certificates();
5028        info!(
5029            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5030            pending_certificates.len(),
5031            pending_certificates,
5032        );
5033        for digest in pending_certificates {
5034            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5035                info!(
5036                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5037                    digest
5038                );
5039                continue;
5040            }
5041            info!("Reverting {:?} at the end of epoch", digest);
5042            epoch_store.revert_executed_transaction(&digest)?;
5043            self.get_reconfig_api().try_revert_state_update(&digest)?;
5044        }
5045        info!("All uncommitted local transactions reverted");
5046        Ok(())
5047    }
5048
5049    #[instrument(level = "error", skip_all)]
5050    async fn reopen_epoch_db(
5051        &self,
5052        cur_epoch_store: &AuthorityPerEpochStore,
5053        new_committee: Committee,
5054        epoch_start_configuration: EpochStartConfiguration,
5055        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5056        epoch_last_checkpoint: CheckpointSequenceNumber,
5057    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5058        let new_epoch = new_committee.epoch;
5059        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5060        assert_eq!(
5061            epoch_start_configuration.epoch_start_state().epoch(),
5062            new_committee.epoch
5063        );
5064        fail_point!("before-open-new-epoch-store");
5065        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5066            self.name,
5067            new_committee,
5068            epoch_start_configuration,
5069            self.get_backing_package_store().clone(),
5070            self.get_object_store().clone(),
5071            expensive_safety_check_config,
5072            epoch_last_checkpoint,
5073        );
5074        self.epoch_store.store(new_epoch_store.clone());
5075        Ok(new_epoch_store)
5076    }
5077
5078    #[cfg(test)]
5079    pub(crate) fn iter_live_object_set_for_testing(
5080        &self,
5081    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5082        self.get_accumulator_store()
5083            .iter_cached_live_object_set_for_testing()
5084    }
5085
5086    #[cfg(test)]
5087    pub(crate) fn shutdown_execution_for_test(&self) {
5088        self.tx_execution_shutdown
5089            .lock()
5090            .take()
5091            .unwrap()
5092            .send(())
5093            .unwrap();
5094    }
5095
5096    /// NOTE: this function is only to be used for fuzzing and testing. Never
5097    /// use in prod
5098    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5099        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5100        self.get_object_cache_reader()
5101            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5102        self.get_reconfig_api()
5103            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5104    }
5105}
5106
/// Task state for consuming randomness rounds from a channel and handling
/// each of them against the authority state.
pub struct RandomnessRoundReceiver {
    /// Authority state whose epoch store is loaded when a randomness round is
    /// handled.
    authority_state: Arc<AuthorityState>,
    /// Channel delivering `(epoch, round, bytes)` tuples; the bytes are the
    /// payload passed on to the randomness state update transaction.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5111
5112impl RandomnessRoundReceiver {
5113    pub fn spawn(
5114        authority_state: Arc<AuthorityState>,
5115        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5116    ) -> JoinHandle<()> {
5117        let rrr = RandomnessRoundReceiver {
5118            authority_state,
5119            randomness_rx,
5120        };
5121        spawn_monitored_task!(rrr.run())
5122    }
5123
5124    async fn run(mut self) {
5125        info!("RandomnessRoundReceiver event loop started");
5126
5127        loop {
5128            tokio::select! {
5129                maybe_recv = self.randomness_rx.recv() => {
5130                    if let Some((epoch, round, bytes)) = maybe_recv {
5131                        self.handle_new_randomness(epoch, round, bytes);
5132                    } else {
5133                        break;
5134                    }
5135                },
5136            }
5137        }
5138
5139        info!("RandomnessRoundReceiver event loop ended");
5140    }
5141
5142    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5143    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5144        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5145        if epoch_store.epoch() != epoch {
5146            warn!(
5147                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5148                epoch_store.epoch()
5149            );
5150            return;
5151        }
5152        let transaction = VerifiedTransaction::new_randomness_state_update(
5153            epoch,
5154            round,
5155            bytes,
5156            epoch_store
5157                .epoch_start_config()
5158                .randomness_obj_initial_shared_version(),
5159        );
5160        debug!(
5161            "created randomness state update transaction with digest: {:?}",
5162            transaction.digest()
5163        );
5164        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5165        let digest = *transaction.digest();
5166
5167        // Randomness state updates contain the full bls signature for the random round,
5168        // which cannot necessarily be reconstructed again later. Therefore we must
5169        // immediately persist this transaction. If we crash before its outputs
5170        // are committed, this ensures we will be able to re-execute it.
5171        self.authority_state
5172            .get_cache_commit()
5173            .persist_transaction(&transaction);
5174
5175        // Send transaction to TransactionManager for execution.
5176        self.authority_state
5177            .transaction_manager()
5178            .enqueue(vec![transaction], &epoch_store);
5179
5180        let authority_state = self.authority_state.clone();
5181        spawn_monitored_task!(async move {
5182            // Wait for transaction execution in a separate task, to avoid deadlock in case
5183            // of out-of-order randomness generation. (Each
5184            // RandomnessStateUpdate depends on the output of the
5185            // RandomnessStateUpdate from the previous round.)
5186            //
5187            // We set a very long timeout so that in case this gets stuck for some reason,
5188            // the validator will eventually crash rather than continuing in a
5189            // zombie mode.
5190            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5191            let result = tokio::time::timeout(
5192                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5193                authority_state
5194                    .get_transaction_cache_reader()
5195                    .try_notify_read_executed_effects(&[digest]),
5196            )
5197            .await;
5198            let result = match result {
5199                Ok(result) => result,
5200                Err(_) => {
5201                    if cfg!(debug_assertions) {
5202                        // Crash on randomness update execution timeout in debug builds.
5203                        panic!(
5204                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5205                        );
5206                    }
5207                    warn!(
5208                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5209                    );
5210                    // Continue waiting as long as necessary in non-debug builds.
5211                    authority_state
5212                        .get_transaction_cache_reader()
5213                        .try_notify_read_executed_effects(&[digest])
5214                        .await
5215                }
5216            };
5217
5218            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5219            let effects = effects.pop().expect("should return effects");
5220            if *effects.status() != ExecutionStatus::Success {
5221                fatal!(
5222                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5223                );
5224            }
5225            debug!(
5226                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5227            );
5228        });
5229    }
5230}
5231
5232#[async_trait]
5233impl TransactionKeyValueStoreTrait for AuthorityState {
5234    async fn multi_get(
5235        &self,
5236        transaction_keys: &[TransactionDigest],
5237        effects_keys: &[TransactionDigest],
5238    ) -> IotaResult<KVStoreTransactionData> {
5239        let txns = if !transaction_keys.is_empty() {
5240            self.get_transaction_cache_reader()
5241                .try_multi_get_transaction_blocks(transaction_keys)?
5242                .into_iter()
5243                .map(|t| t.map(|t| (*t).clone().into_inner()))
5244                .collect()
5245        } else {
5246            vec![]
5247        };
5248
5249        let fx = if !effects_keys.is_empty() {
5250            self.get_transaction_cache_reader()
5251                .try_multi_get_executed_effects(effects_keys)?
5252        } else {
5253            vec![]
5254        };
5255
5256        Ok((txns, fx))
5257    }
5258
5259    async fn multi_get_checkpoints(
5260        &self,
5261        checkpoint_summaries: &[CheckpointSequenceNumber],
5262        checkpoint_contents: &[CheckpointSequenceNumber],
5263        checkpoint_summaries_by_digest: &[CheckpointDigest],
5264    ) -> IotaResult<(
5265        Vec<Option<CertifiedCheckpointSummary>>,
5266        Vec<Option<CheckpointContents>>,
5267        Vec<Option<CertifiedCheckpointSummary>>,
5268    )> {
5269        // TODO: use multi-get methods if it ever becomes important (unlikely)
5270        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5271        let store = self.get_checkpoint_store();
5272        for seq in checkpoint_summaries {
5273            let checkpoint = store
5274                .get_checkpoint_by_sequence_number(*seq)?
5275                .map(|c| c.into_inner());
5276
5277            summaries.push(checkpoint);
5278        }
5279
5280        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5281        for seq in checkpoint_contents {
5282            let checkpoint = store
5283                .get_checkpoint_by_sequence_number(*seq)?
5284                .and_then(|summary| {
5285                    store
5286                        .get_checkpoint_contents(&summary.content_digest)
5287                        .expect("db read cannot fail")
5288                });
5289            contents.push(checkpoint);
5290        }
5291
5292        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5293        for digest in checkpoint_summaries_by_digest {
5294            let checkpoint = store
5295                .get_checkpoint_by_digest(digest)?
5296                .map(|c| c.into_inner());
5297            summaries_by_digest.push(checkpoint);
5298        }
5299
5300        Ok((summaries, contents, summaries_by_digest))
5301    }
5302
5303    async fn get_transaction_perpetual_checkpoint(
5304        &self,
5305        digest: TransactionDigest,
5306    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5307        self.get_checkpoint_cache()
5308            .try_get_transaction_perpetual_checkpoint(&digest)
5309            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5310    }
5311
5312    async fn get_object(
5313        &self,
5314        object_id: ObjectID,
5315        version: VersionNumber,
5316    ) -> IotaResult<Option<Object>> {
5317        self.get_object_cache_reader()
5318            .try_get_object_by_key(&object_id, version)
5319    }
5320
5321    async fn multi_get_transactions_perpetual_checkpoints(
5322        &self,
5323        digests: &[TransactionDigest],
5324    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5325        let res = self
5326            .get_checkpoint_cache()
5327            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5328
5329        Ok(res
5330            .into_iter()
5331            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5332            .collect())
5333    }
5334
5335    #[instrument(skip(self))]
5336    async fn multi_get_events_by_tx_digests(
5337        &self,
5338        digests: &[TransactionDigest],
5339    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5340        if digests.is_empty() {
5341            return Ok(vec![]);
5342        }
5343        let events_digests: Vec<_> = self
5344            .get_transaction_cache_reader()
5345            .try_multi_get_executed_effects(digests)?
5346            .into_iter()
5347            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5348            .collect();
5349        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5350        let mut events = self
5351            .get_transaction_cache_reader()
5352            .try_multi_get_events(&non_empty_events)?
5353            .into_iter();
5354        Ok(events_digests
5355            .into_iter()
5356            .map(|ev| ev.and_then(|_| events.next()?))
5357            .collect())
5358    }
5359}
5360
/// Simulator-only (`msim`) hooks that let tests replace the modules of a
/// framework package, or inject entirely new packages, either for all
/// validators or per validator.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback; `None` means no override for that
        /// validator.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own binary format version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|module| {
                let mut buf = Vec::new();
                module
                    .serialize_with_version(module.version, &mut buf)
                    .unwrap();
                buf
            })
            .collect()
    }

    /// Registers `modules` as the override of `package_id` for all validators,
    /// replacing any previous override of that package.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any previous override of that package.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`, or `None` if there is no override for that validator.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            })
        })
    }

    /// Same as [`get_override_bytes`] but returns the compiled modules
    /// themselves.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds the overriding [`SystemPackage`] of `package_id` for validator
    /// `name`, or `None` if there is no override for that validator.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns the injected packages that are not part of the built-in
    /// framework, as seen by validator `name`.
    ///
    /// A package whose per-validator callback yields no modules for `name` is
    /// skipped rather than treated as an error.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
        // Collect the ids first so the override table is not borrowed while the
        // per-package lookups below run.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        extra
            .into_iter()
            .filter_map(|package| {
                let bytes = get_override_bytes(&package, name)?;
                Some(SystemPackage {
                    id: package,
                    bytes,
                    dependencies: BuiltInFramework::all_package_ids(),
                })
            })
            .collect()
    }
}
5482
5483#[derive(Debug, Serialize, Deserialize, Clone)]
5484pub struct ObjDumpFormat {
5485    pub id: ObjectID,
5486    pub version: VersionNumber,
5487    pub digest: ObjectDigest,
5488    pub object: Object,
5489}
5490
5491impl ObjDumpFormat {
5492    fn new(object: Object) -> Self {
5493        let oref = object.compute_object_reference();
5494        Self {
5495            id: oref.0,
5496            version: oref.1,
5497            digest: oref.2,
5498            object,
5499        }
5500    }
5501}
5502
5503#[derive(Debug, Serialize, Deserialize, Clone)]
5504pub struct NodeStateDump {
5505    pub tx_digest: TransactionDigest,
5506    pub sender_signed_data: SenderSignedData,
5507    pub executed_epoch: u64,
5508    pub reference_gas_price: u64,
5509    pub protocol_version: u64,
5510    pub epoch_start_timestamp_ms: u64,
5511    pub computed_effects: TransactionEffects,
5512    pub expected_effects_digest: TransactionEffectsDigest,
5513    pub relevant_system_packages: Vec<ObjDumpFormat>,
5514    pub shared_objects: Vec<ObjDumpFormat>,
5515    pub loaded_child_objects: Vec<ObjDumpFormat>,
5516    pub modified_at_versions: Vec<ObjDumpFormat>,
5517    pub runtime_reads: Vec<ObjDumpFormat>,
5518    pub input_objects: Vec<ObjDumpFormat>,
5519}
5520
5521impl NodeStateDump {
5522    pub fn new(
5523        tx_digest: &TransactionDigest,
5524        effects: &TransactionEffects,
5525        expected_effects_digest: TransactionEffectsDigest,
5526        object_store: &dyn ObjectStore,
5527        epoch_store: &Arc<AuthorityPerEpochStore>,
5528        inner_temporary_store: &InnerTemporaryStore,
5529        certificate: &VerifiedExecutableTransaction,
5530    ) -> IotaResult<Self> {
5531        // Epoch info
5532        let executed_epoch = epoch_store.epoch();
5533        let reference_gas_price = epoch_store.reference_gas_price();
5534        let epoch_start_config = epoch_store.epoch_start_config();
5535        let protocol_version = epoch_store.protocol_version().as_u64();
5536        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5537
5538        // Record all system packages at this version
5539        let mut relevant_system_packages = Vec::new();
5540        for sys_package_id in BuiltInFramework::all_package_ids() {
5541            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5542                relevant_system_packages.push(ObjDumpFormat::new(w))
5543            }
5544        }
5545
5546        // Record all the shared objects
5547        let mut shared_objects = Vec::new();
5548        for kind in effects.input_shared_objects() {
5549            match kind {
5550                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5551                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5552                        shared_objects.push(ObjDumpFormat::new(w))
5553                    }
5554                }
5555                InputSharedObject::ReadDeleted(..)
5556                | InputSharedObject::MutateDeleted(..)
5557                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5558                                                           * objects. */
5559            }
5560        }
5561
5562        // Record all loaded child objects
5563        // Child objects which are read but not mutated are not tracked anywhere else
5564        let mut loaded_child_objects = Vec::new();
5565        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5566            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5567                loaded_child_objects.push(ObjDumpFormat::new(w))
5568            }
5569        }
5570
5571        // Record all modified objects
5572        let mut modified_at_versions = Vec::new();
5573        for (id, ver) in effects.modified_at_versions() {
5574            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5575                modified_at_versions.push(ObjDumpFormat::new(w))
5576            }
5577        }
5578
5579        // Packages read at runtime, which were not previously loaded into the temoorary
5580        // store Some packages may be fetched at runtime and wont show up in
5581        // input objects
5582        let mut runtime_reads = Vec::new();
5583        for obj in inner_temporary_store
5584            .runtime_packages_loaded_from_db
5585            .values()
5586        {
5587            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5588        }
5589
5590        // All other input objects should already be in `inner_temporary_store.objects`
5591
5592        Ok(Self {
5593            tx_digest: *tx_digest,
5594            executed_epoch,
5595            reference_gas_price,
5596            epoch_start_timestamp_ms,
5597            protocol_version,
5598            relevant_system_packages,
5599            shared_objects,
5600            loaded_child_objects,
5601            modified_at_versions,
5602            runtime_reads,
5603            sender_signed_data: certificate.clone().into_message(),
5604            input_objects: inner_temporary_store
5605                .input_objects
5606                .values()
5607                .map(|o| ObjDumpFormat::new(o.clone()))
5608                .collect(),
5609            computed_effects: effects.clone(),
5610            expected_effects_digest,
5611        })
5612    }
5613
5614    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5615        let mut objects = Vec::new();
5616        objects.extend(self.relevant_system_packages.clone());
5617        objects.extend(self.shared_objects.clone());
5618        objects.extend(self.loaded_child_objects.clone());
5619        objects.extend(self.modified_at_versions.clone());
5620        objects.extend(self.runtime_reads.clone());
5621        objects.extend(self.input_objects.clone());
5622        objects
5623    }
5624
5625    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5626        let file_name = format!(
5627            "{}_{}_NODE_DUMP.json",
5628            self.tx_digest,
5629            AuthorityState::unixtime_now_ms()
5630        );
5631        let mut path = path.to_path_buf();
5632        path.push(&file_name);
5633        let mut file = File::create(path.clone())?;
5634        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5635        Ok(path)
5636    }
5637
5638    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5639        let file = File::open(path)?;
5640        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5641    }
5642}