Skip to main content

iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::TxLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48    Address, EndOfEpochTransactionKind, Event, ExecutionStatus, ObjectId, Owner, RandomnessRound,
49    StructTag, TransactionExpiration, TransactionKind, TypeTag,
50    crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
51    gas::GasCostSummary,
52};
53use iota_storage::{
54    key_value_store::{
55        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
56    },
57    key_value_store_metrics::KeyValueStoreMetrics,
58};
59#[cfg(msim)]
60use iota_types::committee::CommitteeTrait;
61use iota_types::{
62    account_abstraction::{
63        account::AuthenticatorFunctionRefV1Key,
64        authenticator_function::{
65            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
66            AuthenticatorFunctionRefV1, extract_auth_fun_refs,
67        },
68    },
69    auth_context::AuthContextData,
70    base_types::{
71        AuthorityName, ConciseableName, ObjectInfo, ObjectRef, ObjectType, SequenceNumber,
72        VersionNumber,
73    },
74    committee::{Committee, EpochId, ProtocolVersion},
75    crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, Signer},
76    deny_list_v1::check_coin_deny_list_v1,
77    digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
78    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
79    effects::{
80        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
81        TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
82    },
83    error::{ExecutionError, IotaError, IotaResult, UserInputError},
84    event::{EventID, SystemEpochInfoEvent},
85    executable_transaction::VerifiedExecutableTransaction,
86    execution_config_utils::to_binary_config,
87    fp_ensure,
88    gas::IotaGasStatus,
89    gas_coin::NANOS_PER_IOTA,
90    inner_temporary_store::{
91        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
92        WrittenObjects,
93    },
94    iota_sdk_types_conversions::type_tag_core_to_sdk,
95    iota_system_state::{
96        IotaSystemState, IotaSystemStateTrait,
97        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
98    },
99    layout_resolver::{LayoutResolver, into_struct_layout},
100    message_envelope::Message,
101    messages_checkpoint::{
102        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
103        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
104        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
105        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
106    },
107    messages_consensus::AuthorityCapabilitiesV1,
108    messages_grpc::{
109        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
110        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
111        TransactionStatus,
112    },
113    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
114    move_authenticator::{MoveAuthenticator, MoveAuthenticatorExt},
115    object::{
116        MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
117        bounded_visitor::BoundedVisitor,
118    },
119    storage::{
120        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
121    },
122    supported_protocol_versions::{
123        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
124    },
125    traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
126    transaction::*,
127    transaction_executor::{SimulateTransactionResult, VmChecks},
128};
129use itertools::Itertools;
130use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
131use move_core_types::{
132    account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
133};
134use parking_lot::Mutex;
135use prometheus_filtered::{
136    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
137    register_histogram_vec_with_registry, register_histogram_with_registry,
138    register_int_counter_vec_with_registry, register_int_counter_with_registry,
139    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
140};
141use serde::{Deserialize, Serialize, de::DeserializeOwned};
142use tap::TapFallible;
143use tokio::{
144    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
145    task::JoinHandle,
146};
147use tracing::{debug, error, info, instrument, trace, warn};
148use typed_store::TypedStoreError;
149
150use self::{
151    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
152};
153#[cfg(msim)]
154pub use crate::checkpoints::checkpoint_executor::utils::{
155    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
156};
157use crate::{
158    authority::{
159        authority_per_epoch_store::{AuthorityPerEpochStore, TxGuard},
160        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
161        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
162        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
163        authority_store_tables::AuthorityPrunerTables,
164        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
165    },
166    authority_client::NetworkAuthorityClient,
167    checkpoint_progress_tracker::CheckpointProgressTracker,
168    checkpoints::CheckpointStore,
169    congestion_tracker::CongestionTracker,
170    consensus_adapter::ConsensusAdapter,
171    epoch::committee_store::CommitteeStore,
172    execution_cache::{
173        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
174        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
175        TransactionCacheRead,
176    },
177    execution_driver::execution_process,
178    global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
179    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
180    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
181    metrics::{LatencyObserver, RateTracker},
182    module_cache_metrics::ResolverMetrics,
183    overload_monitor::{
184        AuthorityOverloadInfo, compute_graduated_load_shedding_percentage,
185        overload_monitor_accept_tx,
186    },
187    stake_aggregator::StakeAggregator,
188    subscription_handler::SubscriptionHandler,
189    traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
190    transaction_input_loader::TransactionInputLoader,
191    transaction_manager::TransactionManager,
192    transaction_outputs::TransactionOutputs,
193    validator_tx_finalizer::ValidatorTxFinalizer,
194    verify_indexes::verify_indexes,
195};
196
// Unit-test modules. Each one is compiled only under `cfg(test)` and its
// source is loaded from the `unit_tests/` directory through `#[path]`.
197#[cfg(test)]
198#[path = "unit_tests/authority_tests.rs"]
199pub mod authority_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/transaction_tests.rs"]
203pub mod transaction_tests;
204
205#[cfg(test)]
206#[path = "unit_tests/batch_transaction_tests.rs"]
207mod batch_transaction_tests;
208
209#[cfg(test)]
210#[path = "unit_tests/move_integration_tests.rs"]
211pub mod move_integration_tests;
212
213#[cfg(test)]
214#[path = "unit_tests/gas_tests.rs"]
215mod gas_tests;
216
217#[cfg(test)]
218#[path = "unit_tests/batch_verification_tests.rs"]
219mod batch_verification_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/coin_deny_list_tests.rs"]
223mod coin_deny_list_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/auth_unit_test_utils.rs"]
227pub mod auth_unit_test_utils;
228
// Declared without a `cfg(test)` gate, unlike the test modules above.
229pub mod authority_test_utils;
230
// Public submodules of `authority`.
231pub mod authority_per_epoch_store;
232pub mod authority_per_epoch_store_pruner;
233
// Private to this module.
234mod authority_store_migrations;
235pub mod authority_store_pruner;
236pub mod authority_store_tables;
237pub mod authority_store_types;
238pub mod epoch_start_configuration;
239pub mod shared_object_congestion_tracker;
240pub mod shared_object_version_manager;
241pub mod suggested_gas_price_calculator;
242pub mod test_authority_builder;
243pub mod transaction_deferral;
244
// `authority_store` is crate-visible only; the items meant for external use
// (`AuthorityStore`, `ResolverWrapper`, `UpdateType`) are re-exported by the
// `pub use authority_store::{...}` declaration in the import section.
245pub(crate) mod authority_store;
246pub mod backpressure;
247pub(crate) mod dropped_tx_status_cache;
248
248
249/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
///
/// Instances are built by [`AuthorityMetrics::new`], which registers the
/// collectors with the `prometheus_filtered::Registry` it is given. The
/// metric names quoted in the field docs below are the names used at
/// registration time.
250pub struct AuthorityMetrics {
    /// Total number of transaction orders (`total_transaction_orders`).
251    tx_orders: IntCounter,
    /// Total number of transaction certificates handled
    /// (`total_transaction_certificates`).
252    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`
    /// (`total_handle_certificate_attempts`).
253    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced, i.e. transactions
    /// finished (`total_transaction_effects`).
254    total_effects: IntCounter,
    /// Number of transactions involving shared objects (`num_shared_obj_tx`).
255    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions (`num_sponsored_tx`).
256    sponsored_tx: IntCounter,
    /// Number of transaction orders that had already been processed
    /// previously (`num_tx_already_processed`).
257    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction
    /// (`num_input_objects`).
258    num_input_objs: Histogram,
    /// Number of shared input objects per transaction (`num_shared_objects`).
259    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes (`batch_size`).
260    batch_size: Histogram,
261
    /// Latency of handling transactions.
262    authority_state_handle_transaction_latency: Histogram,
263
    // The next two fields are the `tx_type`-labelled children of a single
    // histogram vec, `authority_state_execute_certificate_latency`, which
    // measures certificate execution latency including waiting for inputs.
264    execute_certificate_latency_single_writer: Histogram,
265    execute_certificate_latency_shared_object: Histogram,
266
    /// Latency of actual certificate executions.
267    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution. Registered with
    /// `LOW_LATENCY_SEC_BUCKETS` rather than the default latency buckets.
268    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
269    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
270    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
271    db_checkpoint_latency: Histogram,
272
    // TransactionManager metrics: queue depth plus the object-availability
    // and package-availability caches.
    /// Certificates enqueued to TransactionManager, labelled by `result`.
273    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in TransactionManager.
274    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Certificates pending in TransactionManager with at least one missing
    /// input object.
275    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
276    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in TransactionManager.
277    pub(crate) transaction_manager_num_ready: IntGauge,
278    pub(crate) transaction_manager_object_cache_size: IntGauge,
279    pub(crate) transaction_manager_object_cache_hits: IntCounter,
280    pub(crate) transaction_manager_object_cache_misses: IntCounter,
281    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
282    pub(crate) transaction_manager_package_cache_size: IntGauge,
283    pub(crate) transaction_manager_package_cache_hits: IntCounter,
284    pub(crate) transaction_manager_package_cache_misses: IntCounter,
285    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time a transaction spends waiting in the queue.
286    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
287
    // Execution driver metrics.
    /// Cumulative number of transactions executed by the execution driver.
288    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
289    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready for execution and
    /// the moment it starts executing.
290    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas to VM execution latency.
291    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas to certificate execution latency, including
    /// committing the certificate.
292    pub(crate) execution_gas_latency_ratio: Histogram,
293
    /// Total number of consensus transactions skipped.
294    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
295    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
296
    /// Whether the authority is currently experiencing overload and has
    /// entered load shedding mode.
297    pub(crate) authority_overload_status: IntGauge,
298    /// Percentage of transactions shed due to consensus queue length.
299    pub(crate) consensus_queue_load_shedding_percentage: IntGauge,
300    /// This authority's locally computed load shedding percentage, taken as the
301    /// max of its latency/rate-based, transaction-manager-queue-based, and
302    /// writeback-cache-backpressure signals.
    /// Exported under the metric name `authority_load_shedding_percentage`.
303    pub(crate) local_post_consensus_load_shedding_percentage: IntGauge,
304
    /// Number of times each source indicates transaction overload, labelled
    /// by `source`.
305    pub(crate) transaction_overload_sources: IntCounterVec,
306
307    // Post processing metrics
308    post_processing_total_events_emitted: IntCounter,
309    post_processing_total_tx_indexed: IntCounter,
310    post_processing_total_tx_had_event_processed: IntCounter,
311    post_processing_total_failures: IntCounter,
312
313    // Consensus handler metrics
    /// Number of transactions processed by the consensus handler, labelled
    /// by `class`.
314    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each type of transaction processed by the consensus handler,
    /// labelled by `class`.
315    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low scoring authorities based on reputation scores from
    /// consensus.
316    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Scores from consensus for each authority, labelled by `authority`.
317    pub consensus_handler_scores: IntGaugeVec,
318    pub consensus_handler_deferred_transactions: IntCounter,
319    pub consensus_handler_congested_transactions: IntCounter,
320    pub consensus_handler_cancelled_transactions: IntCounter,
321    /// Number of user transactions dropped during a consensus commit because
322    /// post-consensus conflict/lock validation rejected them. Distinct from
323    /// `consensus_handler_load_shedding_dropped_transactions`.
324    pub consensus_handler_validation_dropped_transactions: IntCounter,
325    /// Number of user transactions dropped during a consensus commit by
326    /// post-consensus load shedding, i.e. probabilistically rejected at the
327    /// quorum `consensus_handler_load_shedding_percentage` rate.
328    pub consensus_handler_load_shedding_dropped_transactions: IntCounter,
329    /// Stake-weighted quorum (2f+1) load shedding percentage enforced on user
330    /// transactions in the most recent consensus commit. This is the cluster
331    /// value actually applied post-consensus, as opposed to this authority's
332    /// own `local_post_consensus_load_shedding_percentage` (exported as the
    /// `authority_load_shedding_percentage` metric). 0 when the P-COOL flow is
333    /// disabled.
334    pub consensus_handler_load_shedding_percentage: IntGauge,
335    pub consensus_handler_max_object_costs: IntGaugeVec,
336    pub consensus_committed_subdags: IntCounterVec,
337    pub consensus_committed_messages: IntGaugeVec,
338    pub consensus_committed_user_transactions: IntGaugeVec,
339    pub consensus_handler_leader_round: IntGauge,
340    pub consensus_calculated_throughput: IntGauge,
341    pub consensus_calculated_throughput_profile: IntGauge,
342
343    pub validator_scoreboard_scores: IntGaugeVec,
344    pub invalid_misbehavior_reports_by_authority: IntGaugeVec,
345
346    pub limits_metrics: Arc<LimitsMetrics>,
347
348    /// Bytecode verifier metrics for tracking timeouts.
349    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
350
351    /// Count of multisig signatures.
352    pub multisig_sig_count: IntCounter,
353
354    // Tracks the recent average queueing delay of a transaction, measured from
355    // when it becomes ready for execution until it starts executing.
356    pub execution_queueing_latency: LatencyObserver,
357
358    // Tracks the rate at which transactions become ready for execution in the
359    // transaction manager. The Mutex is needed because the tracker is updated
360    // in the transaction manager and read in the overload_monitor. Mutex
361    // contention should be low because the transaction manager is single
362    // threaded and the read rate in overload_monitor is low. If the transaction
363    // manager becomes multi-threaded, we can create one rate tracker per thread.
364    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
365
366    // Tracks the rate at which transactions start execution in the execution
367    // driver. A Mutex is used for the same reason as `txn_ready_rate_tracker`.
368    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
369}
370
371// Override default Prom buckets for positive numbers in 0-10M range.
// Boundaries follow a 1-2-5-7 progression per decade, from 1 up to
// 10,000,000. Used for count/size histograms such as `num_input_objects`,
// `num_shared_objects`, `batch_size` and `consensus_handler_transaction_sizes`.
372const POSITIVE_INT_BUCKETS: &[f64] = &[
373    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
374    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
375    7000000., 10000000.,
376];
377
// Default buckets for latency histograms, ranging from 0.0005 up to 90
// (seconds, going by the constant's name). Most latency histograms
// registered in `AuthorityMetrics::new` use this table.
378const LATENCY_SEC_BUCKETS: &[f64] = &[
379    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
380    10., 20., 30., 60., 90.,
381];
382
383// Buckets for low latency samples. Starts from 10us and extends to 100s.
// Used by `authority_state_execution_load_input_objects_latency`.
384const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
385    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
386    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
387];
388
// Buckets for the gas-to-latency ratio histograms
// (`prepare_cert_gas_latency_ratio`, `execution_gas_latency_ratio`),
// ranging from 10 up to 1,000,000.
389const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
390    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
391    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
392];
393
394/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
/// Computed as `1_000_000_000 * NANOS_PER_IOTA`.
395pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
396
397impl AuthorityMetrics {
398    pub fn new(registry: &prometheus_filtered::Registry) -> AuthorityMetrics {
399        let execute_certificate_latency = register_histogram_vec_with_registry!(
400            "authority_state_execute_certificate_latency",
401            "Latency of executing certificates, including waiting for inputs",
402            &["tx_type"],
403            LATENCY_SEC_BUCKETS.to_vec(),
404            registry,
405        )
406        .unwrap();
407
408        let execute_certificate_latency_single_writer =
409            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
410        let execute_certificate_latency_shared_object =
411            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
412
413        Self {
414            tx_orders: register_int_counter_with_registry!(
415                "total_transaction_orders",
416                "Total number of transaction orders",
417                registry,
418            )
419                .unwrap(),
420            total_certs: register_int_counter_with_registry!(
421                "total_transaction_certificates",
422                "Total number of transaction certificates handled",
423                registry,
424            )
425                .unwrap(),
426            total_cert_attempts: register_int_counter_with_registry!(
427                "total_handle_certificate_attempts",
428                "Number of calls to handle_certificate",
429                registry,
430            )
431                .unwrap(),
432            // total_effects == total transactions finished
433            total_effects: register_int_counter_with_registry!(
434                "total_transaction_effects",
435                "Total number of transaction effects produced",
436                registry,
437            )
438                .unwrap(),
439
440            shared_obj_tx: register_int_counter_with_registry!(
441                "num_shared_obj_tx",
442                "Number of transactions involving shared objects",
443                registry,
444            )
445                .unwrap(),
446
447            sponsored_tx: register_int_counter_with_registry!(
448                "num_sponsored_tx",
449                "Number of sponsored transactions",
450                registry,
451            )
452                .unwrap(),
453
454            tx_already_processed: register_int_counter_with_registry!(
455                "num_tx_already_processed",
456                "Number of transaction orders already processed previously",
457                registry,
458            )
459                .unwrap(),
460            num_input_objs: register_histogram_with_registry!(
461                "num_input_objects",
462                "Distribution of number of input TX objects per TX",
463                POSITIVE_INT_BUCKETS.to_vec(),
464                registry,
465            )
466                .unwrap(),
467            num_shared_objects: register_histogram_with_registry!(
468                "num_shared_objects",
469                "Number of shared input objects per TX",
470                POSITIVE_INT_BUCKETS.to_vec(),
471                registry,
472            )
473                .unwrap(),
474            batch_size: register_histogram_with_registry!(
475                "batch_size",
476                "Distribution of size of transaction batch",
477                POSITIVE_INT_BUCKETS.to_vec(),
478                registry,
479            )
480                .unwrap(),
481            authority_state_handle_transaction_latency: register_histogram_with_registry!(
482                "authority_state_handle_transaction_latency",
483                "Latency of handling transactions",
484                LATENCY_SEC_BUCKETS.to_vec(),
485                registry,
486            )
487                .unwrap(),
488            execute_certificate_latency_single_writer,
489            execute_certificate_latency_shared_object,
490            internal_execution_latency: register_histogram_with_registry!(
491                "authority_state_internal_execution_latency",
492                "Latency of actual certificate executions",
493                LATENCY_SEC_BUCKETS.to_vec(),
494                registry,
495            )
496                .unwrap(),
497            execution_load_input_objects_latency: register_histogram_with_registry!(
498                "authority_state_execution_load_input_objects_latency",
499                "Latency of loading input objects for execution",
500                LOW_LATENCY_SEC_BUCKETS.to_vec(),
501                registry,
502            )
503                .unwrap(),
504            prepare_certificate_latency: register_histogram_with_registry!(
505                "authority_state_prepare_certificate_latency",
506                "Latency of executing certificates, before committing the results",
507                LATENCY_SEC_BUCKETS.to_vec(),
508                registry,
509            )
510                .unwrap(),
511            commit_certificate_latency: register_histogram_with_registry!(
512                "authority_state_commit_certificate_latency",
513                "Latency of committing certificate execution results",
514                LATENCY_SEC_BUCKETS.to_vec(),
515                registry,
516            )
517                .unwrap(),
518            db_checkpoint_latency: register_histogram_with_registry!(
519                "db_checkpoint_latency",
520                "Latency of checkpointing dbs",
521                LATENCY_SEC_BUCKETS.to_vec(),
522                registry,
523            ).unwrap(),
524            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
525                "transaction_manager_num_enqueued_certificates",
526                "Current number of certificates enqueued to TransactionManager",
527                &["result"],
528                registry,
529            )
530                .unwrap(),
531            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
532                "transaction_manager_num_missing_objects",
533                "Current number of missing objects in TransactionManager",
534                registry,
535            )
536                .unwrap(),
537            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
538                "transaction_manager_num_pending_certificates",
539                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
540                registry,
541            )
542                .unwrap(),
543            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
544                "transaction_manager_num_executing_certificates",
545                "Number of executing certificates, including queued and actually running certificates",
546                registry,
547            )
548                .unwrap(),
549            transaction_manager_num_ready: register_int_gauge_with_registry!(
550                "transaction_manager_num_ready",
551                "Number of ready transactions in TransactionManager",
552                registry,
553            )
554                .unwrap(),
555            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
556                "transaction_manager_object_cache_size",
557                "Current size of object-availability cache in TransactionManager",
558                registry,
559            )
560                .unwrap(),
561            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
562                "transaction_manager_object_cache_hits",
563                "Number of object-availability cache hits in TransactionManager",
564                registry,
565            )
566                .unwrap(),
567            authority_overload_status: register_int_gauge_with_registry!(
568                "authority_overload_status",
569                "Whether authority is current experiencing overload and enters load shedding mode.",
570                registry)
571                .unwrap(),
572            local_post_consensus_load_shedding_percentage: register_int_gauge_with_registry!(
573                "authority_load_shedding_percentage",
574                "This authority's locally computed load shedding percentage. In the P-COOL flow this is the value broadcast to peers, not necessarily the rate enforced (see consensus_handler_load_shedding_percentage).",
575                registry)
576                .unwrap(),
577            consensus_queue_load_shedding_percentage: register_int_gauge_with_registry!(
578                "consensus_queue_load_shedding_percentage",
579                "Percentage of transactions shed due to consensus queue length. Separate admission-control signal, not an input to authority_load_shedding_percentage.",
580                registry)
581                .unwrap(),
582            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
583                "transaction_manager_object_cache_misses",
584                "Number of object-availability cache misses in TransactionManager",
585                registry,
586            )
587                .unwrap(),
588            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
589                "transaction_manager_object_cache_evictions",
590                "Number of object-availability cache evictions in TransactionManager",
591                registry,
592            )
593                .unwrap(),
594            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
595                "transaction_manager_package_cache_size",
596                "Current size of package-availability cache in TransactionManager",
597                registry,
598            )
599                .unwrap(),
600            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
601                "transaction_manager_package_cache_hits",
602                "Number of package-availability cache hits in TransactionManager",
603                registry,
604            )
605                .unwrap(),
606            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
607                "transaction_manager_package_cache_misses",
608                "Number of package-availability cache misses in TransactionManager",
609                registry,
610            )
611                .unwrap(),
612            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
613                "transaction_manager_package_cache_evictions",
614                "Number of package-availability cache evictions in TransactionManager",
615                registry,
616            )
617                .unwrap(),
618            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
619                "transaction_manager_transaction_queue_age_s",
620                "Time spent in waiting for transaction in the queue",
621                LATENCY_SEC_BUCKETS.to_vec(),
622                registry,
623            )
624                .unwrap(),
625            transaction_overload_sources: register_int_counter_vec_with_registry!(
626                "transaction_overload_sources",
627                "Number of times each source indicates transaction overload.",
628                &["source"],
629                registry)
630                .unwrap(),
631            execution_driver_executed_transactions: register_int_counter_with_registry!(
632                "execution_driver_executed_transactions",
633                "Cumulative number of transaction executed by execution driver",
634                registry,
635            )
636                .unwrap(),
637            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
638                "execution_driver_dispatch_queue",
639                "Number of transaction pending in execution driver dispatch queue",
640                registry,
641            )
642                .unwrap(),
643            execution_queueing_delay_s: register_histogram_with_registry!(
644                "execution_queueing_delay_s",
645                "Queueing delay between a transaction is ready for execution until it starts executing.",
646                LATENCY_SEC_BUCKETS.to_vec(),
647                registry
648            )
649                .unwrap(),
650            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
651                "prepare_cert_gas_latency_ratio",
652                "The ratio of computation gas divided by VM execution latency.",
653                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
654                registry
655            )
656                .unwrap(),
657            execution_gas_latency_ratio: register_histogram_with_registry!(
658                "execution_gas_latency_ratio",
659                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
660                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
661                registry
662            )
663                .unwrap(),
664            skipped_consensus_txns: register_int_counter_with_registry!(
665                "skipped_consensus_txns",
666                "Total number of consensus transactions skipped",
667                registry,
668            )
669                .unwrap(),
670            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
671                "skipped_consensus_txns_cache_hit",
672                "Total number of consensus transactions skipped because of local cache hit",
673                registry,
674            )
675                .unwrap(),
676            post_processing_total_events_emitted: register_int_counter_with_registry!(
677                "post_processing_total_events_emitted",
678                "Total number of events emitted in post processing",
679                registry,
680            )
681                .unwrap(),
682            post_processing_total_tx_indexed: register_int_counter_with_registry!(
683                "post_processing_total_tx_indexed",
684                "Total number of txes indexed in post processing",
685                registry,
686            )
687                .unwrap(),
688            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
689                "post_processing_total_tx_had_event_processed",
690                "Total number of txes finished event processing in post processing",
691                registry,
692            )
693                .unwrap(),
694            post_processing_total_failures: register_int_counter_with_registry!(
695                "post_processing_total_failures",
696                "Total number of failure in post processing",
697                registry,
698            )
699                .unwrap(),
700            consensus_handler_processed: register_int_counter_vec_with_registry!(
701                "consensus_handler_processed",
702                "Number of transactions processed by consensus handler",
703                &["class"],
704                registry
705            ).unwrap(),
706            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
707                "consensus_handler_transaction_sizes",
708                "Sizes of each type of transactions processed by consensus handler",
709                &["class"],
710                POSITIVE_INT_BUCKETS.to_vec(),
711                registry
712            ).unwrap(),
713            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
714                "consensus_handler_num_low_scoring_authorities",
715                "Number of low scoring authorities based on reputation scores from consensus",
716                registry
717            ).unwrap(),
718            consensus_handler_scores: register_int_gauge_vec_with_registry!(
719                "consensus_handler_scores",
720                "scores from consensus for each authority",
721                &["authority"],
722                registry,
723            ).unwrap(),
724            validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
725                "validator_scoreboard_scores",
726                "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
727                &["authority"],
728                registry,
729            ).unwrap(),
730            invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
731                "invalid_misbehavior_reports_by_authority",
732                "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
733                &["authority"],
734                registry,
735            ).unwrap(),
736            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
737                "consensus_handler_deferred_transactions",
738                "Number of transactions deferred by consensus handler",
739                registry,
740            ).unwrap(),
741            consensus_handler_congested_transactions: register_int_counter_with_registry!(
742                "consensus_handler_congested_transactions",
743                "Number of transactions deferred by consensus handler due to congestion",
744                registry,
745            ).unwrap(),
746            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
747                "consensus_handler_cancelled_transactions",
748                "Number of transactions cancelled by consensus handler",
749                registry,
750            ).unwrap(),
751            consensus_handler_validation_dropped_transactions: register_int_counter_with_registry!(
752                "consensus_handler_validation_dropped_transactions",
753                "Number of UserTransactionV1 transactions dropped by post-consensus validation",
754                registry,
755            ).unwrap(),
756            consensus_handler_load_shedding_dropped_transactions: register_int_counter_with_registry!(
757                "consensus_handler_load_shedding_dropped_transactions",
758                "Number of user transactions dropped by post-consensus load shedding, based on the quorum load shedding percentage",
759                registry,
760            ).unwrap(),
761            consensus_handler_load_shedding_percentage: register_int_gauge_with_registry!(
762                "consensus_handler_load_shedding_percentage",
763                "Stake-weighted quorum (2f+1) load shedding percentage enforced on user transactions in the most recent consensus commit. 0 when the P-COOL flow is disabled.",
764                registry,
765            ).unwrap(),
766            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
767                "consensus_handler_max_congestion_control_object_costs",
768                "Max object costs for congestion control in the current consensus commit",
769                &["commit_type"],
770                registry,
771            ).unwrap(),
772            consensus_committed_subdags: register_int_counter_vec_with_registry!(
773                "consensus_committed_subdags",
774                "Number of committed subdags, sliced by leader",
775                &["authority"],
776                registry,
777            ).unwrap(),
778            consensus_committed_messages: register_int_gauge_vec_with_registry!(
779                "consensus_committed_messages",
780                "Total number of committed consensus messages, sliced by author",
781                &["authority"],
782                registry,
783            ).unwrap(),
784            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
785                "consensus_committed_user_transactions",
786                "Number of committed user transactions, sliced by submitter",
787                &["authority"],
788                registry,
789            ).unwrap(),
790            consensus_handler_leader_round: register_int_gauge_with_registry!(
791                "consensus_handler_leader_round",
792                "The leader round of the current consensus output being processed in the consensus handler",
793                registry,
794            ).unwrap(),
795            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
796            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
797            multisig_sig_count: register_int_counter_with_registry!(
798                "multisig_sig_count",
799                "Count of multisig signatures",
800                registry,
801            )
802                .unwrap(),
803            consensus_calculated_throughput: register_int_gauge_with_registry!(
804                "consensus_calculated_throughput",
805                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
806                registry,
807            ).unwrap(),
808            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
809                "consensus_calculated_throughput_profile",
810                "The current active calculated throughput profile",
811                registry
812            ).unwrap(),
813            execution_queueing_latency: LatencyObserver::new(),
814            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
815            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
816        }
817    }
818
819    /// Reset metrics that contain `hostname` as one of the labels. This is
820    /// needed to avoid retaining metrics for long-gone committee members and
821    /// only exposing metrics for the committee in the current epoch.
822    pub fn reset_on_reconfigure(&self) {
823        self.consensus_committed_messages.reset();
824        self.consensus_handler_scores.reset();
825        self.validator_scoreboard_scores.reset();
826        self.invalid_misbehavior_reports_by_authority.reset();
827        self.consensus_committed_user_transactions.reset();
828    }
829}
830
831/// a Trait object for `Signer` that is:
832/// - Pin, i.e. confined to one place in memory (we don't want to copy private
833///   keys).
834/// - Sync, i.e. can be safely shared between threads.
835///
836/// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair`
837pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
838
839pub struct AuthorityState {
840    // Fixed size, static, identity of the authority
841    /// The name of this authority.
842    pub name: AuthorityName,
843    /// The signature key of the authority.
844    pub secret: StableSyncAuthoritySigner,
845
846    /// The database
847    input_loader: TransactionInputLoader,
848    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
849
850    epoch_store: ArcSwap<AuthorityPerEpochStore>,
851
852    /// This lock denotes current 'execution epoch'.
853    /// Execution acquires read lock, checks transaction epoch and holds it
854    /// until all writes are complete. Reconfiguration acquires write lock,
855    /// changes the epoch and revert all transactions from previous epoch
856    /// that are executed but did not make into checkpoint.
857    execution_lock: RwLock<EpochId>,
858
859    pub indexes: Option<Arc<IndexStore>>,
860    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
861
862    pub subscription_handler: Arc<SubscriptionHandler>,
863    pub checkpoint_store: Arc<CheckpointStore>,
864
865    committee_store: Arc<CommitteeStore>,
866
867    /// Manages pending transactions and their missing input objects.
868    transaction_manager: Arc<TransactionManager>,
869
870    /// Shuts down the execution task. Used only in testing.
871    #[cfg_attr(not(test), expect(unused))]
872    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
873
874    pub metrics: Arc<AuthorityMetrics>,
875    _pruner: AuthorityStorePruner,
876    authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
877    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
878
879    /// Take db checkpoints of different dbs
880    db_checkpoint_config: DBCheckpointConfig,
881
882    pub config: NodeConfig,
883
884    /// Current overload status in this authority. Updated periodically.
885    pub overload_info: AuthorityOverloadInfo,
886
887    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
888
889    /// The chain identifier is derived from the digest of the genesis
890    /// checkpoint.
891    chain_identifier: ChainIdentifier,
892
893    pub(crate) congestion_tracker: Arc<CongestionTracker>,
894
895    /// Traffic controller for IOTA core servers (json-rpc, validator service)
896    pub traffic_controller: Option<Arc<TrafficController>>,
897}
898
899/// The authority state encapsulates all state, drives execution, and ensures
900/// safety.
901///
902/// Note the authority operations can be accessed through a read ref (&) and do
903/// not require &mut. Internally a database is synchronized through a mutex
904/// lock.
905///
906/// Repeating valid commands should produce no changes and return no error.
907impl AuthorityState {
908    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
909        epoch_store.committee().authority_exists(&self.name)
910    }
911
912    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
913        epoch_store
914            .active_validators()
915            .iter()
916            .any(|a| AuthorityName::from(a) == self.name)
917    }
918
919    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
920        !self.is_committee_validator(epoch_store)
921    }
922
923    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
924        &self.committee_store
925    }
926
927    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
928        self.committee_store.clone()
929    }
930
931    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
932        &self.config.authority_overload_config
933    }
934
935    pub fn get_epoch_state_commitments(
936        &self,
937        epoch: EpochId,
938    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
939        self.checkpoint_store.get_epoch_state_commitments(epoch)
940    }
941
942    /// Runs deny list, input object validation, gas checks, coin deny list, and
943    /// MoveAuthenticator checks. Returns the owned object refs for optional
944    /// version validation. Does NOT acquire locks or sign the transaction.
945    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
946    pub(crate) async fn handle_transaction_validation_checks(
947        &self,
948        transaction: &VerifiedTransaction,
949        epoch_store: &Arc<AuthorityPerEpochStore>,
950    ) -> IotaResult<Vec<ObjectRef>> {
951        let protocol_config = epoch_store.protocol_config();
952        let reference_gas_price = epoch_store.reference_gas_price();
953
954        let epoch = epoch_store.epoch();
955
956        let tx_data = transaction.data().transaction_data();
957
958        // Note: the deny checks may do redundant package loads but:
959        // - they only load packages when there is an active package deny map
960        // - the loads are cached anyway
961        iota_transaction_checks::deny::check_transaction_for_validation(
962            tx_data,
963            transaction.tx_signatures(),
964            &transaction.input_objects()?,
965            &tx_data.receiving_objects(),
966            &self.config.transaction_deny_config,
967            self.get_backing_package_store().as_ref(),
968        )?;
969
970        // Load all transaction-related input objects including ones for every
971        // `MoveAuthenticator`. Loading all objects eagerly means that any invalid
972        // reference — missing object, wrong version, inaccessible object — causes a
973        // pre-consensus rejection.
974        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
975            self.read_objects_for_validation(transaction, epoch)?;
976
977        let move_authenticators = transaction.move_authenticators();
978
979        // Check the inputs for signing.
980        // If there are `MoveAuthenticator` signatures, their input objects and the
981        // account objects are also checked and must be provided.
982        // It is also checked if there is enough gas to execute the transaction and its
983        // authenticators.
984        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
985            .check_transaction_inputs_for_validation(
986                protocol_config,
987                reference_gas_price,
988                tx_data,
989                tx_input_objects,
990                &tx_receiving_objects,
991                &move_authenticators,
992                per_authenticator_inputs,
993            )?;
994
995        // Get the input objects for the authenticators, if there are
996        // `MoveAuthenticator`s.
997        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
998            .iter()
999            .map(|i| &i.0)
1000            .collect();
1001
1002        // Check if any of the sender, the transaction input objects, the receiving
1003        // objects and the authenticator input objects are in the coin deny
1004        // list, which would prevent the transaction from being signed.
1005        check_coin_deny_list_v1(
1006            tx_data.sender(),
1007            &tx_checked_input_objects,
1008            &tx_receiving_objects,
1009            &per_authenticator_checked_input_objects,
1010            &self.get_object_store(),
1011        )?;
1012
1013        let (kind, signer, gas_data) = tx_data.execution_parts();
1014
1015        let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
1016            extract_auth_fun_refs(signer, gas_data.owner, |address| {
1017                move_authenticators
1018                    .iter()
1019                    .zip(per_authenticator_checked_inputs.iter())
1020                    .find(|(move_authenticator, _)| move_authenticator.address() == address)
1021                    .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
1022            });
1023
1024        // Filter the authenticators and their checked inputs down to those that must
1025        // be executed pre-consensus. This is done *after* the deny-list check so
1026        // that all MoveAuthenticator input objects are covered by that check regardless
1027        // of deferral.
1028        let pre_consensus_move_authenticators =
1029            pre_consensus_move_authenticators(transaction, protocol_config);
1030        let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
1031            move_authenticators
1032                .into_iter()
1033                .zip(per_authenticator_checked_inputs)
1034                .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
1035                .unzip();
1036        let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
1037            .iter()
1038            .map(|i| &i.0)
1039            .collect();
1040
1041        // If there are `MoveAuthenticator` signatures, execute them and check if they
1042        // all succeed.
1043        if !move_authenticators.is_empty() {
1044            let aggregated_authenticator_input_objects =
1045                iota_transaction_checks::aggregate_authenticator_input_objects(
1046                    &per_authenticator_checked_input_objects,
1047                )?;
1048
1049            debug_assert_eq!(
1050                move_authenticators.len(),
1051                per_authenticator_checked_inputs.len(),
1052                "Move authenticators amount must match the number of checked authenticator inputs"
1053            );
1054
1055            let move_authenticators = move_authenticators
1056                .into_iter()
1057                .zip(per_authenticator_checked_inputs)
1058                .map(
1059                    |(
1060                        move_authenticator,
1061                        (authenticator_checked_input_objects, authenticator_function_ref),
1062                    )| {
1063                        (
1064                            move_authenticator.to_owned(),
1065                            authenticator_function_ref,
1066                            authenticator_checked_input_objects,
1067                        )
1068                    },
1069                )
1070                .collect();
1071
1072            // It is supposed that `MoveAuthenticator` availability is checked in
1073            // `SenderSignedData::validity_check`.
1074
1075            // Serialize the TransactionData for the auth context before decomposing.
1076            let tx_data_bytes =
1077                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1078
1079            let (sender_auth_digest, sponsor_auth_digest) =
1080                transaction.data().compute_auth_digests()?;
1081
1082            let auth_context_data = AuthContextData {
1083                transaction_data_bytes: tx_data_bytes,
1084                sender_auth_digest,
1085                sponsor_auth_digest,
1086                sender_authenticator_function_ref,
1087                sponsor_authenticator_function_ref,
1088            };
1089
1090            // Execute the Move authenticators.
1091            let validation_result = epoch_store.executor().authenticate_transaction(
1092                self.get_backing_store().as_ref(),
1093                protocol_config,
1094                self.metrics.limits_metrics.clone(),
1095                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1096                epoch_store
1097                    .epoch_start_config()
1098                    .epoch_data()
1099                    .epoch_start_timestamp(),
1100                gas_data,
1101                gas_status,
1102                move_authenticators,
1103                aggregated_authenticator_input_objects,
1104                kind,
1105                signer,
1106                transaction.digest().to_owned(),
1107                auth_context_data,
1108                &mut None,
1109            );
1110
1111            if let Err(validation_error) = validation_result {
1112                return Err(IotaError::MoveAuthenticatorExecutionFailure {
1113                    error: validation_error.to_string(),
1114                });
1115            }
1116        }
1117
1118        Ok(tx_checked_input_objects.inner().filter_owned_objects())
1119    }
1120
1121    /// This is a private method and should be kept that way. It doesn't check
1122    /// whether the provided transaction is a system transaction, and hence
1123    /// can only be called internally.
1124    async fn handle_transaction_impl(
1125        &self,
1126        transaction: VerifiedTransaction,
1127        epoch_store: &Arc<AuthorityPerEpochStore>,
1128    ) -> IotaResult<VerifiedSignedTransaction> {
1129        // Ensure that validator cannot reconfigure while we are signing the tx
1130        let _execution_lock = self.execution_lock_for_signing()?;
1131
1132        let owned_objects = self
1133            .handle_transaction_validation_checks(&transaction, epoch_store)
1134            .await?;
1135
1136        let epoch = epoch_store.epoch();
1137        let signed_transaction =
1138            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1139
1140        // Check and write locks, to signed transaction, into the database
1141        // The call to self.set_transaction_lock checks the lock is not conflicting,
1142        // and returns ConflictingTransaction error in case there is a lock on a
1143        // different existing transaction.
1144        self.get_cache_writer().try_acquire_transaction_locks(
1145            epoch_store,
1146            &owned_objects,
1147            signed_transaction.clone(),
1148        )?;
1149
1150        Ok(signed_transaction)
1151    }
1152
1153    /// Initiate a new transaction.
1154    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()
1155    ))]
1156    pub async fn handle_transaction(
1157        &self,
1158        epoch_store: &Arc<AuthorityPerEpochStore>,
1159        transaction: VerifiedTransaction,
1160    ) -> IotaResult<HandleTransactionResponse> {
1161        let tx_digest = *transaction.digest();
1162        debug!("handle_transaction");
1163
1164        // Ensure an idempotent answer.
1165        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1166            return Ok(HandleTransactionResponse { status });
1167        }
1168
1169        let _metrics_guard = self
1170            .metrics
1171            .authority_state_handle_transaction_latency
1172            .start_timer();
1173        self.metrics.tx_orders.inc();
1174
1175        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1176        match signed {
1177            Ok(s) => {
1178                if self.is_committee_validator(epoch_store) {
1179                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1180                        let tx = s.clone();
1181                        let validator_tx_finalizer = validator_tx_finalizer.clone();
1182                        let cache_reader = self.get_transaction_cache_reader().clone();
1183                        let epoch_store = epoch_store.clone();
1184                        spawn_monitored_task!(epoch_store.within_alive_epoch(
1185                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1186                        ));
1187                    }
1188                }
1189                Ok(HandleTransactionResponse {
1190                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1191                })
1192            }
1193            // It happens frequently that while we are checking the validity of the transaction, it
1194            // has just been executed.
1195            // In that case, we could still return Ok to avoid showing confusing errors.
1196            Err(err) => Ok(HandleTransactionResponse {
1197                status: self
1198                    .get_transaction_status(&tx_digest, epoch_store)?
1199                    .ok_or(err)?
1200                    .1,
1201            }),
1202        }
1203    }
1204
1205    pub fn check_system_overload_at_signing(&self) -> bool {
1206        self.config
1207            .authority_overload_config
1208            .check_system_overload_at_signing
1209    }
1210
1211    pub fn check_system_overload_at_execution(&self) -> bool {
1212        self.config
1213            .authority_overload_config
1214            .check_system_overload_at_execution
1215    }
1216
1217    /// Checks system overload conditions before accepting a transaction.
1218    ///
1219    /// In certificate-less (P-COOL) mode: only checks consensus
1220    /// queue overload, since execution-based overload will be handled
1221    /// post-consensus.
1222    ///
1223    /// In certificate mode: runs all checks — authority overload
1224    /// (execution latency), transaction manager (execution queue),
1225    /// consensus adapter (queue limit), and writeback cache backpressure.
1226    pub(crate) fn check_system_overload(
1227        &self,
1228        consensus_adapter: &Arc<ConsensusAdapter>,
1229        tx_data: &SenderSignedData,
1230        do_authority_overload_check: bool,
1231        pcool_flow_enabled: bool,
1232    ) -> IotaResult {
1233        if pcool_flow_enabled {
1234            // Graduated shedding: 0% to 100% as consensus queue fills from soft
1235            // to hard limit.
1236            self.check_consensus_queue_graduated_limits(consensus_adapter, tx_data)
1237                .tap_err(|_| {
1238                    self.update_overload_metrics("consensus");
1239                })?;
1240
1241            // NOTE: graduated shedding at 100% already rejects everything at or above
1242            // `max_pending_transactions`, so the queue-length part of the check below
1243            // is redundant but harmless. But `check_consensus_overload()` should be
1244            // kept here because it also verifies that `submit_semaphore` has permits
1245            // (see `check_consensus_hard_limits` in consensus_adapter.rs), which is a
1246            // separate concurrency limit not covered by the graduated shedding.
1247            consensus_adapter.check_consensus_overload().tap_err(|_| {
1248                self.update_overload_metrics("consensus");
1249            })?;
1250        } else {
1251            if do_authority_overload_check {
1252                self.check_authority_overload(tx_data).tap_err(|_| {
1253                    self.update_overload_metrics("execution_queue");
1254                })?;
1255            }
1256            self.transaction_manager
1257                .check_execution_overload(self.overload_config(), tx_data)
1258                .tap_err(|_| {
1259                    self.update_overload_metrics("execution_pending");
1260                })?;
1261            consensus_adapter.check_consensus_overload().tap_err(|_| {
1262                self.update_overload_metrics("consensus");
1263            })?;
1264
1265            let pending_tx_count = self
1266                .get_cache_commit()
1267                .approximate_pending_transaction_count();
1268            if pending_tx_count
1269                > self
1270                    .config
1271                    .execution_cache_config
1272                    .writeback_cache
1273                    .backpressure_threshold_for_rpc()
1274            {
1275                return Err(IotaError::ValidatorOverloadedRetryAfter {
1276                    retry_after_secs: 10,
1277                });
1278            }
1279        }
1280
1281        Ok(())
1282    }
1283
1284    /// Rejects `tx_data` via graduated shedding based on consensus queue
1285    /// length. Scales from 0% at the soft limit to 100% at
1286    /// `max_pending_transactions`. Returns `ValidatorOverloadedRetryAfter`
1287    /// for probabilistic rejection (shedding percentage < 100%, via
1288    /// `overload_monitor_accept_tx`) or `TooManyTransactionsPendingConsensus`
1289    /// for unconditional rejection (shedding percentage >= 100%). Updates
1290    /// `consensus_queue_load_shedding_percentage` metric.
1291    fn check_consensus_queue_graduated_limits(
1292        &self,
1293        consensus_adapter: &Arc<ConsensusAdapter>,
1294        tx_data: &SenderSignedData,
1295    ) -> IotaResult {
1296        let num_inflight_txs = consensus_adapter.num_inflight_transactions() as usize;
1297
1298        let shedding_pct = compute_graduated_load_shedding_percentage(
1299            num_inflight_txs,
1300            consensus_adapter.max_pending_transactions(),
1301            consensus_adapter.graduated_load_shedding_soft_limit_pct(),
1302        );
1303
1304        self.metrics
1305            .consensus_queue_load_shedding_percentage
1306            .set(shedding_pct as i64);
1307
1308        if shedding_pct == 0 {
1309            return Ok(());
1310        }
1311
1312        // At/above the hard limit, rejection is unconditional (not
1313        // probabilistic), so the seed-rotation retry hint of
1314        // `ValidatorOverloadedRetryAfter` doesn't apply - return the
1315        // capacity-bound error instead.
1316        if shedding_pct >= 100 {
1317            return Err(IotaError::TooManyTransactionsPendingConsensus);
1318        }
1319
1320        overload_monitor_accept_tx(shedding_pct, tx_data.digest())
1321    }
1322
1323    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1324        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1325            return Ok(());
1326        }
1327
1328        let load_shedding_percentage = self
1329            .overload_info
1330            .local_load_shedding_percentage
1331            .load(Ordering::Relaxed);
1332        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1333    }
1334
1335    fn update_overload_metrics(&self, source: &str) {
1336        self.metrics
1337            .transaction_overload_sources
1338            .with_label_values(&[source])
1339            .inc();
1340    }
1341
1342    /// Wait for a certificate to be executed.
1343    /// For consensus transactions, it needs to be sequenced by the consensus.
1344    /// For owned object transactions, this function will enqueue the
1345    /// transaction for execution.
1346    #[instrument(level = "trace", skip_all)]
1347    pub async fn wait_for_certificate_execution(
1348        &self,
1349        certificate: &VerifiedCertificate,
1350        epoch_store: &Arc<AuthorityPerEpochStore>,
1351    ) -> IotaResult<TransactionEffects> {
1352        let _metrics_guard = if certificate.contains_shared_object() {
1353            self.metrics
1354                .execute_certificate_latency_shared_object
1355                .start_timer()
1356        } else {
1357            self.metrics
1358                .execute_certificate_latency_single_writer
1359                .start_timer()
1360        };
1361        trace!("wait_for_certificate_execution");
1362
1363        self.metrics.total_cert_attempts.inc();
1364
1365        if !certificate.contains_shared_object() {
1366            // Shared object transactions need to be sequenced by the consensus before
1367            // enqueueing for execution, done in
1368            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1369            // object transactions, they can be enqueued for execution immediately.
1370            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1371        }
1372
1373        // tx could be reverted when epoch ends, so we must be careful not to return a
1374        // result here after the epoch ends.
1375        epoch_store
1376            .within_alive_epoch(self.notify_read_effects(certificate))
1377            .await
1378            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1379            .and_then(|r| r)
1380    }
1381
1382    /// Internal logic to execute a transaction.
1383    ///
1384    /// Guarantees that
1385    /// - If input objects are available, return no permanent failure.
1386    /// - Execution and output commit are atomic. i.e. outputs are only written
1387    ///   to storage,
1388    /// on successful execution; crashed execution has no observable effect and
1389    /// can be retried.
1390    ///
1391    /// It is caller's responsibility to ensure input objects are available and
1392    /// locks are set. If this cannot be satisfied by the caller,
1393    /// `wait_for_certificate_execution()` should be called instead.
1394    ///
1395    /// Should only be called within iota-core.
1396    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1397    pub fn try_execute_immediately(
1398        &self,
1399        transaction: &VerifiedExecutableTransaction,
1400        expected_effects_digest: Option<TransactionEffectsDigest>,
1401        epoch_store: &Arc<AuthorityPerEpochStore>,
1402    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1403        let _scope = monitored_scope("Execution::try_execute_immediately");
1404        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1405
1406        let tx_digest = transaction.digest();
1407
1408        // Acquire a lock to prevent concurrent executions of the same transaction.
1409        let tx_guard = epoch_store.acquire_tx_guard(transaction)?;
1410
1411        // The transaction could have been processed by a concurrent attempt of the
1412        // same transaction, so check if the effects have already been written.
1413        if let Some(effects) = self
1414            .get_transaction_cache_reader()
1415            .try_get_executed_effects(tx_digest)?
1416        {
1417            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1418                assert_eq!(
1419                    effects.digest(),
1420                    expected_effects_digest_inner,
1421                    "Unexpected effects digest for transaction {tx_digest}"
1422                );
1423            }
1424            tx_guard.release();
1425            return Ok((effects, None));
1426        }
1427
1428        let (tx_input_objects, per_authenticator_inputs) =
1429            self.read_objects_for_execution(tx_guard.as_lock_guard(), transaction, epoch_store)?;
1430
1431        // If no expected_effects_digest was provided, try to get it from storage.
1432        // We could be re-executing a previously executed but uncommitted transaction,
1433        // perhaps after restarting with a new binary. In this situation, if
1434        // we have published an effects signature, we must be sure not to
1435        // equivocate.
1436        let expected_effects_digest =
1437            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1438
1439        self.process_transaction(
1440            tx_guard,
1441            transaction,
1442            tx_input_objects,
1443            per_authenticator_inputs,
1444            expected_effects_digest,
1445            epoch_store,
1446        )
1447        .tap_err(|e| info!(?tx_digest, "process_transaction failed: {e}"))
1448        .tap_ok(
1449            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_transaction succeeded"),
1450        )
1451    }
1452
1453    pub fn read_objects_for_execution(
1454        &self,
1455        tx_lock: &TxLockGuard,
1456        transaction: &VerifiedExecutableTransaction,
1457        epoch_store: &Arc<AuthorityPerEpochStore>,
1458    ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1459        let _scope = monitored_scope("Execution::load_input_objects");
1460        let _metrics_guard = self
1461            .metrics
1462            .execution_load_input_objects_latency
1463            .start_timer();
1464
1465        let input_objects = transaction.collect_all_input_object_kind_for_reading()?;
1466
1467        let input_objects = self.input_loader.read_objects_for_execution(
1468            epoch_store,
1469            &transaction.key(),
1470            tx_lock,
1471            &input_objects,
1472            epoch_store.epoch(),
1473        )?;
1474
1475        transaction.split_input_objects_into_groups_for_reading(input_objects)
1476    }
1477
1478    /// Test only wrapper for `try_execute_immediately()` above, useful for
1479    /// checking errors if the pre-conditions are not satisfied, and
1480    /// executing change epoch transactions.
1481    pub fn try_execute_for_test(
1482        &self,
1483        certificate: &VerifiedCertificate,
1484    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1485        let epoch_store = self.epoch_store_for_testing();
1486        let (effects, execution_error_opt) = self.try_execute_immediately(
1487            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1488            None,
1489            &epoch_store,
1490        )?;
1491        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1492        Ok((signed_effects, execution_error_opt))
1493    }
1494
1495    /// Non-fallible version of `try_execute_for_test()`.
1496    pub fn execute_for_test(
1497        &self,
1498        certificate: &VerifiedCertificate,
1499    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1500        self.try_execute_for_test(certificate)
1501            .expect("try_execute_for_test should not fail")
1502    }
1503
1504    pub async fn notify_read_effects(
1505        &self,
1506        certificate: &VerifiedCertificate,
1507    ) -> IotaResult<TransactionEffects> {
1508        self.get_transaction_cache_reader()
1509            .try_notify_read_executed_effects(&[*certificate.digest()])
1510            .await
1511            .map(|mut r| r.pop().expect("must return correct number of effects"))
1512    }
1513
1514    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1515        self.get_object_cache_reader()
1516            .try_check_owned_objects_are_live(owned_object_refs)
1517    }
1518
1519    /// This function captures the required state to debug a forked transaction.
1520    /// The dump is written to a file in dir `path`, with name prefixed by the
1521    /// transaction digest. NOTE: Since this info escapes the validator
1522    /// context, make sure not to leak any private info here
1523    pub(crate) fn debug_dump_transaction_state(
1524        &self,
1525        tx_digest: &TransactionDigest,
1526        effects: &TransactionEffects,
1527        expected_effects_digest: TransactionEffectsDigest,
1528        inner_temporary_store: &InnerTemporaryStore,
1529        transaction: &VerifiedExecutableTransaction,
1530        debug_dump_config: &StateDebugDumpConfig,
1531    ) -> IotaResult<PathBuf> {
1532        // Fall back to the OS temp directory if no dump directory is configured.
1533        // This is safe: dump files are named by transaction digest, so no collisions.
1534        let dump_dir = debug_dump_config
1535            .dump_file_directory
1536            .as_ref()
1537            .cloned()
1538            .unwrap_or(std::env::temp_dir());
1539        let epoch_store = self.load_epoch_store_one_call_per_task();
1540
1541        NodeStateDump::new(
1542            tx_digest,
1543            effects,
1544            expected_effects_digest,
1545            self.get_object_store().as_ref(),
1546            &epoch_store,
1547            inner_temporary_store,
1548            transaction,
1549        )?
1550        .write_to_file(&dump_dir)
1551        .map_err(|e| IotaError::FileIO(e.to_string()))
1552    }
1553
    /// Executes `transaction` and commits its outputs.
    ///
    /// Steps, in order:
    /// 1. Take the execution lock for the transaction and check that the epoch
    ///    it guards equals the epoch of `epoch_store` (`IotaError::WrongEpoch`
    ///    otherwise).
    /// 2. Run `execute_transaction()`.
    /// 3. If `expected_effects_digest` is provided and differs from the digest
    ///    of the produced effects, try to dump the node state, log the fork and
    ///    panic.
    /// 4. Call `commit_transaction()`, which consumes `tx_guard` and the
    ///    execution guard.
    /// 5. Record the ratio between the computation cost and the elapsed time
    ///    (in microseconds).
    ///
    /// On every early error return in steps 1 and 2, `tx_guard` is released
    /// before the error is returned.
    ///
    /// Returns the effects and the execution error (if any) reported by
    /// `execute_transaction()`.
    ///
    /// # Panics
    ///
    /// Panics when the produced effects digest does not match
    /// `expected_effects_digest`.
    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = ?transaction.data().transaction_data().gas_owner().to_string()))]
    pub(crate) fn process_transaction(
        &self,
        tx_guard: TxGuard,
        transaction: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let process_transaction_start_time = tokio::time::Instant::now();
        let digest = *transaction.digest();

        let _scope = monitored_scope("Execution::process_certificate");

        fail_point_if!("correlated-crash-process-transaction", || {
            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        let execution_guard = self.execution_lock_for_executable_transaction(transaction);
        // Any caller that verifies the signatures on the transaction will have already
        // checked the epoch. But paths that don't verify sigs (e.g. execution
        // from checkpoint, reading from db) present the possibility of an epoch
        // mismatch. If this transaction is not finalized in previous epoch, then it's
        // invalid.
        let execution_guard = match execution_guard {
            Ok(execution_guard) => execution_guard,
            Err(err) => {
                tx_guard.release();
                return Err(err);
            }
        };
        // Since we obtain a reference to the epoch store before taking the execution
        // lock, it's possible that reconfiguration has happened and they no
        // longer match.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return Err(IotaError::WrongEpoch {
                expected_epoch: epoch_store.epoch(),
                actual_epoch: *execution_guard,
            });
        }

        // Errors originating from `execute_transaction` may be transient (failure to
        // read locks) or non-transient (transaction input is invalid, move vm
        // errors). However, all errors from this function occur before we have
        // written anything to the db, so we release the tx guard and rely on the
        // client to retry the tx (if it was transient).
        let (inner_temporary_store, effects, execution_error_opt) = match self.execute_transaction(
            &execution_guard,
            transaction,
            tx_input_objects,
            per_authenticator_inputs,
            epoch_store,
        ) {
            Err(e) => {
                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
                tx_guard.release();
                return Err(e);
            }
            Ok(res) => res,
        };

        if let Some(expected_effects_digest) = expected_effects_digest {
            if effects.digest() != expected_effects_digest {
                // We don't want a failure of the state dump to mask the original
                // error (the fork), so a dump error is only logged and we continue
                // to the panic below.
                match self.debug_dump_transaction_state(
                    &digest,
                    &effects,
                    expected_effects_digest,
                    &inner_temporary_store,
                    transaction,
                    &self.config.state_debug_dump_config,
                ) {
                    Ok(out_path) => {
                        info!(
                            "Dumped node state for transaction {} to {}",
                            digest,
                            out_path.as_path().display().to_string()
                        );
                    }
                    Err(e) => {
                        error!("Error dumping state for transaction {}: {e}", digest);
                    }
                }
                error!(
                    tx_digest = ?digest,
                    ?expected_effects_digest,
                    actual_effects = ?effects,
                    "fork detected!"
                );
                panic!(
                    "Transaction {} is expected to have effects digest {}, but got {}!",
                    digest,
                    expected_effects_digest,
                    effects.digest(),
                );
            }
        }

        fail_point!("crash");

        // Ownership of both guards moves into `commit_transaction()`.
        self.commit_transaction(
            transaction,
            inner_temporary_store,
            &effects,
            tx_guard,
            execution_guard,
            epoch_store,
        )?;

        // Guard against a division by zero when no measurable time has elapsed.
        let elapsed = process_transaction_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        };
        Ok((effects, execution_error_opt))
    }
1676
1677    pub async fn reconfigure_traffic_control(
1678        &self,
1679        params: TrafficControlReconfigParams,
1680    ) -> Result<TrafficControlReconfigParams, IotaError> {
1681        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1682            traffic_controller.admin_reconfigure(params).await
1683        } else {
1684            Err(IotaError::InvalidAdminRequest(
1685                "Traffic controller is not configured on this node".to_string(),
1686            ))
1687        }
1688    }
1689
    /// Persists the outputs of an executed transaction and notifies the
    /// transaction manager.
    ///
    /// Steps, in order:
    /// 1. Run `post_process_one_tx()`; a failure is logged and counted in
    ///    `post_processing_total_failures` but does not abort the commit.
    /// 2. Record the transaction key and digest in `epoch_store`.
    /// 3. Build the `TransactionOutputs` and write them through the cache
    ///    writer.
    /// 4. For end-of-epoch transactions, force a reload of the system packages
    ///    in the object cache.
    /// 5. Mark `tx_guard` as committed, notify the transaction manager about
    ///    the committed outputs and update the metrics.
    ///
    /// `_execution_guard` is not used in the body; it is taken by value and
    /// therefore stays alive until this function returns.
    ///
    /// # Errors
    ///
    /// Returns the error of the epoch store insertion or of the cache write.
    #[instrument(level = "trace", skip_all)]
    fn commit_transaction(
        &self,
        transaction: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: TxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        // Everything needed from `inner_temporary_store` after the commit is
        // captured up front, because the store is moved into the transaction
        // outputs further down.
        let tx_key = transaction.key();
        let tx_digest = transaction.digest();
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        let output_keys = inner_temporary_store.get_output_keys(effects);

        // index transaction
        // The result is deliberately discarded: a post-processing failure is
        // only logged and counted.
        let _ = self
            .post_process_one_tx(transaction, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            transaction.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if transaction.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // `commit_transaction()` finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(transaction, input_object_count, shared_object_count);

        Ok(())
    }
1755
1756    fn update_metrics(
1757        &self,
1758        transaction: &VerifiedExecutableTransaction,
1759        input_object_count: usize,
1760        shared_object_count: usize,
1761    ) {
1762        // count signature by scheme, for multisig
1763        if transaction.has_upgraded_multisig() {
1764            self.metrics.multisig_sig_count.inc();
1765        }
1766
1767        self.metrics.total_effects.inc();
1768        self.metrics.total_certs.inc();
1769
1770        if shared_object_count > 0 {
1771            self.metrics.shared_obj_tx.inc();
1772        }
1773
1774        if transaction.is_sponsored_tx() {
1775            self.metrics.sponsored_tx.inc();
1776        }
1777
1778        self.metrics
1779            .num_input_objs
1780            .observe(input_object_count as f64);
1781        self.metrics
1782            .num_shared_objects
1783            .observe(shared_object_count as f64);
1784        self.metrics.batch_size.observe(
1785            transaction
1786                .data()
1787                .intent_message()
1788                .value
1789                .kind()
1790                .num_commands() as f64,
1791        );
1792    }
1793
    /// `execute_transaction()` validates the transaction input, and executes
    /// the transaction, returning effects, output objects, events, etc.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no
    /// side effects.
    ///
    /// It can be generally understood that a failure of `execute_transaction`
    /// indicates a non-transient error, e.g. the transaction input is
    /// somehow invalid, the correct locks are not held, etc. However, this
    /// is not entirely true, as a transient db read error may also cause
    /// this function to fail.
    ///
    /// Two paths exist, selected by whether the transaction carries any
    /// `MoveAuthenticator`:
    /// - without authenticators, the certificate input is checked and the
    ///   transaction is executed directly; `per_authenticator_inputs` is not
    ///   used on this path;
    /// - with authenticators, each account is checked via
    ///   `check_move_account()`, the authenticator and transaction inputs are
    ///   checked together, and the executor authenticates before executing.
    ///
    /// In both paths the owned input objects are verified with
    /// `check_owned_locks()` before execution.
    ///
    /// `_execution_guard` is not used in the body.
    ///
    /// Returns the inner temporary store, the effects, and the execution error
    /// (if the executor reported one).
    #[instrument(level = "trace", skip_all)]
    fn execute_transaction(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        transaction: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::execute_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_transaction_start_time = tokio::time::Instant::now();

        let protocol_config = epoch_store.protocol_config();

        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_start_timestamp = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();

        let backing_store = self.get_backing_store().as_ref();

        let tx_digest = *transaction.digest();

        // TODO: We need to move this to a more appropriate place to avoid redundant
        // checks.
        let tx_data = transaction.data().transaction_data();
        tx_data.validity_check(protocol_config)?;

        let (kind, signer, gas_data) = tx_data.execution_parts();

        let move_authenticators = transaction.move_authenticators();

        // `effects` is only mutated by the `cp_execution_nondeterminism` fail
        // point below, hence the conditional `expect(unused_mut)`.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
            .is_empty()
        {
            // No Move authentication required, proceed to execute the transaction directly.

            // The cost of partially re-auditing a transaction before execution is
            // tolerated.
            let (tx_gas_status, tx_checked_input_objects) =
                iota_transaction_checks::check_certificate_input(
                    transaction,
                    tx_input_objects,
                    protocol_config,
                    reference_gas_price,
                )?;

            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;
            epoch_store.executor().execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ iota-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                tx_checked_input_objects,
                gas_data,
                tx_gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
        } else {
            // One or more `MoveAuthenticator` signatures present — authenticate each and
            // then execute the transaction.
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_inputs.len(),
                "Move authenticators amount must match the number of authenticator inputs"
            );

            // Pair every authenticator with its inputs and resolve the
            // authenticator function reference; the first failing check aborts
            // the whole collection.
            let per_authenticator_inputs = move_authenticators
                .iter()
                .zip(per_authenticator_inputs)
                .map(
                    |(move_authenticator, (authenticator_input_objects, account_object))| {
                        // Check basic `object_to_authenticate` preconditions and get its
                        // components.
                        let (
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                        ) = move_authenticator.object_to_authenticate_components()?;

                        let signer = move_authenticator.address();

                        let authenticator_function_ref_for_execution = self.check_move_account(
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                            account_object,
                            &signer,
                        )?;

                        Ok((
                            authenticator_input_objects,
                            authenticator_function_ref_for_execution,
                        ))
                    },
                )
                .collect::<IotaResult<Vec<_>>>()?;

            // The input objects are cloned because `per_authenticator_inputs` is
            // consumed again further down.
            let per_authenticator_input_objects = per_authenticator_inputs
                .iter()
                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
                .collect::<Vec<_>>();

            // Serialize the TransactionData for the auth context.
            let tx_data_bytes =
                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");

            let (sender_auth_digest, sponsor_auth_digest) =
                transaction.data().compute_auth_digests()?;

            // Check the `MoveAuthenticator` input objects.
            // The `MoveAuthenticator` receiving objects are checked on the signing step.
            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
            // not a part of the transaction data.
            let authenticator_gas_budget = protocol_config.max_auth_gas();
            let (
                gas_status,
                per_authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
                transaction,
                tx_input_objects,
                per_authenticator_input_objects,
                authenticator_gas_budget,
                protocol_config,
                reference_gas_price,
            )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_input_objects.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Build (authenticator, function ref, checked inputs) triples for the
            // executor.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_inputs)
                .zip(per_authenticator_checked_input_objects)
                .map(
                    |(
                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
                        authenticator_checked_input_objects,
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref_for_execution,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect::<Vec<_>>();

            let owned_object_refs = authenticator_and_tx_checked_input_objects
                .inner()
                .filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;

            // Look up the authenticator function refs for the sender and the gas
            // owner by matching on the authenticator address.
            let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
                extract_auth_fun_refs(signer, gas_data.owner, |address| {
                    move_authenticators
                        .iter()
                        .find(|t| t.0.address() == address)
                        .map(|t| t.1.authenticator_function_ref.clone())
                });

            let auth_context_data = AuthContextData {
                transaction_data_bytes: tx_data_bytes,
                sender_auth_digest,
                sponsor_auth_digest,
                sender_authenticator_function_ref,
                sponsor_authenticator_function_ref,
            };

            epoch_store
                .executor()
                .authenticate_then_execute_transaction_to_effects(
                    backing_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    self.config
                        .expensive_safety_check_config
                        .enable_deep_per_tx_iota_conservation_check(),
                    self.config.certificate_deny_config.certificate_deny_set(),
                    &epoch_id,
                    epoch_start_timestamp,
                    gas_data,
                    gas_status,
                    move_authenticators,
                    authenticator_and_tx_checked_input_objects,
                    kind,
                    signer,
                    tx_digest,
                    auth_context_data,
                    &mut None,
                )
        };

        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(transaction, epoch_store, &mut effects);
        });

        // Guard against a division by zero when no measurable time has elapsed.
        let elapsed = prepare_transaction_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        // The executor reports the execution outcome as a `Result`; only its
        // error side is surfaced to the caller.
        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
2039
2040    pub fn prepare_transaction_for_benchmark(
2041        &self,
2042        transaction: &VerifiedExecutableTransaction,
2043        input_objects: InputObjects,
2044        epoch_store: &Arc<AuthorityPerEpochStore>,
2045    ) -> IotaResult<(
2046        InnerTemporaryStore,
2047        TransactionEffects,
2048        Option<ExecutionError>,
2049    )> {
2050        let lock = RwLock::new(epoch_store.epoch());
2051        let execution_guard = lock.try_read().unwrap();
2052
2053        self.execute_transaction(
2054            &execution_guard,
2055            transaction,
2056            input_objects,
2057            vec![],
2058            epoch_store,
2059        )
2060    }
2061
2062    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
2063    /// `VmChecks::Enabled` instead.
2064    #[instrument("dry_exec_tx", level = "trace", skip_all)]
2065    #[allow(clippy::type_complexity)]
2066    pub fn dry_exec_transaction(
2067        &self,
2068        transaction: TransactionData,
2069        transaction_digest: TransactionDigest,
2070    ) -> IotaResult<(
2071        DryRunTransactionBlockResponse,
2072        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
2073        TransactionEffects,
2074        Option<ObjectId>,
2075    )> {
2076        let epoch_store = self.load_epoch_store_one_call_per_task();
2077        if !self.is_fullnode(&epoch_store) {
2078            return Err(IotaError::UnsupportedFeature {
2079                error: "dry-exec is only supported on fullnodes".to_string(),
2080            });
2081        }
2082
2083        if transaction.kind().is_system() {
2084            return Err(IotaError::UnsupportedFeature {
2085                error: "dry-exec does not support system transactions".to_string(),
2086            });
2087        }
2088
2089        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2090    }
2091
2092    #[allow(clippy::type_complexity)]
2093    pub fn dry_exec_transaction_for_benchmark(
2094        &self,
2095        transaction: TransactionData,
2096        transaction_digest: TransactionDigest,
2097    ) -> IotaResult<(
2098        DryRunTransactionBlockResponse,
2099        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
2100        TransactionEffects,
2101        Option<ObjectId>,
2102    )> {
2103        let epoch_store = self.load_epoch_store_one_call_per_task();
2104        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2105    }
2106
2107    #[instrument(level = "trace", skip_all)]
2108    #[allow(clippy::type_complexity)]
2109    fn dry_exec_transaction_impl(
2110        &self,
2111        epoch_store: &AuthorityPerEpochStore,
2112        transaction: TransactionData,
2113        transaction_digest: TransactionDigest,
2114    ) -> IotaResult<(
2115        DryRunTransactionBlockResponse,
2116        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
2117        TransactionEffects,
2118        Option<ObjectId>,
2119    )> {
2120        // Cheap validity checks for a transaction, including input size limits.
2121        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2122
2123        let input_object_kinds = transaction.input_objects()?;
2124        let receiving_object_refs = transaction.receiving_objects();
2125
2126        iota_transaction_checks::deny::check_transaction_for_validation(
2127            &transaction,
2128            &[],
2129            &input_object_kinds,
2130            &receiving_object_refs,
2131            &self.config.transaction_deny_config,
2132            self.get_backing_package_store().as_ref(),
2133        )?;
2134
2135        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2136            // We don't want to cache this transaction since it's a dry run.
2137            None,
2138            &input_object_kinds,
2139            &receiving_object_refs,
2140            epoch_store.epoch(),
2141        )?;
2142
2143        // make a gas object if one was not provided
2144        let mut transaction = transaction;
2145        let reference_gas_price = epoch_store.reference_gas_price();
2146        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2147            let sender = transaction.gas_owner();
2148            let gas_object_id = ObjectId::random();
2149            let gas_object = Object::new_move(
2150                MoveObject::new_gas_coin(
2151                    OBJECT_START_VERSION,
2152                    gas_object_id,
2153                    SIMULATION_GAS_COIN_VALUE,
2154                ),
2155                Owner::Address(sender),
2156                TransactionDigest::GENESIS_MARKER,
2157            );
2158            let gas_object_ref = gas_object.object_ref();
2159            // Add gas object to transaction gas payment
2160            transaction.gas_data_mut().objects = vec![gas_object_ref];
2161            (
2162                iota_transaction_checks::check_transaction_input_with_given_gas(
2163                    epoch_store.protocol_config(),
2164                    reference_gas_price,
2165                    &transaction,
2166                    input_objects,
2167                    receiving_objects,
2168                    gas_object,
2169                    &self.metrics.bytecode_verifier_metrics,
2170                    &self.config.verifier_signing_config,
2171                )?,
2172                Some(gas_object_id),
2173            )
2174        } else {
2175            // `MoveAuthenticator`s are not supported in dry runs, so we set the
2176            // `authenticator_gas_budget` to 0.
2177            let authenticator_gas_budget = 0;
2178
2179            (
2180                iota_transaction_checks::check_transaction_input(
2181                    epoch_store.protocol_config(),
2182                    reference_gas_price,
2183                    &transaction,
2184                    input_objects,
2185                    &receiving_objects,
2186                    &self.metrics.bytecode_verifier_metrics,
2187                    &self.config.verifier_signing_config,
2188                    authenticator_gas_budget,
2189                )?,
2190                None,
2191            )
2192        };
2193
2194        let protocol_config = epoch_store.protocol_config();
2195        let (kind, signer, gas_data) = transaction.execution_parts();
2196
2197        let silent = true;
2198        let executor = iota_execution::executor(protocol_config, silent, None)
2199            .expect("Creating an executor should not fail here");
2200
2201        let expensive_checks = false;
2202        let (inner_temp_store, _, effects, execution_error) = executor
2203            .execute_transaction_to_effects(
2204                self.get_backing_store().as_ref(),
2205                protocol_config,
2206                self.metrics.limits_metrics.clone(),
2207                expensive_checks,
2208                self.config.certificate_deny_config.certificate_deny_set(),
2209                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2210                epoch_store
2211                    .epoch_start_config()
2212                    .epoch_data()
2213                    .epoch_start_timestamp(),
2214                checked_input_objects,
2215                gas_data,
2216                gas_status,
2217                kind,
2218                signer,
2219                transaction_digest,
2220                &mut None,
2221            );
2222        let tx_digest = *effects.transaction_digest();
2223
2224        let module_cache =
2225            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2226
2227        let mut layout_resolver =
2228            epoch_store
2229                .executor()
2230                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2231                    &inner_temp_store,
2232                    self.get_backing_package_store(),
2233                )));
2234        // Returning empty vector here because we recalculate changes in the rpc layer.
2235        let object_changes = Vec::new();
2236
2237        // Returning empty vector here because we recalculate changes in the rpc layer.
2238        let balance_changes = Vec::new();
2239
2240        let written_with_kind = effects
2241            .created()
2242            .into_iter()
2243            .map(|(oref, _)| (oref, WriteKind::Create))
2244            .chain(
2245                effects
2246                    .unwrapped()
2247                    .into_iter()
2248                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2249            )
2250            .chain(
2251                effects
2252                    .mutated()
2253                    .into_iter()
2254                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2255            )
2256            .map(|(oref, kind)| {
2257                let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2258                // TODO: Avoid clones.
2259                (oref.object_id, (oref, obj.clone(), kind))
2260            })
2261            .collect();
2262
2263        let execution_error_source = execution_error
2264            .as_ref()
2265            .err()
2266            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2267
2268        Ok((
2269            DryRunTransactionBlockResponse {
2270                // to avoid cloning `transaction`, fields are populated in this order
2271                suggested_gas_price: self
2272                    .congestion_tracker
2273                    .get_prediction_suggested_gas_price(&transaction),
2274                input: IotaTransactionBlockData::try_from_with_module_cache(
2275                    transaction,
2276                    &module_cache,
2277                    tx_digest,
2278                )
2279                .map_err(|e| IotaError::TransactionSerialization {
2280                    error: format!(
2281                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
2282                    ),
2283                })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
2284                effects: effects.clone().try_into()?,
2285                events: IotaTransactionBlockEvents::try_from(
2286                    inner_temp_store.events.clone(),
2287                    tx_digest,
2288                    None,
2289                    layout_resolver.as_mut(),
2290                )?,
2291                object_changes,
2292                balance_changes,
2293                execution_error_source,
2294            },
2295            written_with_kind,
2296            effects,
2297            mock_gas,
2298        ))
2299    }
2300
2301    pub fn simulate_transaction(
2302        &self,
2303        mut transaction: TransactionData,
2304        checks: VmChecks,
2305    ) -> IotaResult<SimulateTransactionResult> {
2306        if transaction.kind().is_system() {
2307            return Err(IotaError::UnsupportedFeature {
2308                error: "simulate does not support system transactions".to_string(),
2309            });
2310        }
2311
2312        let epoch_store = self.load_epoch_store_one_call_per_task();
2313        if !self.is_fullnode(&epoch_store) {
2314            return Err(IotaError::UnsupportedFeature {
2315                error: "simulate is only supported on fullnodes".to_string(),
2316            });
2317        }
2318
2319        // Cheap validity checks for a transaction, including input size limits.
2320        // This does not check if gas objects are missing since we may create a
2321        // mock gas object. It checks for other transaction input validity.
2322        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2323
2324        let input_object_kinds = transaction.input_objects()?;
2325        let receiving_object_refs = transaction.receiving_objects();
2326
2327        // Since we need to simulate a validator signing the transaction, the first step
2328        // is to check if some transaction elements are denied.
2329        iota_transaction_checks::deny::check_transaction_for_validation(
2330            &transaction,
2331            &[],
2332            &input_object_kinds,
2333            &receiving_object_refs,
2334            &self.config.transaction_deny_config,
2335            self.get_backing_package_store().as_ref(),
2336        )?;
2337
2338        // Load input and receiving objects
2339        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2340            // We don't want to cache this transaction since it's a simulation.
2341            None,
2342            &input_object_kinds,
2343            &receiving_object_refs,
2344            epoch_store.epoch(),
2345        )?;
2346
2347        // Create a mock gas object if one was not provided
2348        let mock_gas_id = if transaction.gas().is_empty() {
2349            let mock_gas_object = Object::new_move(
2350                MoveObject::new_gas_coin(
2351                    OBJECT_START_VERSION,
2352                    ObjectId::MAX,
2353                    SIMULATION_GAS_COIN_VALUE,
2354                ),
2355                Owner::Address(transaction.gas_data().owner),
2356                TransactionDigest::GENESIS_MARKER,
2357            );
2358            let mock_gas_object_ref = mock_gas_object.object_ref();
2359            transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2360            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2361            Some(mock_gas_object.id())
2362        } else {
2363            None
2364        };
2365
2366        let protocol_config = epoch_store.protocol_config();
2367
2368        // `MoveAuthenticator`s are not supported in simulation, so we set the
2369        // `authenticator_gas_budget` to 0.
2370        let authenticator_gas_budget = 0;
2371
2372        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
2373        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
2374        let (gas_status, checked_input_objects) = if checks.enabled() {
2375            iota_transaction_checks::check_transaction_input(
2376                protocol_config,
2377                epoch_store.reference_gas_price(),
2378                &transaction,
2379                input_objects,
2380                &receiving_objects,
2381                &self.metrics.bytecode_verifier_metrics,
2382                &self.config.verifier_signing_config,
2383                authenticator_gas_budget,
2384            )?
2385        } else {
2386            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2387                protocol_config,
2388                transaction.kind(),
2389                input_objects,
2390                receiving_objects,
2391            )?;
2392            let gas_status = IotaGasStatus::new(
2393                transaction.gas_budget(),
2394                transaction.gas_price(),
2395                epoch_store.reference_gas_price(),
2396                protocol_config,
2397            )?;
2398
2399            (gas_status, checked_input_objects)
2400        };
2401
2402        // Create a new executor for the simulation
2403        let executor = iota_execution::executor(
2404            protocol_config,
2405            true, // silent
2406            None,
2407        )
2408        .expect("Creating an executor should not fail here");
2409
2410        // Execute the simulation
2411        let (kind, signer, gas_data) = transaction.execution_parts();
2412        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2413            self.get_backing_store().as_ref(),
2414            protocol_config,
2415            self.metrics.limits_metrics.clone(),
2416            false, // expensive_checks
2417            self.config.certificate_deny_config.certificate_deny_set(),
2418            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2419            epoch_store
2420                .epoch_start_config()
2421                .epoch_data()
2422                .epoch_start_timestamp(),
2423            checked_input_objects,
2424            gas_data,
2425            gas_status,
2426            kind,
2427            signer,
2428            transaction.digest(),
2429            checks.disabled(),
2430        );
2431
2432        // In the case of a dev inspect, the execution_result could be filled with some
2433        // values. Else, execution_result is empty in the case of a dry run.
2434        Ok(SimulateTransactionResult {
2435            input_objects: inner_temp_store.input_objects,
2436            output_objects: inner_temp_store.written,
2437            events: effects.events_digest().map(|_| inner_temp_store.events),
2438            effects,
2439            execution_result,
2440            suggested_gas_price: self
2441                .congestion_tracker
2442                .get_prediction_suggested_gas_price(&transaction),
2443            mock_gas_id,
2444        })
2445    }
2446
2447    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
2448    /// `VmChecks::DISABLED` instead.
2449    /// The object ID for gas can be any
2450    /// object ID, even for an uncreated object
2451    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2452    pub async fn dev_inspect_transaction_block(
2453        &self,
2454        sender: Address,
2455        transaction_kind: TransactionKind,
2456        gas_price: Option<u64>,
2457        gas_budget: Option<u64>,
2458        gas_sponsor: Option<Address>,
2459        gas_objects: Option<Vec<ObjectRef>>,
2460        show_raw_txn_data_and_effects: Option<bool>,
2461        skip_checks: Option<bool>,
2462    ) -> IotaResult<DevInspectResults> {
2463        let epoch_store = self.load_epoch_store_one_call_per_task();
2464
2465        if !self.is_fullnode(&epoch_store) {
2466            return Err(IotaError::UnsupportedFeature {
2467                error: "dev-inspect is only supported on fullnodes".to_string(),
2468            });
2469        }
2470
2471        if transaction_kind.is_system() {
2472            return Err(IotaError::UnsupportedFeature {
2473                error: "system transactions are not supported".to_string(),
2474            });
2475        }
2476
2477        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2478        let skip_checks = skip_checks.unwrap_or(true);
2479        let reference_gas_price = epoch_store.reference_gas_price();
2480        let protocol_config = epoch_store.protocol_config();
2481        let max_tx_gas = protocol_config.max_tx_gas();
2482
2483        let price = gas_price.unwrap_or(reference_gas_price);
2484        let budget = gas_budget.unwrap_or(max_tx_gas);
2485        let owner = gas_sponsor.unwrap_or(sender);
2486        // Payment might be empty here, but it's fine we'll have to deal with it later
2487        // after reading all the input objects.
2488        let payment = gas_objects.unwrap_or_default();
2489        let mut transaction = TransactionData::V1(TransactionDataV1 {
2490            kind: transaction_kind.clone(),
2491            sender,
2492            gas_payment: GasData {
2493                objects: payment,
2494                owner,
2495                price,
2496                budget,
2497            },
2498            expiration: TransactionExpiration::None,
2499        });
2500
2501        let raw_txn_data = if show_raw_txn_data_and_effects {
2502            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2503                error: "Failed to serialize transaction during dev inspect".to_string(),
2504            })?
2505        } else {
2506            vec![]
2507        };
2508
2509        transaction.validity_check_no_gas_check(protocol_config)?;
2510
2511        let input_object_kinds = transaction.input_objects()?;
2512        let receiving_object_refs = transaction.receiving_objects();
2513
2514        iota_transaction_checks::deny::check_transaction_for_validation(
2515            &transaction,
2516            &[],
2517            &input_object_kinds,
2518            &receiving_object_refs,
2519            &self.config.transaction_deny_config,
2520            self.get_backing_package_store().as_ref(),
2521        )?;
2522
2523        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2524            // We don't want to cache this transaction since it's a dev inspect.
2525            None,
2526            &input_object_kinds,
2527            &receiving_object_refs,
2528            epoch_store.epoch(),
2529        )?;
2530
2531        let (gas_status, checked_input_objects) = if skip_checks {
2532            // If we are skipping checks, then we call the check_dev_inspect_input function
2533            // which will perform only lightweight checks on the transaction
2534            // input. And if the gas field is empty, that means we will
2535            // use the dummy gas object so we need to add it to the input objects vector.
2536            if transaction.gas().is_empty() {
2537                // Create and use a dummy gas object if there is no gas object provided.
2538                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2539                    SIMULATION_GAS_COIN_VALUE,
2540                    transaction.gas_owner(),
2541                );
2542                let gas_object_ref = dummy_gas_object.object_ref();
2543                transaction.gas_data_mut().objects = vec![gas_object_ref];
2544                input_objects.push(ObjectReadResult::new(
2545                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2546                    dummy_gas_object.into(),
2547                ));
2548            }
2549            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2550                protocol_config,
2551                &transaction_kind,
2552                input_objects,
2553                receiving_objects,
2554            )?;
2555            let gas_status = IotaGasStatus::new(
2556                max_tx_gas,
2557                transaction.gas_price(),
2558                reference_gas_price,
2559                protocol_config,
2560            )?;
2561
2562            (gas_status, checked_input_objects)
2563        } else {
2564            // If we are not skipping checks, then we call the check_transaction_input
2565            // function and its dummy gas variant which will perform full
2566            // fledged checks just like a real transaction execution.
2567            if transaction.gas().is_empty() {
2568                // Create and use a dummy gas object if there is no gas object provided.
2569                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2570                    SIMULATION_GAS_COIN_VALUE,
2571                    transaction.gas_owner(),
2572                );
2573                let gas_object_ref = dummy_gas_object.object_ref();
2574                transaction.gas_data_mut().objects = vec![gas_object_ref];
2575                iota_transaction_checks::check_transaction_input_with_given_gas(
2576                    epoch_store.protocol_config(),
2577                    reference_gas_price,
2578                    &transaction,
2579                    input_objects,
2580                    receiving_objects,
2581                    dummy_gas_object,
2582                    &self.metrics.bytecode_verifier_metrics,
2583                    &self.config.verifier_signing_config,
2584                )?
2585            } else {
2586                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
2587                // `authenticator_gas_budget` to 0.
2588                let authenticator_gas_budget = 0;
2589
2590                iota_transaction_checks::check_transaction_input(
2591                    epoch_store.protocol_config(),
2592                    reference_gas_price,
2593                    &transaction,
2594                    input_objects,
2595                    &receiving_objects,
2596                    &self.metrics.bytecode_verifier_metrics,
2597                    &self.config.verifier_signing_config,
2598                    authenticator_gas_budget,
2599                )?
2600            }
2601        };
2602
2603        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2604            .expect("Creating an executor should not fail here");
2605        let gas_data = transaction.gas_data().clone();
2606        let intent_msg = IntentMessage::new(
2607            Intent {
2608                version: IntentVersion::V0,
2609                scope: IntentScope::TransactionData,
2610                app_id: IntentAppId::Iota,
2611            },
2612            transaction,
2613        );
2614        let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2615        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2616            self.get_backing_store().as_ref(),
2617            protocol_config,
2618            self.metrics.limits_metrics.clone(),
2619            // expensive checks
2620            false,
2621            self.config.certificate_deny_config.certificate_deny_set(),
2622            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2623            epoch_store
2624                .epoch_start_config()
2625                .epoch_data()
2626                .epoch_start_timestamp(),
2627            checked_input_objects,
2628            gas_data,
2629            gas_status,
2630            transaction_kind,
2631            sender,
2632            transaction_digest,
2633            skip_checks,
2634        );
2635
2636        let raw_effects = if show_raw_txn_data_and_effects {
2637            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2638                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2639            })?
2640        } else {
2641            vec![]
2642        };
2643
2644        let mut layout_resolver =
2645            epoch_store
2646                .executor()
2647                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2648                    &inner_temp_store,
2649                    self.get_backing_package_store(),
2650                )));
2651
2652        DevInspectResults::new(
2653            effects,
2654            inner_temp_store.events.clone(),
2655            execution_result,
2656            raw_txn_data,
2657            raw_effects,
2658            layout_resolver.as_mut(),
2659        )
2660    }
2661
2662    // Only used for testing because of how epoch store is loaded.
2663    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2664        let epoch_store = self.epoch_store_for_testing();
2665        Ok(epoch_store.reference_gas_price())
2666    }
2667
2668    #[instrument(level = "trace", skip_all)]
2669    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2670        self.get_transaction_cache_reader()
2671            .try_is_tx_already_executed(digest)
2672    }
2673
2674    /// Non-fallible version of `try_is_tx_already_executed`.
2675    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2676        self.try_is_tx_already_executed(digest)
2677            .expect("storage access failed")
2678    }
2679
2680    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2681    #[instrument(level = "debug", skip_all, err)]
2682    fn index_tx(
2683        &self,
2684        indexes: &IndexStore,
2685        digest: &TransactionDigest,
2686        // TODO: index_tx really just need the transaction data here.
2687        transaction: &VerifiedExecutableTransaction,
2688        effects: &TransactionEffects,
2689        events: &TransactionEvents,
2690        timestamp_ms: u64,
2691        tx_coins: Option<TxCoins>,
2692        written: &WrittenObjects,
2693        inner_temporary_store: &InnerTemporaryStore,
2694        epoch_store: &Arc<AuthorityPerEpochStore>,
2695    ) -> IotaResult<u64> {
2696        let changes = self
2697            .process_object_index(effects, written, inner_temporary_store, epoch_store)
2698            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2699
2700        indexes.index_tx(
2701            transaction.data().intent_message().value.sender(),
2702            transaction
2703                .data()
2704                .intent_message()
2705                .value
2706                .input_objects()?
2707                .iter()
2708                .map(|o| o.object_id()),
2709            effects
2710                .all_changed_objects()
2711                .into_iter()
2712                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2713            transaction
2714                .data()
2715                .intent_message()
2716                .value
2717                .move_calls()
2718                .into_iter()
2719                .map(|(package, module, function)| {
2720                    (*package, module.to_owned(), function.to_owned())
2721                }),
2722            events,
2723            changes,
2724            digest,
2725            timestamp_ms,
2726            tx_coins,
2727        )
2728    }
2729
2730    #[cfg(msim)]
2731    fn create_fail_state(
2732        &self,
2733        transaction: &VerifiedExecutableTransaction,
2734        epoch_store: &Arc<AuthorityPerEpochStore>,
2735        effects: &mut TransactionEffects,
2736    ) {
2737        use std::cell::RefCell;
2738
2739        use iota_types::effects::TransactionEffectsAPIForTesting;
2740        thread_local! {
2741            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2742        }
2743        if !transaction.data().intent_message().value.is_system_tx() {
2744            let committee = epoch_store.committee();
2745            let cur_stake = (**committee).weight(&self.name);
2746            if cur_stake > 0 {
2747                FAIL_STATE.with_borrow_mut(|fail_state| {
2748                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2749                    if fail_state.0 < committee.validity_threshold() {
2750                        fail_state.0 += cur_stake;
2751                        fail_state.1.insert(self.name);
2752                    }
2753
2754                    if fail_state.1.contains(&self.name) {
2755                        info!("cp_exec failing tx");
2756                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2757                    }
2758                });
2759            }
2760        }
2761    }
2762
2763    fn process_object_index(
2764        &self,
2765        effects: &TransactionEffects,
2766        written: &WrittenObjects,
2767        inner_temporary_store: &InnerTemporaryStore,
2768        epoch_store: &Arc<AuthorityPerEpochStore>,
2769    ) -> IotaResult<ObjectIndexChanges> {
2770        let mut layout_resolver =
2771            epoch_store
2772                .executor()
2773                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2774                    inner_temporary_store,
2775                    self.get_backing_package_store(),
2776                )));
2777
2778        let modified_at_version = effects
2779            .modified_at_versions()
2780            .into_iter()
2781            .collect::<HashMap<_, _>>();
2782
2783        let tx_digest = effects.transaction_digest();
2784        let mut deleted_owners = vec![];
2785        let mut deleted_dynamic_fields = vec![];
2786        for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2787            let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2788            // When we process the index, the latest object hasn't been written yet so
2789            // the old object must be present.
2790            match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2791                |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}", object_ref.object_id)
2792            ) {
2793                Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2794                Owner::Object(object_id) => {
2795                    deleted_dynamic_fields.push((object_id, object_ref.object_id))
2796                }
2797                _ => {}
2798            }
2799        }
2800
2801        let mut new_owners = vec![];
2802        let mut new_dynamic_fields = vec![];
2803
2804        for (oref, owner, kind) in effects.all_changed_objects() {
2805            let id = &oref.object_id;
2806            // For mutated objects, retrieve old owner and delete old index if there is a
2807            // owner change.
2808            if let WriteKind::Mutate = kind {
2809                let Some(old_version) = modified_at_version.get(id) else {
2810                    panic!(
2811                        "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2812                    );
2813                };
2814                // When we process the index, the latest object hasn't been written yet so
2815                // the old object must be present.
2816                let Some(old_object) = self
2817                    .get_object_store()
2818                    .try_get_object_by_key(id, *old_version)?
2819                else {
2820                    panic!(
2821                        "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2822                    );
2823                };
2824                if old_object.owner != owner {
2825                    match old_object.owner {
2826                        Owner::Address(addr) => {
2827                            deleted_owners.push((addr, *id));
2828                        }
2829                        Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2830                        _ => {}
2831                    }
2832                }
2833            }
2834
2835            match owner {
2836                Owner::Address(addr) => {
2837                    // TODO: We can remove the object fetching after we added ObjectType to
2838                    // TransactionEffects
2839                    let new_object = written.get(id).unwrap_or_else(
2840                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2841                    );
2842                    assert_eq!(
2843                        new_object.version(),
2844                        oref.version,
2845                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2846                        tx_digest,
2847                        id,
2848                        new_object.version(),
2849                        oref.version
2850                    );
2851
2852                    let type_ = new_object
2853                        .type_()
2854                        .map(|type_| ObjectType::Struct(type_.clone()))
2855                        .unwrap_or(ObjectType::Package);
2856
2857                    new_owners.push((
2858                        (addr, *id),
2859                        ObjectInfo {
2860                            object_id: *id,
2861                            version: oref.version,
2862                            digest: oref.digest,
2863                            type_,
2864                            owner,
2865                            previous_transaction: *effects.transaction_digest(),
2866                        },
2867                    ));
2868                }
2869                Owner::Object(owner) => {
2870                    let new_object = written.get(id).unwrap_or_else(
2871                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2872                    );
2873                    assert_eq!(
2874                        new_object.version(),
2875                        oref.version,
2876                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2877                        tx_digest,
2878                        id,
2879                        new_object.version(),
2880                        oref.version
2881                    );
2882
2883                    let Some(df_info) = self
2884                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2885                        .unwrap_or_else(|e| {
2886                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2887                            None
2888                        }
2889                        )
2890                    else {
2891                        // Skip indexing for non dynamic field objects.
2892                        continue;
2893                    };
2894                    new_dynamic_fields.push(((owner, *id), df_info))
2895                }
2896                _ => {}
2897            }
2898        }
2899
2900        Ok(ObjectIndexChanges {
2901            deleted_owners,
2902            deleted_dynamic_fields,
2903            new_owners,
2904            new_dynamic_fields,
2905        })
2906    }
2907
2908    fn try_create_dynamic_field_info(
2909        &self,
2910        o: &Object,
2911        written: &WrittenObjects,
2912        resolver: &mut dyn LayoutResolver,
2913    ) -> IotaResult<Option<DynamicFieldInfo>> {
2914        // Skip if not a move object
2915        let Some(move_object) = o.data.as_opt_struct().cloned() else {
2916            return Ok(None);
2917        };
2918
2919        // We only index dynamic field objects
2920        if !move_object.struct_tag().is_dynamic_field() {
2921            return Ok(None);
2922        }
2923
2924        let layout = match resolver.get_annotated_layout(move_object.struct_tag()) {
2925            Ok(annotated_layout) => annotated_layout.into_layout(),
2926            Err(e) => {
2927                error!(
2928                    "unable to load layout for type `{:?}`: {e}",
2929                    move_object.struct_tag()
2930                );
2931                return Ok(None);
2932            }
2933        };
2934
2935        let field =
2936            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2937                IotaError::ObjectDeserialization {
2938                    error: e.to_string(),
2939                }
2940            })?;
2941
2942        let type_ = field.kind;
2943        let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2944        let bcs_name = field.name_bytes.to_owned();
2945
2946        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2947            .map_err(|e| {
2948                warn!("{e}");
2949                IotaError::ObjectDeserialization {
2950                    error: e.to_string(),
2951                }
2952            })?;
2953
2954        let name = DynamicFieldName {
2955            type_: name_type,
2956            value: IotaMoveValue::from(name_value).to_json_value(),
2957        };
2958
2959        let value_metadata = field.value_metadata().map_err(|e| {
2960            warn!("{e}");
2961            IotaError::ObjectDeserialization {
2962                error: e.to_string(),
2963            }
2964        })?;
2965
2966        Ok(Some(match value_metadata {
2967            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2968                name,
2969                bcs_name,
2970                type_,
2971                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2972                object_id: o.id(),
2973                version: o.version(),
2974                digest: o.digest(),
2975            },
2976
2977            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2978                // Find the actual object from storage using the object id obtained from the
2979                // wrapper.
2980
2981                // Try to find the object in the written objects first.
2982                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2983                    (
2984                        object.version(),
2985                        object.digest(),
2986                        object.data.opt_object_type().unwrap().clone(),
2987                    )
2988                } else {
2989                    // If not found, try to find it in the database.
2990                    let object = self
2991                        .get_object_store()
2992                        .try_get_object_by_key(&object_id, o.version())?
2993                        .ok_or_else(|| UserInputError::ObjectNotFound {
2994                            object_id,
2995                            version: Some(o.version()),
2996                        })?;
2997                    let version = object.version();
2998                    let digest = object.digest();
2999                    let object_type = object.data.opt_object_type().unwrap().clone();
3000                    (version, digest, object_type)
3001                };
3002
3003                DynamicFieldInfo {
3004                    name,
3005                    bcs_name,
3006                    type_,
3007                    object_type: object_type.to_string(),
3008                    object_id,
3009                    version,
3010                    digest,
3011                }
3012            }
3013        }))
3014    }
3015
3016    #[instrument(level = "trace", skip_all, err)]
3017    fn post_process_one_tx(
3018        &self,
3019        transaction: &VerifiedExecutableTransaction,
3020        effects: &TransactionEffects,
3021        inner_temporary_store: &InnerTemporaryStore,
3022        epoch_store: &Arc<AuthorityPerEpochStore>,
3023    ) -> IotaResult {
3024        if self.indexes.is_none() {
3025            return Ok(());
3026        }
3027
3028        let _scope = monitored_scope("Execution::post_process_one_tx");
3029
3030        let tx_digest = transaction.digest();
3031        let timestamp_ms = Self::unixtime_now_ms();
3032        let events = &inner_temporary_store.events;
3033        let written = &inner_temporary_store.written;
3034        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3035            effects,
3036            inner_temporary_store,
3037            epoch_store,
3038        );
3039
3040        // Index tx
3041        if let Some(indexes) = &self.indexes {
3042            let _ = self
3043                .index_tx(
3044                    indexes.as_ref(),
3045                    tx_digest,
3046                    transaction,
3047                    effects,
3048                    events,
3049                    timestamp_ms,
3050                    tx_coins,
3051                    written,
3052                    inner_temporary_store,
3053                    epoch_store,
3054                )
3055                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3056                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3057                .expect("Indexing tx should not fail");
3058
3059            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
3060            let events = self.make_transaction_block_events(
3061                events.clone(),
3062                *tx_digest,
3063                timestamp_ms,
3064                epoch_store,
3065                inner_temporary_store,
3066            )?;
3067            // Emit events
3068            self.subscription_handler
3069                .process_tx(transaction.data().transaction_data(), &effects, &events)
3070                .tap_ok(|_| {
3071                    self.metrics
3072                        .post_processing_total_tx_had_event_processed
3073                        .inc()
3074                })
3075                .tap_err(|e| {
3076                    warn!(
3077                        ?tx_digest,
3078                        "Post processing - Couldn't process events for tx: {}", e
3079                    )
3080                })?;
3081
3082            self.metrics
3083                .post_processing_total_events_emitted
3084                .inc_by(events.data.len() as u64);
3085        };
3086        Ok(())
3087    }
3088
3089    fn make_transaction_block_events(
3090        &self,
3091        transaction_events: TransactionEvents,
3092        digest: TransactionDigest,
3093        timestamp_ms: u64,
3094        epoch_store: &Arc<AuthorityPerEpochStore>,
3095        inner_temporary_store: &InnerTemporaryStore,
3096    ) -> IotaResult<IotaTransactionBlockEvents> {
3097        let mut layout_resolver =
3098            epoch_store
3099                .executor()
3100                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3101                    inner_temporary_store,
3102                    self.get_backing_package_store(),
3103                )));
3104        IotaTransactionBlockEvents::try_from(
3105            transaction_events,
3106            digest,
3107            Some(timestamp_ms),
3108            layout_resolver.as_mut(),
3109        )
3110    }
3111
3112    pub fn unixtime_now_ms() -> u64 {
3113        let now = SystemTime::now()
3114            .duration_since(UNIX_EPOCH)
3115            .expect("Time went backwards")
3116            .as_millis();
3117        u64::try_from(now).expect("Travelling in time machine")
3118    }
3119
3120    #[instrument(level = "trace", skip_all)]
3121    pub async fn handle_transaction_info_request(
3122        &self,
3123        request: TransactionInfoRequest,
3124    ) -> IotaResult<TransactionInfoResponse> {
3125        let epoch_store = self.load_epoch_store_one_call_per_task();
3126        let (transaction, status) = self
3127            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3128            .ok_or(IotaError::TransactionNotFound {
3129                digest: request.transaction_digest,
3130            })?;
3131        Ok(TransactionInfoResponse {
3132            transaction,
3133            status,
3134        })
3135    }
3136
3137    #[instrument(level = "trace", skip_all)]
3138    pub async fn handle_object_info_request(
3139        &self,
3140        request: ObjectInfoRequest,
3141    ) -> IotaResult<ObjectInfoResponse> {
3142        let epoch_store = self.load_epoch_store_one_call_per_task();
3143
3144        let requested_object_seq = match request.request_kind {
3145            ObjectInfoRequestKind::LatestObjectInfo => {
3146                self.try_get_object_or_tombstone(request.object_id)?
3147                    .ok_or_else(|| {
3148                        IotaError::from(UserInputError::ObjectNotFound {
3149                            object_id: request.object_id,
3150                            version: None,
3151                        })
3152                    })?
3153                    .version
3154            }
3155            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3156        };
3157
3158        let object = self
3159            .get_object_store()
3160            .try_get_object_by_key(&request.object_id, requested_object_seq)?
3161            .ok_or_else(|| {
3162                IotaError::from(UserInputError::ObjectNotFound {
3163                    object_id: request.object_id,
3164                    version: Some(requested_object_seq),
3165                })
3166            })?;
3167
3168        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3169            (request.generate_layout, object.data.as_opt_struct())
3170        {
3171            Some(into_struct_layout(
3172                epoch_store
3173                    .executor()
3174                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3175                    .get_annotated_layout(move_obj.struct_tag())?,
3176            )?)
3177        } else {
3178            None
3179        };
3180
3181        let lock = if !object.is_address_owned() {
3182            // Only address owned objects have locks.
3183            None
3184        } else {
3185            self.get_transaction_lock(&object.object_ref(), &epoch_store)?
3186                .map(|s| s.into_inner())
3187        };
3188
3189        Ok(ObjectInfoResponse {
3190            object,
3191            layout,
3192            lock_for_debugging: lock,
3193        })
3194    }
3195
3196    #[instrument(level = "trace", skip_all)]
3197    pub fn handle_checkpoint_request(
3198        &self,
3199        request: &CheckpointRequest,
3200    ) -> IotaResult<CheckpointResponse> {
3201        let summary = if request.certified {
3202            let summary = match request.sequence_number {
3203                Some(seq) => self
3204                    .checkpoint_store
3205                    .get_checkpoint_by_sequence_number(seq)?,
3206                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3207            }
3208            .map(|v| v.into_inner());
3209            summary.map(CheckpointSummaryResponse::Certified)
3210        } else {
3211            let summary = match request.sequence_number {
3212                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3213                None => self
3214                    .checkpoint_store
3215                    .get_latest_locally_computed_checkpoint()?,
3216            };
3217            summary.map(CheckpointSummaryResponse::Pending)
3218        };
3219        let contents = match &summary {
3220            Some(s) => self
3221                .checkpoint_store
3222                .get_checkpoint_contents(&s.content_digest())?,
3223            None => None,
3224        };
3225        Ok(CheckpointResponse {
3226            checkpoint: summary,
3227            contents,
3228        })
3229    }
3230
3231    fn check_protocol_version(
3232        supported_protocol_versions: SupportedProtocolVersions,
3233        current_version: ProtocolVersion,
3234    ) {
3235        info!("current protocol version is now {:?}", current_version);
3236        info!("supported versions are: {:?}", supported_protocol_versions);
3237        if !supported_protocol_versions.is_version_supported(current_version) {
3238            let msg = format!(
3239                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3240            );
3241
3242            error!("{}", msg);
3243            eprintln!("{msg}");
3244
3245            #[cfg(not(msim))]
3246            std::process::exit(1);
3247
3248            #[cfg(msim)]
3249            iota_simulator::task::shutdown_current_node();
3250        }
3251    }
3252
3253    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
3254    #[expect(clippy::too_many_arguments)]
3255    pub async fn new(
3256        name: AuthorityName,
3257        secret: StableSyncAuthoritySigner,
3258        supported_protocol_versions: SupportedProtocolVersions,
3259        store: Arc<AuthorityStore>,
3260        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3261        epoch_store: Arc<AuthorityPerEpochStore>,
3262        committee_store: Arc<CommitteeStore>,
3263        indexes: Option<Arc<IndexStore>>,
3264        grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3265        checkpoint_store: Arc<CheckpointStore>,
3266        prometheus_registry: &Registry,
3267        genesis_objects: &[Object],
3268        db_checkpoint_config: &DBCheckpointConfig,
3269        config: NodeConfig,
3270        archive_readers: ArchiveReaderBalancer,
3271        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3272        chain_identifier: ChainIdentifier,
3273        pruner_db: Option<Arc<AuthorityPrunerTables>>,
3274        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
3275        policy_config: Option<PolicyConfig>,
3276        firewall_config: Option<RemoteFirewallConfig>,
3277    ) -> Arc<Self> {
3278        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3279
3280        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3281        let (tx_ready_transactions, rx_ready_transactions) = unbounded_channel();
3282        let transaction_manager = Arc::new(TransactionManager::new(
3283            execution_cache_trait_pointers.object_cache_reader.clone(),
3284            execution_cache_trait_pointers
3285                .transaction_cache_reader
3286                .clone(),
3287            &epoch_store,
3288            tx_ready_transactions,
3289            metrics.clone(),
3290        ));
3291        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3292
3293        let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3294            epoch_store.get_parent_path(),
3295            config
3296                .authority_store_pruning_config
3297                .num_latest_epoch_dbs_to_retain,
3298        )
3299        .await;
3300        let _pruner = AuthorityStorePruner::new(
3301            store.perpetual_tables.clone(),
3302            checkpoint_store.clone(),
3303            grpc_indexes_store.clone(),
3304            indexes.clone(),
3305            config.authority_store_pruning_config.clone(),
3306            epoch_store.committee().authority_exists(&name),
3307            epoch_store.epoch_start_state().epoch_duration_ms(),
3308            prometheus_registry,
3309            archive_readers,
3310            pruner_db,
3311            checkpoint_progress_tracker.clone(),
3312        );
3313        let input_loader =
3314            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3315        let epoch = epoch_store.epoch();
3316        let rgp = epoch_store.reference_gas_price();
3317        let traffic_controller_metrics =
3318            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3319        let traffic_controller = if let Some(policy_config) = policy_config {
3320            Some(Arc::new(
3321                TrafficController::init(
3322                    policy_config,
3323                    traffic_controller_metrics,
3324                    firewall_config.clone(),
3325                )
3326                .await,
3327            ))
3328        } else {
3329            None
3330        };
3331        let state = Arc::new(AuthorityState {
3332            name,
3333            secret,
3334            execution_lock: RwLock::new(epoch),
3335            epoch_store: ArcSwap::new(epoch_store.clone()),
3336            input_loader,
3337            execution_cache_trait_pointers,
3338            indexes,
3339            grpc_indexes_store,
3340            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3341            checkpoint_store,
3342            committee_store,
3343            transaction_manager,
3344            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3345            metrics,
3346            _pruner,
3347            authority_per_epoch_pruner,
3348            checkpoint_progress_tracker,
3349            db_checkpoint_config: db_checkpoint_config.clone(),
3350            config,
3351            overload_info: AuthorityOverloadInfo::default(),
3352            validator_tx_finalizer,
3353            chain_identifier,
3354            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3355            traffic_controller,
3356        });
3357
3358        // Start a task to execute ready transactions.
3359        let authority_state = Arc::downgrade(&state);
3360        spawn_monitored_task!(execution_process(
3361            authority_state,
3362            rx_ready_transactions,
3363            rx_execution_shutdown,
3364        ));
3365        spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3366
3367        // TODO: This doesn't belong to the constructor of AuthorityState.
3368        state
3369            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3370            .expect("Error indexing genesis objects.");
3371
3372        state
3373    }
3374
3375    pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3376        &self.authority_per_epoch_pruner
3377    }
3378
3379    // TODO: Consolidate our traits to reduce the number of methods here.
3380    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3381        &self.execution_cache_trait_pointers.object_cache_reader
3382    }
3383
3384    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3385        &self.execution_cache_trait_pointers.transaction_cache_reader
3386    }
3387
3388    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3389        &self.execution_cache_trait_pointers.cache_writer
3390    }
3391
3392    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3393        &self.execution_cache_trait_pointers.backing_store
3394    }
3395
3396    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3397        &self.execution_cache_trait_pointers.backing_package_store
3398    }
3399
3400    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3401        &self.execution_cache_trait_pointers.object_store
3402    }
3403
3404    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3405        &self.execution_cache_trait_pointers.reconfig_api
3406    }
3407
3408    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3409        &self.execution_cache_trait_pointers.global_state_hash_store
3410    }
3411
3412    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3413        &self.execution_cache_trait_pointers.checkpoint_cache
3414    }
3415
3416    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3417        &self.execution_cache_trait_pointers.state_sync_store
3418    }
3419
3420    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3421        &self.execution_cache_trait_pointers.cache_commit
3422    }
3423
3424    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3425        self.execution_cache_trait_pointers
3426            .testing_api
3427            .database_for_testing()
3428    }
3429
3430    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3431        &self,
3432        config: NodeConfig,
3433        metrics: Arc<AuthorityStorePruningMetrics>,
3434    ) -> anyhow::Result<()> {
3435        let archive_readers =
3436            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3437        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3438            &self.database_for_testing().perpetual_tables,
3439            &self.checkpoint_store,
3440            self.grpc_indexes_store.as_deref(),
3441            None,
3442            config.authority_store_pruning_config,
3443            metrics,
3444            archive_readers,
3445            EPOCH_DURATION_MS_FOR_TESTING,
3446            self.checkpoint_progress_tracker.as_ref(),
3447        )
3448        .await
3449    }
3450
3451    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3452        &self.transaction_manager
3453    }
3454
3455    /// Adds transactions / certificates to transaction manager for ordered
3456    /// execution.
3457    pub fn enqueue_transactions_for_execution(
3458        &self,
3459        transactions: Vec<VerifiedExecutableTransaction>,
3460        epoch_store: &Arc<AuthorityPerEpochStore>,
3461    ) {
3462        self.transaction_manager.enqueue(transactions, epoch_store)
3463    }
3464
3465    /// Adds certificates to transaction manager for ordered execution.
3466    pub fn enqueue_certificates_for_execution(
3467        &self,
3468        certs: Vec<VerifiedCertificate>,
3469        epoch_store: &Arc<AuthorityPerEpochStore>,
3470    ) {
3471        self.transaction_manager
3472            .enqueue_certificates(certs, epoch_store)
3473    }
3474
3475    pub fn enqueue_with_expected_effects_digest(
3476        &self,
3477        transactions: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3478        epoch_store: &AuthorityPerEpochStore,
3479    ) {
3480        self.transaction_manager
3481            .enqueue_with_expected_effects_digest(transactions, epoch_store)
3482    }
3483
3484    fn create_owner_index_if_empty(
3485        &self,
3486        genesis_objects: &[Object],
3487        epoch_store: &Arc<AuthorityPerEpochStore>,
3488    ) -> IotaResult {
3489        let Some(index_store) = &self.indexes else {
3490            return Ok(());
3491        };
3492        if !index_store.is_empty() {
3493            return Ok(());
3494        }
3495
3496        let mut new_owners = vec![];
3497        let mut new_dynamic_fields = vec![];
3498        let mut layout_resolver = epoch_store
3499            .executor()
3500            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3501        for o in genesis_objects.iter() {
3502            match o.owner {
3503                Owner::Address(addr) => {
3504                    new_owners.push(((addr, o.id()), ObjectInfo::new(&o.object_ref(), o)))
3505                }
3506                Owner::Object(object_id) => {
3507                    let id = o.id();
3508                    let info = match self.try_create_dynamic_field_info(
3509                        o,
3510                        &BTreeMap::new(),
3511                        layout_resolver.as_mut(),
3512                    ) {
3513                        Ok(Some(info)) => info,
3514                        Ok(None) => continue,
3515                        Err(IotaError::UserInput {
3516                            error:
3517                                UserInputError::ObjectNotFound {
3518                                    object_id: not_found_id,
3519                                    version,
3520                                },
3521                        }) => {
3522                            warn!(
3523                                ?not_found_id,
3524                                ?version,
3525                                object_owner=?object_id,
3526                                field=?id,
3527                                "Skipping dynamic field: referenced genesis object not found"
3528                            );
3529                            continue;
3530                        }
3531                        Err(e) => return Err(e),
3532                    };
3533                    new_dynamic_fields.push(((object_id, id), info));
3534                }
3535                _ => {}
3536            }
3537        }
3538
3539        index_store.insert_genesis_objects(ObjectIndexChanges {
3540            deleted_owners: vec![],
3541            deleted_dynamic_fields: vec![],
3542            new_owners,
3543            new_dynamic_fields,
3544        })
3545    }
3546
3547    /// Attempts to acquire execution lock for an executable transaction.
3548    /// Returns the lock if the transaction is matching current executed epoch
3549    /// Returns None otherwise
3550    pub fn execution_lock_for_executable_transaction(
3551        &self,
3552        transaction: &VerifiedExecutableTransaction,
3553    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3554        let lock = self
3555            .execution_lock
3556            .try_read()
3557            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3558        if *lock == transaction.auth_sig().epoch() {
3559            Ok(lock)
3560        } else {
3561            Err(IotaError::WrongEpoch {
3562                expected_epoch: *lock,
3563                actual_epoch: transaction.auth_sig().epoch(),
3564            })
3565        }
3566    }
3567
3568    /// Acquires the execution lock for the duration of a transaction signing
3569    /// request. This prevents reconfiguration from starting until we are
3570    /// finished handling the signing request. Otherwise, in-memory lock
3571    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3572    /// while we are attempting to acquire locks for the transaction.
3573    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3574        self.execution_lock
3575            .try_read()
3576            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3577    }
3578
3579    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3580        self.execution_lock.write().await
3581    }
3582
3583    #[instrument(level = "error", skip_all)]
3584    pub async fn reconfigure(
3585        &self,
3586        cur_epoch_store: &AuthorityPerEpochStore,
3587        supported_protocol_versions: SupportedProtocolVersions,
3588        new_committee: Committee,
3589        epoch_start_configuration: EpochStartConfiguration,
3590        state_hasher: Arc<GlobalStateHasher>,
3591        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3592        epoch_supply_change: i64,
3593        epoch_last_checkpoint: CheckpointSequenceNumber,
3594    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3595        Self::check_protocol_version(
3596            supported_protocol_versions,
3597            epoch_start_configuration
3598                .epoch_start_state()
3599                .protocol_version(),
3600        );
3601        self.metrics.reset_on_reconfigure();
3602        self.committee_store.insert_new_committee(&new_committee)?;
3603
3604        // Wait until no transactions are being executed.
3605        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3606
3607        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3608        cur_epoch_store.epoch_terminated().await;
3609
3610        let highest_locally_built_checkpoint_seq = self
3611            .checkpoint_store
3612            .get_latest_locally_computed_checkpoint()?
3613            .map(|c| *c.sequence_number())
3614            .unwrap_or(0);
3615
3616        assert!(
3617            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3618            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3619        );
3620        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3621            || self.is_fullnode(cur_epoch_store)
3622        {
3623            // if we built the last checkpoint locally (as opposed to receiving it from a
3624            // peer), then all shared_version_assignments except the one for the
3625            // ChangeEpoch transaction should have been removed
3626            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3627            // Due to (otherwise harmless) race conditions between CheckpointExecutor and
3628            // ConsensusHandler, we actually can't guarantee that all
3629            // shared_version_assignments have been removed. However,
3630            // typically at most 2 or 3 are left over. We leave this check here in order to
3631            // catch complete failure of cleanup which would cause a memory
3632            // leak.
3633            if num_shared_version_assignments > 10 {
3634                // If this happens in prod, we have a memory leak, but not a correctness issue.
3635                debug_fatal!(
3636                    "all shared_version_assignments should have been removed \
3637                    (num_shared_version_assignments: {num_shared_version_assignments})"
3638                );
3639            }
3640        }
3641
3642        // Safe to reconfigure now. No transactions are being executed,
3643        // and no epoch-specific tasks are running.
3644
3645        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
3646        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
3647        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3648            .await?;
3649        self.get_reconfig_api()
3650            .clear_state_end_of_epoch(&execution_lock);
3651        self.check_system_consistency(
3652            cur_epoch_store,
3653            state_hasher,
3654            expensive_safety_check_config,
3655            epoch_supply_change,
3656        )?;
3657        self.get_reconfig_api()
3658            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3659        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3660            if self
3661                .db_checkpoint_config
3662                .perform_db_checkpoints_at_epoch_end
3663            {
3664                let checkpoint_indexes = self
3665                    .db_checkpoint_config
3666                    .perform_index_db_checkpoints_at_epoch_end
3667                    .unwrap_or(false);
3668                let current_epoch = cur_epoch_store.epoch();
3669                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3670                self.checkpoint_all_dbs(
3671                    &epoch_checkpoint_path,
3672                    cur_epoch_store,
3673                    checkpoint_indexes,
3674                )?;
3675            }
3676        }
3677
3678        let new_epoch = new_committee.epoch;
3679        let new_epoch_store = self
3680            .reopen_epoch_db(
3681                cur_epoch_store,
3682                new_committee,
3683                epoch_start_configuration,
3684                expensive_safety_check_config,
3685                epoch_last_checkpoint,
3686            )
3687            .await?;
3688        assert_eq!(new_epoch_store.epoch(), new_epoch);
3689        self.transaction_manager.reconfigure(new_epoch);
3690        *execution_lock = new_epoch;
3691        // drop execution_lock after epoch store was updated
3692        // see also assert in AuthorityState::process_transaction
3693        // on the epoch store and execution lock epoch match
3694        Ok(new_epoch_store)
3695    }
3696
3697    /// Advance the epoch store to the next epoch for testing only.
3698    /// This only manually sets all the places where we have the epoch number.
3699    /// It doesn't properly reconfigure the node, hence should be only used for
3700    /// testing.
3701    pub async fn reconfigure_for_testing(&self) {
3702        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3703        let epoch_store = self.epoch_store_for_testing().clone();
3704        let protocol_config = epoch_store.protocol_config().clone();
3705        // The current protocol config used in the epoch store may have been overridden
3706        // and diverged from the protocol config definitions. That override may
3707        // have now been dropped when the initial guard was dropped. We reapply
3708        // the override before creating the new epoch store, to make sure that
3709        // the new epoch store has the same protocol config as the current one.
3710        // Since this is for testing only, we mostly like to keep the protocol config
3711        // the same across epochs.
3712        let _guard =
3713            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3714        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3715            self.get_backing_package_store().clone(),
3716            &self.config.expensive_safety_check_config,
3717            self.checkpoint_store
3718                .get_epoch_last_checkpoint(epoch_store.epoch())
3719                .unwrap()
3720                .map(|c| *c.sequence_number())
3721                .unwrap_or_default(),
3722        );
3723        let new_epoch = new_epoch_store.epoch();
3724        self.transaction_manager.reconfigure(new_epoch);
3725        self.epoch_store.store(new_epoch_store);
3726        epoch_store.epoch_terminated().await;
3727        *execution_lock = new_epoch;
3728    }
3729
3730    #[instrument(level = "error", skip_all)]
3731    fn check_system_consistency(
3732        &self,
3733        cur_epoch_store: &AuthorityPerEpochStore,
3734        state_hasher: Arc<GlobalStateHasher>,
3735        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3736        epoch_supply_change: i64,
3737    ) -> IotaResult<()> {
3738        info!(
3739            "Performing iota conservation consistency check for epoch {}",
3740            cur_epoch_store.epoch()
3741        );
3742
3743        if cfg!(debug_assertions) {
3744            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3745        }
3746
3747        self.get_reconfig_api()
3748            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3749
3750        // check for root state hash consistency with live object set
3751        if expensive_safety_check_config.enable_state_consistency_check() {
3752            info!(
3753                "Performing state consistency check for epoch {}",
3754                cur_epoch_store.epoch()
3755            );
3756            self.expensive_check_is_consistent_state(
3757                state_hasher,
3758                cur_epoch_store,
3759                cfg!(debug_assertions), // panic in debug mode only
3760            );
3761        }
3762
3763        if expensive_safety_check_config.enable_secondary_index_checks() {
3764            if let Some(indexes) = self.indexes.clone() {
3765                verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3766                    .expect("secondary indexes are inconsistent");
3767            }
3768        }
3769
3770        Ok(())
3771    }
3772
3773    fn expensive_check_is_consistent_state(
3774        &self,
3775        state_hasher: Arc<GlobalStateHasher>,
3776        cur_epoch_store: &AuthorityPerEpochStore,
3777        panic: bool,
3778    ) {
3779        let live_object_set_hash = state_hasher.digest_live_object_set();
3780
3781        let root_state_hash: ECMHLiveObjectSetDigest = self
3782            .get_global_state_hash_store()
3783            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3784            .expect("Retrieving root state hash cannot fail")
3785            .expect("Root state hash for epoch must exist")
3786            .1
3787            .digest()
3788            .into();
3789
3790        let is_inconsistent = root_state_hash != live_object_set_hash;
3791        if is_inconsistent {
3792            if panic {
3793                panic!(
3794                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3795                );
3796            } else {
3797                error!(
3798                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3799                    root_state_hash, live_object_set_hash
3800                );
3801            }
3802        } else {
3803            info!("State consistency check passed");
3804        }
3805
3806        if !panic {
3807            state_hasher.set_inconsistent_state(is_inconsistent);
3808        }
3809    }
3810
3811    pub fn current_epoch_for_testing(&self) -> EpochId {
3812        self.epoch_store_for_testing().epoch()
3813    }
3814
3815    #[instrument(level = "error", skip_all)]
3816    pub fn checkpoint_all_dbs(
3817        &self,
3818        checkpoint_path: &Path,
3819        cur_epoch_store: &AuthorityPerEpochStore,
3820        checkpoint_indexes: bool,
3821    ) -> IotaResult {
3822        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3823        let current_epoch = cur_epoch_store.epoch();
3824
3825        if checkpoint_path.exists() {
3826            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3827            return Ok(());
3828        }
3829
3830        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3831        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3832
3833        if checkpoint_path_tmp.exists() {
3834            fs::remove_dir_all(&checkpoint_path_tmp)
3835                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3836        }
3837
3838        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3839        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3840
3841        // NOTE: Do not change the order of invoking these checkpoint calls
3842        // We want to snapshot checkpoint db first to not race with state sync
3843        self.checkpoint_store
3844            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3845
3846        self.get_reconfig_api()
3847            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3848
3849        self.committee_store
3850            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3851
3852        if checkpoint_indexes {
3853            if let Some(indexes) = self.indexes.as_ref() {
3854                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3855            }
3856            if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3857                grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3858            }
3859        }
3860
3861        fs::rename(checkpoint_path_tmp, checkpoint_path)
3862            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3863        Ok(())
3864    }
3865
3866    /// Load the current epoch store. This can change during reconfiguration. To
3867    /// ensure that we never end up accessing different epoch stores in a
3868    /// single task, we need to make sure that this is called once per task.
3869    /// Each call needs to be carefully audited to ensure it is
3870    /// the case. This also means we should minimize the number of call-sites.
3871    /// Only call it when there is no way to obtain it from somewhere else.
3872    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3873        self.epoch_store.load()
3874    }
3875
3876    // Load the epoch store, should be used in tests only.
3877    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3878        self.load_epoch_store_one_call_per_task()
3879    }
3880
3881    pub fn clone_committee_for_testing(&self) -> Committee {
3882        Committee::clone(self.epoch_store_for_testing().committee())
3883    }
3884
3885    #[instrument(level = "trace", skip_all)]
3886    pub fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3887        self.get_object_store()
3888            .try_get_object(object_id)
3889            .map_err(Into::into)
3890    }
3891
3892    /// Non-fallible version of `try_get_object`.
3893    pub fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3894        self.try_get_object(object_id)
3895            .expect("storage access failed")
3896    }
3897
3898    pub fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3899        Ok(self
3900            .try_get_object(&ObjectId::SYSTEM)?
3901            .expect("system package should always exist")
3902            .object_ref())
3903    }
3904
3905    // This function is only used for testing.
3906    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3907        self.get_object_cache_reader()
3908            .try_get_iota_system_state_object_unsafe()
3909    }
3910
3911    #[instrument(level = "trace", skip_all)]
3912    pub fn get_checkpoint_by_sequence_number(
3913        &self,
3914        sequence_number: CheckpointSequenceNumber,
3915    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3916        Ok(self
3917            .checkpoint_store
3918            .get_checkpoint_by_sequence_number(sequence_number)?)
3919    }
3920
3921    /// Wait for the given transactions to be included in a checkpoint.
3922    ///
3923    /// Returns a mapping from transaction digest to
3924    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
3925    /// On timeout, returns partial results for any transactions that were
3926    /// already checkpointed.
3927    pub async fn wait_for_checkpoint_inclusion(
3928        &self,
3929        digests: &[TransactionDigest],
3930        timeout: Duration,
3931    ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3932        let epoch_store = self.load_epoch_store_one_call_per_task();
3933
3934        // Local cache so multiple transactions in the same checkpoint only
3935        // trigger a single checkpoint summary lookup.
3936        let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3937
3938        let results = epoch_store
3939            .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3940                *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3941                    self.get_checkpoint_by_sequence_number(seq)
3942                        .ok()
3943                        .flatten()
3944                        .map(|c| c.timestamp_ms)
3945                        .unwrap_or(0)
3946                })
3947            })
3948            .await?;
3949
3950        Ok(digests
3951            .iter()
3952            .copied()
3953            .zip(results)
3954            .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3955            .collect())
3956    }
3957
3958    #[instrument(level = "trace", skip_all)]
3959    pub fn get_transaction_checkpoint_for_tests(
3960        &self,
3961        digest: &TransactionDigest,
3962        epoch_store: &AuthorityPerEpochStore,
3963    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3964        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3965        let Some(checkpoint) = checkpoint else {
3966            return Ok(None);
3967        };
3968        let checkpoint = self
3969            .checkpoint_store
3970            .get_checkpoint_by_sequence_number(checkpoint)?;
3971        Ok(checkpoint)
3972    }
3973
3974    #[instrument(level = "trace", skip_all)]
3975    pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3976        Ok(
3977            match self
3978                .get_object_cache_reader()
3979                .try_get_latest_object_or_tombstone(*object_id)?
3980            {
3981                Some((_, ObjectOrTombstone::Object(object))) => {
3982                    let layout = self.get_object_layout(&object)?;
3983                    ObjectRead::Exists(object.object_ref(), object, layout)
3984                }
3985                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3986                None => ObjectRead::NotExists(*object_id),
3987            },
3988        )
3989    }
3990
3991    /// Chain Identifier is the digest of the genesis checkpoint.
3992    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3993        self.chain_identifier
3994    }
3995
3996    #[instrument(level = "trace", skip_all)]
3997    pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3998    where
3999        T: DeserializeOwned,
4000    {
4001        let o = self.get_object_read(object_id)?.into_object()?;
4002        if let Some(move_object) = o.data.as_opt_struct() {
4003            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4004                IotaError::ObjectDeserialization {
4005                    error: format!("{e}"),
4006                }
4007            })?)
4008        } else {
4009            Err(IotaError::ObjectDeserialization {
4010                error: format!("Provided object : [{object_id}] is not a Move object."),
4011            })
4012        }
4013    }
4014
4015    /// This function aims to serve rpc reads on past objects and
4016    /// we don't expect it to be called for other purposes.
4017    /// Depending on the object pruning policies that will be enforced in the
4018    /// future there is no software-level guarantee/SLA to retrieve an object
4019    /// with an old version even if it exists/existed.
4020    #[instrument(level = "trace", skip_all)]
4021    pub fn get_past_object_read(
4022        &self,
4023        object_id: &ObjectId,
4024        version: SequenceNumber,
4025    ) -> IotaResult<PastObjectRead> {
4026        // Firstly we see if the object ever existed by getting its latest data
4027        let Some(obj_ref) = self
4028            .get_object_cache_reader()
4029            .try_get_latest_object_ref_or_tombstone(*object_id)?
4030        else {
4031            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4032        };
4033
4034        if version > obj_ref.version {
4035            return Ok(PastObjectRead::VersionTooHigh {
4036                object_id: *object_id,
4037                asked_version: version,
4038                latest_version: obj_ref.version,
4039            });
4040        }
4041
4042        if version < obj_ref.version {
4043            // Read past objects
4044            return Ok(match self.read_object_at_version(object_id, version)? {
4045                Some((object, layout)) => {
4046                    let obj_ref = object.object_ref();
4047                    PastObjectRead::VersionFound(obj_ref, object, layout)
4048                }
4049
4050                None => PastObjectRead::VersionNotFound(*object_id, version),
4051            });
4052        }
4053
4054        if !obj_ref.digest.is_alive() {
4055            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4056        }
4057
4058        match self.read_object_at_version(object_id, obj_ref.version)? {
4059            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4060            None => {
4061                error!(
4062                    "Object with in parent_entry is missing from object store, datastore is \
4063                     inconsistent",
4064                );
4065                Err(UserInputError::ObjectNotFound {
4066                    object_id: *object_id,
4067                    version: Some(obj_ref.version),
4068                }
4069                .into())
4070            }
4071        }
4072    }
4073
4074    #[instrument(level = "trace", skip_all)]
4075    fn read_object_at_version(
4076        &self,
4077        object_id: &ObjectId,
4078        version: SequenceNumber,
4079    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
4080        let Some(object) = self
4081            .get_object_cache_reader()
4082            .try_get_object_by_key(object_id, version)?
4083        else {
4084            return Ok(None);
4085        };
4086
4087        let layout = self.get_object_layout(&object)?;
4088        Ok(Some((object, layout)))
4089    }
4090
4091    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
4092        let layout = object
4093            .data
4094            .as_opt_struct()
4095            .map(|object| {
4096                into_struct_layout(
4097                    self.load_epoch_store_one_call_per_task()
4098                        .executor()
4099                        // TODO(cache) - must read through cache
4100                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4101                        .get_annotated_layout(object.struct_tag())?,
4102                )
4103            })
4104            .transpose()?;
4105        Ok(layout)
4106    }
4107
4108    fn get_owner_at_version(
4109        &self,
4110        object_id: &ObjectId,
4111        version: SequenceNumber,
4112    ) -> IotaResult<Owner> {
4113        self.get_object_store()
4114            .try_get_object_by_key(object_id, version)?
4115            .ok_or_else(|| {
4116                IotaError::from(UserInputError::ObjectNotFound {
4117                    object_id: *object_id,
4118                    version: Some(version),
4119                })
4120            })
4121            .map(|o| o.owner)
4122    }
4123
4124    #[instrument(level = "trace", skip_all)]
4125    pub fn get_owner_objects(
4126        &self,
4127        owner: Address,
4128        // If `Some`, the query will start from the next item after the specified cursor
4129        cursor: Option<ObjectId>,
4130        limit: usize,
4131        filter: Option<IotaObjectDataFilter>,
4132    ) -> IotaResult<Vec<ObjectInfo>> {
4133        if let Some(indexes) = &self.indexes {
4134            indexes.get_owner_objects(owner, cursor, limit, filter)
4135        } else {
4136            Err(IotaError::IndexStoreNotAvailable)
4137        }
4138    }
4139
4140    #[instrument(level = "trace", skip_all)]
4141    pub fn get_owned_coins_iterator_with_cursor(
4142        &self,
4143        owner: Address,
4144        // If `Some`, the query will start from the next item after the specified cursor
4145        cursor: (String, ObjectId),
4146        limit: usize,
4147        one_coin_type_only: bool,
4148    ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4149        if let Some(indexes) = &self.indexes {
4150            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4151        } else {
4152            Err(IotaError::IndexStoreNotAvailable)
4153        }
4154    }
4155
4156    #[instrument(level = "trace", skip_all)]
4157    pub fn get_owner_objects_iterator(
4158        &self,
4159        owner: Address,
4160        // If `Some`, the query will start from the next item after the specified cursor
4161        cursor: Option<ObjectId>,
4162        filter: Option<IotaObjectDataFilter>,
4163    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4164        let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4165        if let Some(indexes) = &self.indexes {
4166            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4167        } else {
4168            Err(IotaError::IndexStoreNotAvailable)
4169        }
4170    }
4171
4172    #[instrument(level = "trace", skip_all)]
4173    pub fn get_move_objects<T>(&self, owner: Address, tag: StructTag) -> IotaResult<Vec<T>>
4174    where
4175        T: DeserializeOwned,
4176    {
4177        let object_ids = self
4178            .get_owner_objects_iterator(owner, None, None)?
4179            .filter(|o| match &o.type_ {
4180                ObjectType::Struct(s) => *s == tag,
4181                ObjectType::Package => false,
4182            })
4183            .map(|info| ObjectKey(info.object_id, info.version))
4184            .collect::<Vec<_>>();
4185        let mut move_objects = vec![];
4186
4187        let objects = self
4188            .get_object_store()
4189            .try_multi_get_objects_by_key(&object_ids)?;
4190
4191        for (o, id) in objects.into_iter().zip(object_ids) {
4192            let object = o.ok_or_else(|| {
4193                IotaError::from(UserInputError::ObjectNotFound {
4194                    object_id: id.0,
4195                    version: Some(id.1),
4196                })
4197            })?;
4198            let move_object = object.data.as_opt_struct().ok_or_else(|| {
4199                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4200            })?;
4201            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4202                IotaError::ObjectDeserialization {
4203                    error: format!("{e}"),
4204                }
4205            })?);
4206        }
4207        Ok(move_objects)
4208    }
4209
4210    #[instrument(level = "trace", skip_all)]
4211    pub fn get_dynamic_fields(
4212        &self,
4213        owner: ObjectId,
4214        // If `Some`, the query will start from the next item after the specified cursor
4215        cursor: Option<ObjectId>,
4216        limit: usize,
4217    ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4218        Ok(self
4219            .get_dynamic_fields_iterator(owner, cursor)?
4220            .take(limit)
4221            .collect::<Result<Vec<_>, _>>()?)
4222    }
4223
4224    fn get_dynamic_fields_iterator(
4225        &self,
4226        owner: ObjectId,
4227        // If `Some`, the query will start from the next item after the specified cursor
4228        cursor: Option<ObjectId>,
4229    ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4230    {
4231        if let Some(indexes) = &self.indexes {
4232            indexes.get_dynamic_fields_iterator(owner, cursor)
4233        } else {
4234            Err(IotaError::IndexStoreNotAvailable)
4235        }
4236    }
4237
4238    #[instrument(level = "trace", skip_all)]
4239    pub fn get_dynamic_field_object_id(
4240        &self,
4241        owner: ObjectId,
4242        name_type: TypeTag,
4243        name_bcs_bytes: &[u8],
4244    ) -> IotaResult<Option<ObjectId>> {
4245        if let Some(indexes) = &self.indexes {
4246            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4247        } else {
4248            Err(IotaError::IndexStoreNotAvailable)
4249        }
4250    }
4251
4252    #[instrument(level = "trace", skip_all)]
4253    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4254        Ok(self.get_indexes()?.next_sequence_number())
4255    }
4256
4257    #[instrument(level = "trace", skip_all)]
4258    pub async fn get_executed_transaction_and_effects(
4259        &self,
4260        digest: TransactionDigest,
4261        kv_store: Arc<TransactionKeyValueStore>,
4262    ) -> IotaResult<(Transaction, TransactionEffects)> {
4263        let transaction = kv_store.get_tx(digest).await?;
4264        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4265        Ok((transaction, effects))
4266    }
4267
4268    #[instrument(level = "trace", skip_all)]
4269    pub fn multi_get_checkpoint_by_sequence_number(
4270        &self,
4271        sequence_numbers: &[CheckpointSequenceNumber],
4272    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4273        Ok(self
4274            .checkpoint_store
4275            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4276    }
4277
4278    #[instrument(level = "trace", skip_all)]
4279    pub fn get_transaction_events(
4280        &self,
4281        digest: &TransactionDigest,
4282    ) -> IotaResult<TransactionEvents> {
4283        self.get_transaction_cache_reader()
4284            .try_get_events(digest)?
4285            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4286    }
4287
4288    pub fn get_transaction_input_objects(
4289        &self,
4290        effects: &TransactionEffects,
4291    ) -> anyhow::Result<Vec<Object>> {
4292        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4293            .map_err(Into::into)
4294    }
4295
4296    pub fn get_transaction_output_objects(
4297        &self,
4298        effects: &TransactionEffects,
4299    ) -> anyhow::Result<Vec<Object>> {
4300        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4301            .map_err(Into::into)
4302    }
4303
4304    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4305        match &self.indexes {
4306            Some(i) => Ok(i.clone()),
4307            None => Err(IotaError::UnsupportedFeature {
4308                error: "extended object indexing is not enabled on this server".into(),
4309            }),
4310        }
4311    }
4312
4313    pub async fn get_transactions_for_tests(
4314        self: &Arc<Self>,
4315        filter: Option<TransactionFilter>,
4316        cursor: Option<TransactionDigest>,
4317        limit: Option<usize>,
4318        reverse: bool,
4319    ) -> IotaResult<Vec<TransactionDigest>> {
4320        let metrics = KeyValueStoreMetrics::new_for_tests();
4321        let kv_store = Arc::new(TransactionKeyValueStore::new(
4322            "rocksdb",
4323            metrics,
4324            self.clone(),
4325        ));
4326        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4327            .await
4328    }
4329
4330    #[instrument(level = "trace", skip_all)]
4331    pub async fn get_transactions(
4332        &self,
4333        kv_store: &Arc<TransactionKeyValueStore>,
4334        filter: Option<TransactionFilter>,
4335        // If `Some`, the query will start from the next item after the specified cursor
4336        cursor: Option<TransactionDigest>,
4337        limit: Option<usize>,
4338        reverse: bool,
4339    ) -> IotaResult<Vec<TransactionDigest>> {
4340        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4341            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4342            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4343            if reverse {
4344                let iter = iter
4345                    .rev()
4346                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4347                    .skip(usize::from(cursor.is_some()));
4348                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4349            } else {
4350                let iter = iter
4351                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4352                    .skip(usize::from(cursor.is_some()));
4353                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4354            }
4355        }
4356        self.get_indexes()?
4357            .get_transactions(filter, cursor, limit, reverse)
4358    }
4359
4360    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4361        &self.checkpoint_store
4362    }
4363
4364    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4365        self.get_checkpoint_store()
4366            .get_highest_executed_checkpoint_seq_number()?
4367            .ok_or(IotaError::UserInput {
4368                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4369            })
4370    }
4371
4372    #[cfg(msim)]
4373    pub fn get_highest_pruned_checkpoint_for_testing(
4374        &self,
4375    ) -> IotaResult<CheckpointSequenceNumber> {
4376        self.database_for_testing()
4377            .perpetual_tables
4378            .get_highest_pruned_checkpoint()
4379            .map(|c| c.unwrap_or(0))
4380            .map_err(Into::into)
4381    }
4382
4383    #[instrument(level = "trace", skip_all)]
4384    pub fn get_checkpoint_summary_by_sequence_number(
4385        &self,
4386        sequence_number: CheckpointSequenceNumber,
4387    ) -> IotaResult<CheckpointSummary> {
4388        let verified_checkpoint = self
4389            .get_checkpoint_store()
4390            .get_checkpoint_by_sequence_number(sequence_number)?;
4391        match verified_checkpoint {
4392            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4393            None => Err(IotaError::UserInput {
4394                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4395            }),
4396        }
4397    }
4398
4399    #[instrument(level = "trace", skip_all)]
4400    pub fn get_checkpoint_summary_by_digest(
4401        &self,
4402        digest: CheckpointDigest,
4403    ) -> IotaResult<CheckpointSummary> {
4404        let verified_checkpoint = self
4405            .get_checkpoint_store()
4406            .get_checkpoint_by_digest(&digest)?;
4407        match verified_checkpoint {
4408            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4409            None => Err(IotaError::UserInput {
4410                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4411            }),
4412        }
4413    }
4414
4415    #[instrument(level = "trace", skip_all)]
4416    pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4417        if package_id.is_system_package() {
4418            return self.find_genesis_txn_digest();
4419        }
4420        Ok(self
4421            .get_object_read(&package_id)?
4422            .into_object()?
4423            .previous_transaction)
4424    }
4425
4426    #[instrument(level = "trace", skip_all)]
4427    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4428        let summary = self
4429            .get_verified_checkpoint_by_sequence_number(0)?
4430            .into_message();
4431        let content = self.get_checkpoint_contents(summary.content_digest)?;
4432        let genesis_transaction = content.enumerate_transactions(&summary).next();
4433        Ok(genesis_transaction
4434            .ok_or(IotaError::UserInput {
4435                error: UserInputError::GenesisTransactionNotFound,
4436            })?
4437            .1
4438            .transaction)
4439    }
4440
4441    #[instrument(level = "trace", skip_all)]
4442    pub fn get_verified_checkpoint_by_sequence_number(
4443        &self,
4444        sequence_number: CheckpointSequenceNumber,
4445    ) -> IotaResult<VerifiedCheckpoint> {
4446        let verified_checkpoint = self
4447            .get_checkpoint_store()
4448            .get_checkpoint_by_sequence_number(sequence_number)?;
4449        match verified_checkpoint {
4450            Some(verified_checkpoint) => Ok(verified_checkpoint),
4451            None => Err(IotaError::UserInput {
4452                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4453            }),
4454        }
4455    }
4456
4457    #[instrument(level = "trace", skip_all)]
4458    pub fn get_verified_checkpoint_summary_by_digest(
4459        &self,
4460        digest: CheckpointDigest,
4461    ) -> IotaResult<VerifiedCheckpoint> {
4462        let verified_checkpoint = self
4463            .get_checkpoint_store()
4464            .get_checkpoint_by_digest(&digest)?;
4465        match verified_checkpoint {
4466            Some(verified_checkpoint) => Ok(verified_checkpoint),
4467            None => Err(IotaError::UserInput {
4468                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4469            }),
4470        }
4471    }
4472
4473    #[instrument(level = "trace", skip_all)]
4474    pub fn get_checkpoint_contents(
4475        &self,
4476        digest: CheckpointContentsDigest,
4477    ) -> IotaResult<CheckpointContents> {
4478        self.get_checkpoint_store()
4479            .get_checkpoint_contents(&digest)?
4480            .ok_or(IotaError::UserInput {
4481                error: UserInputError::CheckpointContentsNotFound(digest),
4482            })
4483    }
4484
4485    #[instrument(level = "trace", skip_all)]
4486    pub fn get_checkpoint_contents_by_sequence_number(
4487        &self,
4488        sequence_number: CheckpointSequenceNumber,
4489    ) -> IotaResult<CheckpointContents> {
4490        let verified_checkpoint = self
4491            .get_checkpoint_store()
4492            .get_checkpoint_by_sequence_number(sequence_number)?;
4493        match verified_checkpoint {
4494            Some(verified_checkpoint) => {
4495                let content_digest = verified_checkpoint.into_inner().content_digest;
4496                self.get_checkpoint_contents(content_digest)
4497            }
4498            None => Err(IotaError::UserInput {
4499                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4500            }),
4501        }
4502    }
4503
4504    #[instrument(level = "trace", skip_all)]
4505    pub async fn query_events(
4506        &self,
4507        kv_store: &Arc<TransactionKeyValueStore>,
4508        query: EventFilter,
4509        // If `Some`, the query will start from the next item after the specified cursor
4510        cursor: Option<EventID>,
4511        limit: usize,
4512        descending: bool,
4513    ) -> IotaResult<Vec<IotaEvent>> {
4514        let index_store = self.get_indexes()?;
4515
4516        // Get the tx_num from tx_digest
4517        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4518            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4519                IotaError::TransactionNotFound {
4520                    digest: cursor.tx_digest,
4521                },
4522            )?;
4523            (tx_seq, cursor.event_seq as usize)
4524        } else if descending {
4525            (u64::MAX, usize::MAX)
4526        } else {
4527            (0, 0)
4528        };
4529
4530        let limit = limit + 1;
4531        let mut event_keys = match query {
4532            EventFilter::All(filters) => {
4533                if filters.is_empty() {
4534                    index_store.all_events(tx_num, event_num, limit, descending)?
4535                } else {
4536                    return Err(IotaError::UserInput {
4537                        error: UserInputError::Unsupported(
4538                            "This query type does not currently support filter combinations"
4539                                .to_string(),
4540                        ),
4541                    });
4542                }
4543            }
4544            EventFilter::Transaction(digest) => {
4545                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4546            }
4547            EventFilter::MoveModule { package, module } => {
4548                let module_id = ModuleId::new(
4549                    AccountAddress::new(package.into_bytes()),
4550                    move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4551                );
4552                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4553            }
4554            EventFilter::MoveEventType(struct_name) => index_store
4555                .events_by_move_event_struct_name(
4556                    &struct_name,
4557                    tx_num,
4558                    event_num,
4559                    limit,
4560                    descending,
4561                )?,
4562            EventFilter::Sender(sender) => {
4563                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4564            }
4565            EventFilter::TimeRange {
4566                start_time,
4567                end_time,
4568            } => index_store
4569                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4570            EventFilter::MoveEventModule { package, module } => index_store
4571                .events_by_move_event_module(
4572                    &ModuleId::new(
4573                        AccountAddress::new(package.into_bytes()),
4574                        move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4575                    ),
4576                    tx_num,
4577                    event_num,
4578                    limit,
4579                    descending,
4580                )?,
4581            // not using "_ =>" because we want to make sure we remember to add new variants here
4582            EventFilter::Package(_)
4583            | EventFilter::MoveEventField { .. }
4584            | EventFilter::Any(_)
4585            | EventFilter::And(_, _)
4586            | EventFilter::Or(_, _) => {
4587                return Err(IotaError::UserInput {
4588                    error: UserInputError::Unsupported(
4589                        "This query type is not supported by the full node.".to_string(),
4590                    ),
4591                });
4592            }
4593        };
4594
4595        // skip one event if exclusive cursor is provided,
4596        // otherwise truncate to the original limit.
4597        if cursor.is_some() {
4598            if !event_keys.is_empty() {
4599                event_keys.remove(0);
4600            }
4601        } else {
4602            event_keys.truncate(limit - 1);
4603        }
4604
4605        // get the unique set of digests from the event_keys
4606        let transaction_digests = event_keys
4607            .iter()
4608            .map(|(_, digest, _, _)| *digest)
4609            .collect::<HashSet<_>>()
4610            .into_iter()
4611            .collect::<Vec<_>>();
4612
4613        let events = kv_store
4614            .multi_get_events_by_tx_digests(&transaction_digests)
4615            .await?;
4616
4617        let events_map: HashMap<_, _> =
4618            transaction_digests.iter().zip(events.into_iter()).collect();
4619
4620        let stored_events = event_keys
4621            .into_iter()
4622            .map(|k| {
4623                (
4624                    k,
4625                    events_map
4626                        .get(&k.1)
4627                        .expect("fetched digest is missing")
4628                        .clone()
4629                        .and_then(|e| e.get(k.2).cloned()),
4630                )
4631            })
4632            .map(
4633                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4634                    event
4635                        .map(|e| (e, tx_digest, event_seq, timestamp))
4636                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4637                },
4638            )
4639            .collect::<Result<Vec<_>, _>>()?;
4640
4641        let epoch_store = self.load_epoch_store_one_call_per_task();
4642        let backing_store = self.get_backing_package_store().as_ref();
4643        let mut layout_resolver = epoch_store
4644            .executor()
4645            .type_layout_resolver(Box::new(backing_store));
4646        let mut events = vec![];
4647        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4648            events.push(IotaEvent::try_from(
4649                e.clone(),
4650                tx_digest,
4651                event_seq as u64,
4652                Some(timestamp),
4653                layout_resolver.get_annotated_layout(&e.type_)?,
4654            )?)
4655        }
4656        Ok(events)
4657    }
4658
4659    pub fn insert_genesis_object(&self, object: Object) {
4660        self.get_reconfig_api()
4661            .try_insert_genesis_object(object)
4662            .expect("Cannot insert genesis object")
4663    }
4664
4665    pub fn insert_genesis_objects(&self, objects: &[Object]) {
4666        for o in objects {
4667            self.insert_genesis_object(o.clone());
4668        }
4669    }
4670
4671    /// Make a status response for a transaction
4672    #[instrument(level = "trace", skip_all)]
4673    pub fn get_transaction_status(
4674        &self,
4675        transaction_digest: &TransactionDigest,
4676        epoch_store: &Arc<AuthorityPerEpochStore>,
4677    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4678        // TODO: In the case of read path, we should not have to re-sign the effects.
4679        if let Some(effects) =
4680            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4681        {
4682            if let Some(transaction) = self
4683                .get_transaction_cache_reader()
4684                .try_get_transaction_block(transaction_digest)?
4685            {
4686                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4687                let events = if effects.events_digest().is_some() {
4688                    self.get_transaction_events(effects.transaction_digest())?
4689                } else {
4690                    TransactionEvents::default()
4691                };
4692                return Ok(Some((
4693                    (*transaction).clone().into_message(),
4694                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4695                )));
4696            } else {
4697                // The read of effects and read of transaction are not atomic. It's possible
4698                // that we reverted the transaction (during epoch change) in
4699                // between the above two reads, and we end up having effects but
4700                // not transaction. In this case, we just fall through.
4701                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4702            }
4703        }
4704        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4705            self.metrics.tx_already_processed.inc();
4706            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4707            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4708        } else {
4709            Ok(None)
4710        }
4711    }
4712
4713    /// Get the signed effects of the given transaction. If the effects was
4714    /// signed in a previous epoch, re-sign it so that the caller is able to
4715    /// form a cert of the effects in the current epoch.
4716    #[instrument(level = "trace", skip_all)]
4717    pub fn get_signed_effects_and_maybe_resign(
4718        &self,
4719        transaction_digest: &TransactionDigest,
4720        epoch_store: &Arc<AuthorityPerEpochStore>,
4721    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4722        let effects = self
4723            .get_transaction_cache_reader()
4724            .try_get_executed_effects(transaction_digest)?;
4725        match effects {
4726            Some(effects) => {
4727                // If the transaction was executed in previous epochs, the validator will
4728                // re-sign the effects with new current epoch so that a client is always able to
4729                // obtain an effects certificate at the current epoch.
4730                //
4731                // Why is this necessary? Consider the following case:
4732                // - assume there are 4 validators
4733                // - Quorum driver gets 2 signed effects before reconfig halt
4734                // - The tx makes it into final checkpoint.
4735                // - 2 validators go away and are replaced in the new epoch.
4736                // - The new epoch begins.
4737                // - The quorum driver cannot complete the partial effects cert from the
4738                //   previous epoch, because it may not be able to reach either of the 2 former
4739                //   validators.
4740                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4741                //   the new epoch, the QD can make a new effects cert and return it to the
4742                //   client.
4743                //
4744                // This is a considered a short-term workaround. Eventually, Quorum Driver
4745                // should be able to return either an effects certificate, -or-
4746                // a proof of inclusion in a checkpoint. In the case above, the
4747                // Quorum Driver would return a proof of inclusion in the final
4748                // checkpoint, and this code would no longer be necessary.
4749                if effects.epoch() != epoch_store.epoch() {
4750                    debug!(
4751                        tx_digest=?transaction_digest,
4752                        effects_epoch=?effects.epoch(),
4753                        epoch=?epoch_store.epoch(),
4754                        "Re-signing the effects with the current epoch"
4755                    );
4756                }
4757                Ok(Some(self.sign_effects(effects, epoch_store)?))
4758            }
4759            None => Ok(None),
4760        }
4761    }
4762
4763    #[instrument(level = "trace", skip_all)]
4764    pub(crate) fn sign_effects(
4765        &self,
4766        effects: TransactionEffects,
4767        epoch_store: &Arc<AuthorityPerEpochStore>,
4768    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4769        let tx_digest = *effects.transaction_digest();
4770        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4771            Some(sig) => {
4772                debug_assert!(sig.epoch == epoch_store.epoch());
4773                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4774            }
4775            _ => {
4776                let sig = AuthoritySignInfo::new(
4777                    epoch_store.epoch(),
4778                    &effects,
4779                    Intent::iota_app(IntentScope::TransactionEffects),
4780                    self.name,
4781                    &*self.secret,
4782                );
4783
4784                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4785
4786                epoch_store.insert_effects_digest_and_signature(
4787                    &tx_digest,
4788                    effects.digest(),
4789                    &sig,
4790                )?;
4791
4792                effects
4793            }
4794        };
4795
4796        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4797            signed_effects,
4798        ))
4799    }
4800
4801    // Returns coin objects for indexing for fullnode if indexing is enabled.
4802    #[instrument(level = "trace", skip_all)]
4803    fn fullnode_only_get_tx_coins_for_indexing(
4804        &self,
4805        effects: &TransactionEffects,
4806        inner_temporary_store: &InnerTemporaryStore,
4807        epoch_store: &Arc<AuthorityPerEpochStore>,
4808    ) -> Option<TxCoins> {
4809        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4810            return None;
4811        }
4812        let written_coin_objects = inner_temporary_store
4813            .written
4814            .iter()
4815            .filter_map(|(k, v)| {
4816                if v.is_coin() {
4817                    Some((*k, v.clone()))
4818                } else {
4819                    None
4820                }
4821            })
4822            .collect();
4823        let mut input_coin_objects = inner_temporary_store
4824            .input_objects
4825            .iter()
4826            .filter_map(|(k, v)| {
4827                if v.is_coin() {
4828                    Some((*k, v.clone()))
4829                } else {
4830                    None
4831                }
4832            })
4833            .collect::<ObjectMap>();
4834
4835        // Check for receiving objects that were actually used and modified during
4836        // execution. Their updated version will already showup in
4837        // "written_coins" but their input isn't included in the set of input
4838        // objects in a inner_temporary_store.
4839        for (object_id, version) in effects.modified_at_versions() {
4840            if inner_temporary_store
4841                .loaded_runtime_objects
4842                .contains_key(&object_id)
4843            {
4844                if let Some(object) = self
4845                    .get_object_store()
4846                    .get_object_by_key(&object_id, version)
4847                {
4848                    if object.is_coin() {
4849                        input_coin_objects.insert(object_id, object);
4850                    }
4851                }
4852            }
4853        }
4854
4855        Some((input_coin_objects, written_coin_objects))
4856    }
4857
4858    /// Get the TransactionEnvelope that currently locks the given object, if
4859    /// any. Since object locks are only valid for one epoch, we also need
4860    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4861    /// no lock records for the given object can be found.
4862    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4863    /// object record is at a different version.
4864    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4865    /// certain transaction. Returns None if the a lock record is
4866    /// initialized for the given ObjectRef but not yet locked by any
4867    /// transaction,     or cannot find the transaction in transaction
4868    /// table, because of data race etc.
4869    #[instrument(level = "trace", skip_all)]
4870    pub fn get_transaction_lock(
4871        &self,
4872        object_ref: &ObjectRef,
4873        epoch_store: &AuthorityPerEpochStore,
4874    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4875        let lock_info = self
4876            .get_object_cache_reader()
4877            .try_get_lock(*object_ref, epoch_store)?;
4878        let lock_info = match lock_info {
4879            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4880                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4881                    provided_obj_ref: *object_ref,
4882                    current_version: locked_ref.version,
4883                }
4884                .into());
4885            }
4886            ObjectLockStatus::Initialized => {
4887                return Ok(None);
4888            }
4889            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4890        };
4891
4892        epoch_store.get_signed_transaction(&lock_info)
4893    }
4894
4895    pub fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
4896        self.get_object_cache_reader().try_get_objects(objects)
4897    }
4898
4899    /// Non-fallible version of `try_get_objects`.
4900    pub fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4901        self.try_get_objects(objects)
4902            .expect("storage access failed")
4903    }
4904
4905    pub fn try_get_object_or_tombstone(
4906        &self,
4907        object_id: ObjectId,
4908    ) -> IotaResult<Option<ObjectRef>> {
4909        self.get_object_cache_reader()
4910            .try_get_latest_object_ref_or_tombstone(object_id)
4911    }
4912
4913    /// Non-fallible version of `try_get_object_or_tombstone`.
4914    pub fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4915        self.try_get_object_or_tombstone(object_id)
4916            .expect("storage access failed")
4917    }
4918
4919    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4920    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4921    /// upgrade.
4922    ///
4923    /// This method can be used to dynamic adjust the amount of buffer. If set
4924    /// to 0, the upgrade will go through with only 2f+1 votes.
4925    ///
4926    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4927    /// should have the same value), or you risk halting the chain.
4928    pub fn set_override_protocol_upgrade_buffer_stake(
4929        &self,
4930        expected_epoch: EpochId,
4931        buffer_stake_bps: u64,
4932    ) -> IotaResult {
4933        let epoch_store = self.load_epoch_store_one_call_per_task();
4934        let actual_epoch = epoch_store.epoch();
4935        if actual_epoch != expected_epoch {
4936            return Err(IotaError::WrongEpoch {
4937                expected_epoch,
4938                actual_epoch,
4939            });
4940        }
4941
4942        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4943    }
4944
4945    pub fn clear_override_protocol_upgrade_buffer_stake(
4946        &self,
4947        expected_epoch: EpochId,
4948    ) -> IotaResult {
4949        let epoch_store = self.load_epoch_store_one_call_per_task();
4950        let actual_epoch = epoch_store.epoch();
4951        if actual_epoch != expected_epoch {
4952            return Err(IotaError::WrongEpoch {
4953                expected_epoch,
4954                actual_epoch,
4955            });
4956        }
4957
4958        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4959    }
4960
4961    /// Get the set of system packages that are compiled in to this build, if
4962    /// those packages are compatible with the current versions of those
4963    /// packages on-chain.
4964    pub async fn get_available_system_packages(
4965        &self,
4966        binary_config: &BinaryConfig,
4967    ) -> Vec<ObjectRef> {
4968        let mut results = vec![];
4969
4970        let system_packages = BuiltInFramework::iter_system_packages();
4971
4972        // Add extra framework packages during simtest
4973        #[cfg(msim)]
4974        let extra_packages = framework_injection::get_extra_packages(self.name);
4975        #[cfg(msim)]
4976        let system_packages = {
4977            let mut packages: Vec<_> = system_packages.collect();
4978            packages.extend(extra_packages.iter());
4979            packages
4980        };
4981
4982        for system_package in system_packages {
4983            let modules = system_package.modules().to_vec();
4984            // In simtests, we could override the current built-in framework packages.
4985            #[cfg(msim)]
4986            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4987                .unwrap_or(modules);
4988
4989            let Some(obj_ref) = iota_framework::compare_system_package(
4990                &self.get_object_store(),
4991                &system_package.id,
4992                &modules,
4993                system_package.dependencies.to_vec(),
4994                binary_config,
4995            )
4996            .await
4997            else {
4998                return vec![];
4999            };
5000            results.push(obj_ref);
5001        }
5002
5003        results
5004    }
5005
5006    /// Return the new versions, module bytes, and dependencies for the packages
5007    /// that have been committed to for a framework upgrade, in
5008    /// `system_packages`.  Loads the module contents from the binary, and
5009    /// performs the following checks:
5010    ///
5011    /// - Whether its contents matches what is on-chain already, in which case
5012    ///   no upgrade is required, and its contents are omitted from the output.
5013    /// - Whether the contents in the binary can form a package whose digest
5014    ///   matches the input, meaning the framework will be upgraded, and this
5015    ///   authority can satisfy that upgrade, in which case the contents are
5016    ///   included in the output.
5017    ///
5018    /// If the current version of the framework can't be loaded, the binary does
5019    /// not contain the bytes for that framework ID, or the resulting
5020    /// package fails the digest check, `None` is returned indicating that
5021    /// this authority cannot run the upgrade that the network voted on.
5022    async fn get_system_package_bytes(
5023        &self,
5024        system_packages: Vec<ObjectRef>,
5025        binary_config: &BinaryConfig,
5026    ) -> Option<Vec<SystemPackage>> {
5027        let ids: Vec<_> = system_packages
5028            .iter()
5029            .map(|object_ref| object_ref.object_id)
5030            .collect();
5031        let objects = self.get_objects(&ids);
5032
5033        let mut res = Vec::with_capacity(system_packages.len());
5034        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5035            let prev_transaction = match object {
5036                Some(cur_object) if cur_object.object_ref() == system_package_ref => {
5037                    // Skip this one because it doesn't need to be upgraded.
5038                    info!(
5039                        "Framework {} does not need updating",
5040                        system_package_ref.object_id
5041                    );
5042                    continue;
5043                }
5044
5045                Some(cur_object) => cur_object.previous_transaction,
5046                None => TransactionDigest::GENESIS_MARKER,
5047            };
5048
5049            #[cfg(msim)]
5050            let FrameworkSystemPackage {
5051                id: _,
5052                bytes,
5053                dependencies,
5054            } = framework_injection::get_override_system_package(
5055                &system_package_ref.object_id,
5056                self.name,
5057            )
5058            .unwrap_or_else(|| {
5059                BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
5060            });
5061
5062            #[cfg(not(msim))]
5063            let FrameworkSystemPackage {
5064                id: _,
5065                bytes,
5066                dependencies,
5067            } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
5068
5069            let modules: Vec<_> = bytes
5070                .iter()
5071                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5072                .collect();
5073
5074            let new_object = Object::new_system_package(
5075                &modules,
5076                system_package_ref.version,
5077                dependencies.clone(),
5078                prev_transaction,
5079            );
5080
5081            let new_ref = new_object.object_ref();
5082            if new_ref != system_package_ref {
5083                error!(
5084                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5085                );
5086                return None;
5087            }
5088
5089            res.push(SystemPackage {
5090                version: system_package_ref.version,
5091                modules: bytes,
5092                dependencies,
5093            });
5094        }
5095
5096        Some(res)
5097    }
5098
5099    /// Returns the new protocol version and system packages that the network
5100    /// has voted to upgrade to. If the proposed protocol version is not
5101    /// supported, None is returned.
5102    fn is_protocol_version_supported_v1(
5103        proposed_protocol_version: ProtocolVersion,
5104        committee: &Committee,
5105        capabilities: Vec<AuthorityCapabilitiesV1>,
5106        mut buffer_stake_bps: u64,
5107    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
5108        if buffer_stake_bps > 10000 {
5109            warn!("clamping buffer_stake_bps to 10000");
5110            buffer_stake_bps = 10000;
5111        }
5112
5113        // For each validator, gather the protocol version and system packages that it
5114        // would like to upgrade to in the next epoch.
5115        let mut desired_upgrades: Vec<_> = capabilities
5116            .into_iter()
5117            .filter_map(|mut cap| {
5118                // A validator that lists no packages is voting against any change at all.
5119                if cap.available_system_packages.is_empty() {
5120                    return None;
5121                }
5122
5123                cap.available_system_packages.sort();
5124
5125                info!(
5126                    "validator {:?} supports {:?} with system packages: {:?}",
5127                    cap.authority.concise(),
5128                    cap.supported_protocol_versions,
5129                    cap.available_system_packages,
5130                );
5131
5132                // A validator that only supports the current protocol version is also voting
5133                // against any change, because framework upgrades always require a protocol
5134                // version bump.
5135                cap.supported_protocol_versions
5136                    .get_version_digest(proposed_protocol_version)
5137                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5138            })
5139            .collect();
5140
5141        // There can only be one set of votes that have a majority, find one if it
5142        // exists.
5143        desired_upgrades.sort();
5144        desired_upgrades
5145            .into_iter()
5146            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5147            .into_iter()
5148            .find_map(|((digest, packages), group)| {
5149                // should have been filtered out earlier.
5150                assert!(!packages.is_empty());
5151
5152                let mut stake_aggregator: StakeAggregator<(), true> =
5153                    StakeAggregator::new(Arc::new(committee.clone()));
5154
5155                for (_, _, authority) in group {
5156                    stake_aggregator.insert_generic(authority, ());
5157                }
5158
5159                let total_votes = stake_aggregator.total_votes();
5160                let quorum_threshold = committee.quorum_threshold();
5161                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5162
5163                info!(
5164                    protocol_config_digest = ?digest,
5165                    ?total_votes,
5166                    ?quorum_threshold,
5167                    ?buffer_stake_bps,
5168                    ?effective_threshold,
5169                    ?proposed_protocol_version,
5170                    ?packages,
5171                    "support for upgrade"
5172                );
5173
5174                let has_support = total_votes >= effective_threshold;
5175                has_support.then_some((proposed_protocol_version, digest, packages))
5176            })
5177    }
5178
5179    /// Selects the highest supported protocol version and system packages that
5180    /// the network has voted to upgrade to. If no upgrade is supported,
5181    /// returns the current protocol version and system packages.
5182    fn choose_protocol_version_and_system_packages_v1(
5183        current_protocol_version: ProtocolVersion,
5184        current_protocol_digest: Digest,
5185        committee: &Committee,
5186        capabilities: Vec<AuthorityCapabilitiesV1>,
5187        buffer_stake_bps: u64,
5188    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5189        let mut next_protocol_version = current_protocol_version;
5190        let mut system_packages = vec![];
5191        let mut protocol_version_digest = current_protocol_digest;
5192
5193        // Finds the highest supported protocol version and system packages by
5194        // incrementing the proposed protocol version by one until no further
5195        // upgrades are supported.
5196        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5197            next_protocol_version + 1,
5198            committee,
5199            capabilities.clone(),
5200            buffer_stake_bps,
5201        ) {
5202            next_protocol_version = version;
5203            protocol_version_digest = digest;
5204            system_packages = packages;
5205        }
5206
5207        (
5208            next_protocol_version,
5209            protocol_version_digest,
5210            system_packages,
5211        )
5212    }
5213
5214    /// Returns the indices of validators that support the given protocol
5215    /// version and digest. This includes both committee and non-committee
5216    /// validators based on their capabilities. Uses active validators
5217    /// instead of committee indices.
5218    fn get_validators_supporting_protocol_version(
5219        target_protocol_version: ProtocolVersion,
5220        target_digest: Digest,
5221        active_validators: &[AuthorityPublicKey],
5222        capabilities: &[AuthorityCapabilitiesV1],
5223    ) -> Vec<u64> {
5224        let mut eligible_validators = Vec::new();
5225
5226        for capability in capabilities {
5227            // Check if this validator supports the target protocol version and digest
5228            if let Some(digest) = capability
5229                .supported_protocol_versions
5230                .get_version_digest(target_protocol_version)
5231            {
5232                if digest == target_digest {
5233                    // Find the validator's index in the active validators list
5234                    if let Some(index) = active_validators
5235                        .iter()
5236                        .position(|name| AuthorityName::from(name) == capability.authority)
5237                    {
5238                        eligible_validators.push(index as u64);
5239                    }
5240                }
5241            }
5242        }
5243
5244        // Sort indices for deterministic behavior
5245        eligible_validators.sort();
5246        eligible_validators
5247    }
5248
5249    /// Calculates the sum of weights for eligible validators that are part of
5250    /// the committee. Takes the indices from
5251    /// get_validators_supporting_protocol_version and maps them back
5252    /// to committee members to get their weights.
5253    fn calculate_eligible_validators_weight(
5254        eligible_validator_indices: &[u64],
5255        active_validators: &[AuthorityPublicKey],
5256        committee: &Committee,
5257    ) -> u64 {
5258        let mut total_weight = 0u64;
5259
5260        for &index in eligible_validator_indices {
5261            let authority_pubkey = &active_validators[index as usize];
5262            // Check if this validator is in the committee and get their weight
5263            if let Some((_, weight)) = committee
5264                .members()
5265                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5266            {
5267                total_weight += weight;
5268            }
5269        }
5270
5271        total_weight
5272    }
5273
5274    /// Creates and execute the advance epoch transaction to effects without
5275    /// committing it to the database. The effects of the change epoch tx
5276    /// are only written to the database after a certified checkpoint has been
5277    /// formed and executed by CheckpointExecutor.
5278    ///
5279    /// When a framework upgraded has been decided on, but the validator does
5280    /// not have the new versions of the packages locally, the validator
5281    /// cannot form the ChangeEpochTx. In this case it returns Err,
5282    /// indicating that the checkpoint builder should give up trying to make the
5283    /// final checkpoint. As long as the network is able to create a certified
5284    /// checkpoint (which should be ensured by the capabilities vote), it
5285    /// will arrive via state sync and be executed by CheckpointExecutor.
5286    #[instrument(level = "error", skip_all)]
5287    pub async fn create_and_execute_advance_epoch_tx(
5288        &self,
5289        epoch_store: &Arc<AuthorityPerEpochStore>,
5290        gas_cost_summary: &GasCostSummary,
5291        checkpoint: CheckpointSequenceNumber,
5292        epoch_start_timestamp_ms: CheckpointTimestamp,
5293        scores: Vec<u64>,
5294    ) -> anyhow::Result<(
5295        IotaSystemState,
5296        Option<SystemEpochInfoEvent>,
5297        TransactionEffects,
5298    )> {
5299        let mut txns = Vec::new();
5300
5301        let next_epoch = epoch_store.epoch() + 1;
5302
5303        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5304        let authority_capabilities = epoch_store
5305            .get_capabilities_v1()
5306            .expect("read capabilities from db cannot fail");
5307        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
5308            Self::choose_protocol_version_and_system_packages_v1(
5309                epoch_store.protocol_version(),
5310                SupportedProtocolVersionsWithHashes::protocol_config_digest(
5311                    epoch_store.protocol_config(),
5312                ),
5313                epoch_store.committee(),
5314                authority_capabilities.clone(),
5315                buffer_stake_bps,
5316            );
5317
5318        // since system packages are created during the current epoch, they should abide
5319        // by the rules of the current epoch, including the current epoch's max
5320        // Move binary format version
5321        let config = epoch_store.protocol_config();
5322        let binary_config = to_binary_config(config);
5323        let Some(next_epoch_system_package_bytes) = self
5324            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5325            .await
5326        else {
5327            error!(
5328                "upgraded system packages {:?} are not locally available, cannot create \
5329                ChangeEpochTx. validator binary must be upgraded to the correct version!",
5330                next_epoch_system_packages
5331            );
5332            // the checkpoint builder will keep retrying forever when it hits this error.
5333            // Eventually, one of two things will happen:
5334            // - The operator will upgrade this binary to one that has the new packages
5335            //   locally, and this function will succeed.
5336            // - The final checkpoint will be certified by other validators, we will receive
5337            //   it via state sync, and execute it. This will upgrade the framework
5338            //   packages, reconfigure, and most likely shut down in the new epoch (this
5339            //   validator likely doesn't support the new protocol version, or else it
5340            //   should have had the packages.)
5341            bail!("missing system packages: cannot form ChangeEpochTx");
5342        };
5343
5344        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
5345        // ChangeEpochV2 requirements are met
5346        if config.select_committee_from_eligible_validators() {
5347            // Get the list of eligible validators that support the target protocol version
5348            let active_validators = epoch_store.epoch_start_state().get_active_validators();
5349
5350            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5351
5352            // Use validators supporting the target protocol version as eligible validators
5353            // in the next version if select_committee_supporting_next_epoch_version feature
5354            // flag is set to true.
5355            if config.select_committee_supporting_next_epoch_version() {
5356                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5357                    next_epoch_protocol_version,
5358                    next_epoch_protocol_digest,
5359                    &active_validators,
5360                    &authority_capabilities,
5361                );
5362
5363                // Calculate the total weight of eligible validators in the committee
5364                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5365                    &eligible_active_validators,
5366                    &active_validators,
5367                    epoch_store.committee(),
5368                );
5369
5370                // Safety check: ensure eligible validators have enough stake
5371                // Use the same effective threshold calculation that was used to decide the
5372                // protocol version
5373                let committee = epoch_store.committee();
5374                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5375
5376                if eligible_validators_weight < effective_threshold {
5377                    error!(
5378                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5379                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5380                    );
5381                    // Pass all active validator indices as eligible validators
5382                    // to perform selection among all of them.
5383                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5384                }
5385            }
5386
5387            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5388            // flag is enabled.
5389            if config.pass_validator_scores_to_advance_epoch() {
5390                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5391                    next_epoch,
5392                    next_epoch_protocol_version.as_u64(),
5393                    gas_cost_summary.storage_cost,
5394                    gas_cost_summary.computation_cost,
5395                    gas_cost_summary.computation_cost_burned,
5396                    gas_cost_summary.storage_rebate,
5397                    gas_cost_summary.non_refundable_storage_fee,
5398                    epoch_start_timestamp_ms,
5399                    next_epoch_system_package_bytes,
5400                    eligible_active_validators,
5401                    scores,
5402                    config.adjust_rewards_by_score(),
5403                ));
5404            } else {
5405                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5406                    next_epoch,
5407                    next_epoch_protocol_version.as_u64(),
5408                    gas_cost_summary.storage_cost,
5409                    gas_cost_summary.computation_cost,
5410                    gas_cost_summary.computation_cost_burned,
5411                    gas_cost_summary.storage_rebate,
5412                    gas_cost_summary.non_refundable_storage_fee,
5413                    epoch_start_timestamp_ms,
5414                    next_epoch_system_package_bytes,
5415                    eligible_active_validators,
5416                ));
5417            }
5418        } else if config.protocol_defined_base_fee()
5419            && config.max_committee_members_count_as_option().is_some()
5420        {
5421            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5422                next_epoch,
5423                next_epoch_protocol_version.as_u64(),
5424                gas_cost_summary.storage_cost,
5425                gas_cost_summary.computation_cost,
5426                gas_cost_summary.computation_cost_burned,
5427                gas_cost_summary.storage_rebate,
5428                gas_cost_summary.non_refundable_storage_fee,
5429                epoch_start_timestamp_ms,
5430                next_epoch_system_package_bytes,
5431            ));
5432        } else {
5433            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5434                next_epoch,
5435                next_epoch_protocol_version.as_u64(),
5436                gas_cost_summary.storage_cost,
5437                gas_cost_summary.computation_cost,
5438                gas_cost_summary.storage_rebate,
5439                gas_cost_summary.non_refundable_storage_fee,
5440                epoch_start_timestamp_ms,
5441                next_epoch_system_package_bytes,
5442            ));
5443        }
5444
5445        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5446
5447        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5448            tx.clone(),
5449            epoch_store.epoch(),
5450            checkpoint,
5451        );
5452
5453        let tx_digest = executable_tx.digest();
5454
5455        info!(
5456            ?next_epoch,
5457            ?next_epoch_protocol_version,
5458            ?next_epoch_system_packages,
5459            computation_cost=?gas_cost_summary.computation_cost,
5460            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5461            storage_cost=?gas_cost_summary.storage_cost,
5462            storage_rebate=?gas_cost_summary.storage_rebate,
5463            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5464            ?tx_digest,
5465            "Creating advance epoch transaction"
5466        );
5467
5468        fail_point_async!("change_epoch_tx_delay");
5469        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5470
5471        // The tx could have been executed by state sync already - if so simply return
5472        // an error. The checkpoint builder will shortly be terminated by
5473        // reconfiguration anyway.
5474        if self
5475            .get_transaction_cache_reader()
5476            .try_is_tx_already_executed(tx_digest)?
5477        {
5478            warn!("change epoch tx has already been executed via state sync");
5479            bail!("change epoch tx has already been executed via state sync",);
5480        }
5481
5482        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5483
5484        // We must manually assign the shared object versions to the transaction before
5485        // executing it. This is because we do not sequence end-of-epoch
5486        // transactions through consensus.
5487        epoch_store.assign_shared_object_versions_idempotent(
5488            self.get_object_cache_reader().as_ref(),
5489            std::slice::from_ref(&executable_tx),
5490        )?;
5491
5492        let (input_objects, _) =
5493            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5494
5495        let (temporary_store, effects, _execution_error_opt) = self.execute_transaction(
5496            &execution_guard,
5497            &executable_tx,
5498            input_objects,
5499            vec![],
5500            epoch_store,
5501        )?;
5502        let system_obj = get_iota_system_state(&temporary_store.written)
5503            .expect("change epoch tx must write to system object");
5504        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5505        let system_epoch_info_event = temporary_store
5506            .events
5507            .0
5508            .into_iter()
5509            .find(|event| event.is_system_epoch_info_event())
5510            .map(SystemEpochInfoEvent::from);
5511        // The system epoch info event can be `None` in case if the `advance_epoch`
5512        // Move function call failed and was executed in the safe mode.
5513        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5514
5515        // We must write tx and effects to the state sync tables so that state sync is
5516        // able to deliver to the transaction to CheckpointExecutor after it is
5517        // included in a certified checkpoint.
5518        self.get_state_sync_store()
5519            .try_insert_transaction_and_effects(&tx, &effects)
5520            .map_err(|err| {
5521                let err: anyhow::Error = err.into();
5522                err
5523            })?;
5524
5525        info!(
5526            "Effects summary of the change epoch transaction: {:?}",
5527            effects.summary_for_debug()
5528        );
5529        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5530        // The change epoch transaction cannot fail to execute.
5531        assert!(effects.status().is_success());
5532        Ok((system_obj, system_epoch_info_event, effects))
5533    }
5534
5535    /// This function is called at the very end of the epoch.
5536    /// This step is required before updating new epoch in the db and calling
5537    /// reopen_epoch_db.
5538    #[instrument(level = "error", skip_all)]
5539    async fn revert_uncommitted_epoch_transactions(
5540        &self,
5541        epoch_store: &AuthorityPerEpochStore,
5542    ) -> IotaResult {
5543        {
5544            let state = epoch_store.get_reconfig_state_write_lock_guard();
5545            if state.should_accept_user_certs() {
5546                // Need to change this so that consensus adapter do not accept certificates from
5547                // user. This can happen if our local validator did not initiate
5548                // epoch change locally, but 2f+1 nodes already concluded the
5549                // epoch.
5550                //
5551                // This lock is essentially a barrier (in the certificate mode only) for
5552                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5553                // after this block
5554                epoch_store.close_user_certs(state);
5555            }
5556            // lock is dropped here
5557        }
5558
5559        // In the P-COOL flow, the list of pending consensus certificates is
5560        // always empty, so the reverting below is only for the certificate mode.
5561        if !epoch_store.protocol_config().enable_pcool_flow() {
5562            let pending_certificates = epoch_store.pending_consensus_certificates();
5563            info!(
5564                "Reverting {} locally executed transactions that was not included in the epoch: \
5565                    {:?}",
5566                pending_certificates.len(),
5567                pending_certificates,
5568            );
5569            for digest in pending_certificates {
5570                if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5571                    info!(
5572                        "Not reverting pending consensus transaction {:?} - it was included in \
5573                            checkpoint",
5574                        digest
5575                    );
5576                    continue;
5577                }
5578                info!("Reverting {:?} at the end of epoch", digest);
5579                epoch_store.revert_executed_transaction(&digest)?;
5580                self.get_reconfig_api().try_revert_state_update(&digest)?;
5581            }
5582            info!("All uncommitted local transactions reverted");
5583        } else {
5584            info!("P-COOL mode: skipping revert of uncommitted epoch transactions");
5585        }
5586
5587        Ok(())
5588    }
5589
5590    #[instrument(level = "error", skip_all)]
5591    async fn reopen_epoch_db(
5592        &self,
5593        cur_epoch_store: &AuthorityPerEpochStore,
5594        new_committee: Committee,
5595        epoch_start_configuration: EpochStartConfiguration,
5596        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5597        epoch_last_checkpoint: CheckpointSequenceNumber,
5598    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5599        let new_epoch = new_committee.epoch;
5600        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5601        assert_eq!(
5602            epoch_start_configuration.epoch_start_state().epoch(),
5603            new_committee.epoch
5604        );
5605        fail_point!("before-open-new-epoch-store");
5606        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5607            self.name,
5608            new_committee,
5609            epoch_start_configuration,
5610            self.get_backing_package_store().clone(),
5611            expensive_safety_check_config,
5612            epoch_last_checkpoint,
5613        )?;
5614        self.epoch_store.store(new_epoch_store.clone());
5615        Ok(new_epoch_store)
5616    }
5617
5618    /// Checks if `authenticator` unlocks a valid Move account and returns the
5619    /// account-related `AuthenticatorFunctionRef` object.
5620    fn check_move_account(
5621        &self,
5622        auth_account_object_id: ObjectId,
5623        auth_account_object_seq_number: Option<SequenceNumber>,
5624        auth_account_object_digest: Option<ObjectDigest>,
5625        account_object: ObjectReadResult,
5626        signer: &Address,
5627    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5628        let account_object = match account_object.object {
5629            ObjectReadResultKind::Object(object) => Ok(object),
5630            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5631                Err(UserInputError::AccountObjectDeleted {
5632                    account_id: account_object.id(),
5633                    account_version: version,
5634                    transaction_digest: digest,
5635                })
5636            }
5637            // It is impossible to check the account object because it is used in a canceled
5638            // transaction and is not loaded.
5639            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5640                Err(UserInputError::AccountObjectInCanceledTransaction {
5641                    account_id: account_object.id(),
5642                    account_version: version,
5643                })
5644            }
5645        }?;
5646
5647        let account_object_addr = Address::from(auth_account_object_id);
5648
5649        fp_ensure!(
5650            signer == &account_object_addr,
5651            UserInputError::IncorrectUserSignature {
5652                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5653            }
5654            .into()
5655        );
5656
5657        fp_ensure!(
5658            account_object.is_shared() || account_object.is_immutable(),
5659            UserInputError::AccountObjectNotSupported {
5660                object_id: auth_account_object_id
5661            }
5662            .into()
5663        );
5664
5665        let auth_account_object_seq_number =
5666            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5667                let account_object_version = account_object.version();
5668
5669                fp_ensure!(
5670                    account_object_version == auth_account_object_seq_number,
5671                    UserInputError::AccountObjectVersionMismatch {
5672                        object_id: auth_account_object_id,
5673                        expected_version: auth_account_object_seq_number,
5674                        actual_version: account_object_version,
5675                    }
5676                    .into()
5677                );
5678
5679                auth_account_object_seq_number
5680            } else {
5681                account_object.version()
5682            };
5683
5684        if let Some(auth_account_object_digest) = auth_account_object_digest {
5685            let expected_digest = account_object.digest();
5686            fp_ensure!(
5687                expected_digest == auth_account_object_digest,
5688                UserInputError::InvalidAccountObjectDigest {
5689                    object_id: auth_account_object_id,
5690                    expected_digest,
5691                    actual_digest: auth_account_object_digest,
5692                }
5693                .into()
5694            );
5695        }
5696
5697        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5698            auth_account_object_id,
5699            &AuthenticatorFunctionRefV1Key::tag().into(),
5700            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5701        )
5702        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5703            account_object_id: auth_account_object_id,
5704        })?;
5705
5706        let authenticator_function_ref_field = self
5707            .get_object_cache_reader()
5708            .try_find_object_lt_or_eq_version(
5709                authenticator_function_ref_field_id,
5710                auth_account_object_seq_number,
5711            )?;
5712
5713        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5714            let field_move_object = authenticator_function_ref_field_obj
5715                .data
5716                .as_opt_struct()
5717                .expect("dynamic field should never be a package object");
5718
5719            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5720                field_move_object.to_rust().map_err(|_| {
5721                    UserInputError::InvalidAuthenticatorFunctionRefField {
5722                        account_object_id: auth_account_object_id,
5723                    }
5724                })?;
5725
5726            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5727                field.value,
5728                authenticator_function_ref_field_obj.object_ref(),
5729                authenticator_function_ref_field_obj.owner,
5730                authenticator_function_ref_field_obj.storage_rebate,
5731                authenticator_function_ref_field_obj.previous_transaction,
5732            ))
5733        } else {
5734            Err(UserInputError::MoveAuthenticatorNotFound {
5735                authenticator_function_ref_id: authenticator_function_ref_field_id,
5736                account_object_id: auth_account_object_id,
5737                account_object_version: auth_account_object_seq_number,
5738            }
5739            .into())
5740        }
5741    }
5742
5743    #[allow(clippy::type_complexity)]
5744    fn read_objects_for_validation(
5745        &self,
5746        transaction: &VerifiedTransaction,
5747        epoch: u64,
5748    ) -> IotaResult<(
5749        InputObjects,
5750        ReceivingObjects,
5751        Vec<(InputObjects, ObjectReadResult)>,
5752    )> {
5753        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5754            Some(transaction.digest()),
5755            &transaction.collect_all_input_object_kind_for_reading()?,
5756            &transaction.data().transaction_data().receiving_objects(),
5757            epoch,
5758        )?;
5759
5760        transaction
5761            .split_input_objects_into_groups_for_reading(input_objects)
5762            .map(|(tx_input_objects, per_authenticator_inputs)| {
5763                (
5764                    tx_input_objects,
5765                    tx_receiving_objects,
5766                    per_authenticator_inputs,
5767                )
5768            })
5769    }
5770
5771    #[allow(clippy::type_complexity)]
5772    fn check_transaction_inputs_for_validation(
5773        &self,
5774        protocol_config: &ProtocolConfig,
5775        reference_gas_price: u64,
5776        tx_data: &TransactionData,
5777        tx_input_objects: InputObjects,
5778        tx_receiving_objects: &ReceivingObjects,
5779        move_authenticators: &Vec<&MoveAuthenticator>,
5780        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5781    ) -> IotaResult<(
5782        IotaGasStatus,
5783        CheckedInputObjects,
5784        Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5785    )> {
5786        let authenticator_gas_budget = if move_authenticators.is_empty() {
5787            0
5788        } else {
5789            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5790            // not a part of the transaction data.
5791            protocol_config.max_auth_gas()
5792        };
5793
5794        debug_assert_eq!(
5795            move_authenticators.len(),
5796            per_authenticator_inputs.len(),
5797            "Move authenticators amount must match the number of authenticator inputs"
5798        );
5799
5800        let per_authenticator_checked_inputs = move_authenticators
5801            .iter()
5802            .zip(per_authenticator_inputs)
5803            .map(
5804                |(move_authenticator, (authenticator_input_objects, account_object))| {
5805                    // Check basic `object_to_authenticate` preconditions and get its components.
5806                    let (
5807                        auth_account_object_id,
5808                        auth_account_object_seq_number,
5809                        auth_account_object_digest,
5810                    ) = move_authenticator.object_to_authenticate_components()?;
5811
5812                    let signer = move_authenticator.address();
5813
5814                    // Make sure the signer is a Move account.
5815                    let AuthenticatorFunctionRefForExecution {
5816                        authenticator_function_ref,
5817                        ..
5818                    } = self.check_move_account(
5819                        auth_account_object_id,
5820                        auth_account_object_seq_number,
5821                        auth_account_object_digest,
5822                        account_object,
5823                        &signer,
5824                    )?;
5825
5826                    // Check the MoveAuthenticator input objects.
5827                    let authenticator_checked_input_objects =
5828                        iota_transaction_checks::check_move_authenticator_input_for_validation(
5829                            authenticator_input_objects,
5830                        )?;
5831
5832                    Ok((
5833                        authenticator_checked_input_objects,
5834                        authenticator_function_ref,
5835                    ))
5836                },
5837            )
5838            .collect::<IotaResult<Vec<_>>>()?;
5839
5840        // Check the transaction inputs.
5841        let (gas_status, tx_checked_input_objects) =
5842            iota_transaction_checks::check_transaction_input(
5843                protocol_config,
5844                reference_gas_price,
5845                tx_data,
5846                tx_input_objects,
5847                tx_receiving_objects,
5848                &self.metrics.bytecode_verifier_metrics,
5849                &self.config.verifier_signing_config,
5850                authenticator_gas_budget,
5851            )?;
5852
5853        Ok((
5854            gas_status,
5855            tx_checked_input_objects,
5856            per_authenticator_checked_inputs,
5857        ))
5858    }
5859
5860    #[cfg(test)]
5861    pub(crate) fn iter_live_object_set_for_testing(
5862        &self,
5863    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5864        self.get_global_state_hash_store()
5865            .iter_cached_live_object_set_for_testing()
5866    }
5867
5868    #[cfg(test)]
5869    pub(crate) fn shutdown_execution_for_test(&self) {
5870        self.tx_execution_shutdown
5871            .lock()
5872            .take()
5873            .unwrap()
5874            .send(())
5875            .unwrap();
5876    }
5877
5878    /// NOTE: this function is only to be used for fuzzing and testing. Never
5879    /// use in prod
5880    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5881        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5882        self.get_object_cache_reader()
5883            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5884        self.get_reconfig_api()
5885            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5886    }
5887}
5888
/// Background worker that consumes generated randomness from a channel and
/// submits a randomness state update transaction for each received round.
pub struct RandomnessRoundReceiver {
    /// Authority state used to persist, enqueue and await the update
    /// transactions.
    authority_state: Arc<AuthorityState>,
    /// Source of `(epoch, round, randomness bytes)` items.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5893
5894impl RandomnessRoundReceiver {
5895    pub fn spawn(
5896        authority_state: Arc<AuthorityState>,
5897        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5898    ) -> JoinHandle<()> {
5899        let rrr = RandomnessRoundReceiver {
5900            authority_state,
5901            randomness_rx,
5902        };
5903        spawn_monitored_task!(rrr.run())
5904    }
5905
5906    async fn run(mut self) {
5907        info!("RandomnessRoundReceiver event loop started");
5908
5909        loop {
5910            tokio::select! {
5911                maybe_recv = self.randomness_rx.recv() => {
5912                    if let Some((epoch, round, bytes)) = maybe_recv {
5913                        self.handle_new_randomness(epoch, round, bytes).await;
5914                    } else {
5915                        break;
5916                    }
5917                },
5918            }
5919        }
5920
5921        info!("RandomnessRoundReceiver event loop ended");
5922    }
5923
    /// Turns one piece of generated randomness into a randomness state update
    /// transaction, persists it, enqueues it for execution and spawns a task
    /// that waits for its effects.
    ///
    /// Randomness for an epoch other than the current one is dropped with a
    /// warning.
    ///
    /// The spawned task panics if the effects cannot be read and calls
    /// `fatal!` if the transaction did not execute successfully.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        fail_point_async!("randomness-delay");

        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        // Randomness that belongs to a different epoch than the current one is
        // discarded.
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version(),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        // Copy the digest now; `transaction` is moved into the enqueue call below.
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must
        // immediately persist this transaction. If we crash before its outputs
        // are committed, this ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Send transaction to TransactionManager for execution.
        self.authority_state
            .transaction_manager()
            .enqueue(vec![transaction], &epoch_store);

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case
            // of out-of-order randomness generation. (Each
            // RandomnessStateUpdate depends on the output of the
            // RandomnessStateUpdate from the previous round.)
            //
            // The first wait is bounded by a long timeout. When it elapses, debug
            // builds panic, while other builds log a warning and keep waiting
            // without a timeout.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .try_notify_read_executed_effects(&[digest]),
            )
            .await;
            let result = match result {
                Ok(result) => result,
                Err(_) => {
                    if cfg!(debug_assertions) {
                        // Crash on randomness update execution timeout in debug builds.
                        panic!(
                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                        );
                    }
                    warn!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .try_notify_read_executed_effects(&[digest])
                        .await
                }
            };

            // A failed effects read is treated as unrecoverable.
            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
            // Exactly one digest was requested, so one effects entry is expected.
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
6014}
6015
6016#[async_trait]
6017impl TransactionKeyValueStoreTrait for AuthorityState {
6018    async fn multi_get(
6019        &self,
6020        transaction_keys: &[TransactionDigest],
6021        effects_keys: &[TransactionDigest],
6022    ) -> IotaResult<KVStoreTransactionData> {
6023        let txns = if !transaction_keys.is_empty() {
6024            self.get_transaction_cache_reader()
6025                .try_multi_get_transaction_blocks(transaction_keys)?
6026                .into_iter()
6027                .map(|t| t.map(|t| (*t).clone().into_inner()))
6028                .collect()
6029        } else {
6030            vec![]
6031        };
6032
6033        let fx = if !effects_keys.is_empty() {
6034            self.get_transaction_cache_reader()
6035                .try_multi_get_executed_effects(effects_keys)?
6036        } else {
6037            vec![]
6038        };
6039
6040        Ok((txns, fx))
6041    }
6042
6043    async fn multi_get_checkpoints(
6044        &self,
6045        checkpoint_summaries: &[CheckpointSequenceNumber],
6046        checkpoint_contents: &[CheckpointSequenceNumber],
6047        checkpoint_summaries_by_digest: &[CheckpointDigest],
6048    ) -> IotaResult<(
6049        Vec<Option<CertifiedCheckpointSummary>>,
6050        Vec<Option<CheckpointContents>>,
6051        Vec<Option<CertifiedCheckpointSummary>>,
6052    )> {
6053        // TODO: use multi-get methods if it ever becomes important (unlikely)
6054        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6055        let store = self.get_checkpoint_store();
6056        for seq in checkpoint_summaries {
6057            let checkpoint = store
6058                .get_checkpoint_by_sequence_number(*seq)?
6059                .map(|c| c.into_inner());
6060
6061            summaries.push(checkpoint);
6062        }
6063
6064        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6065        for seq in checkpoint_contents {
6066            let checkpoint = store
6067                .get_checkpoint_by_sequence_number(*seq)?
6068                .and_then(|summary| {
6069                    store
6070                        .get_checkpoint_contents(&summary.content_digest)
6071                        .expect("db read cannot fail")
6072                });
6073            contents.push(checkpoint);
6074        }
6075
6076        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6077        for digest in checkpoint_summaries_by_digest {
6078            let checkpoint = store
6079                .get_checkpoint_by_digest(digest)?
6080                .map(|c| c.into_inner());
6081            summaries_by_digest.push(checkpoint);
6082        }
6083
6084        Ok((summaries, contents, summaries_by_digest))
6085    }
6086
6087    async fn get_transaction_perpetual_checkpoint(
6088        &self,
6089        digest: TransactionDigest,
6090    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
6091        self.get_checkpoint_cache()
6092            .try_get_transaction_perpetual_checkpoint(&digest)
6093            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
6094    }
6095
6096    async fn get_object(
6097        &self,
6098        object_id: ObjectId,
6099        version: VersionNumber,
6100    ) -> IotaResult<Option<Object>> {
6101        self.get_object_cache_reader()
6102            .try_get_object_by_key(&object_id, version)
6103    }
6104
6105    #[instrument(skip_all)]
6106    async fn multi_get_objects(
6107        &self,
6108        object_keys: &[ObjectKey],
6109    ) -> IotaResult<Vec<Option<Object>>> {
6110        Ok(self
6111            .get_object_cache_reader()
6112            .multi_get_objects_by_key(object_keys))
6113    }
6114
6115    async fn multi_get_transactions_perpetual_checkpoints(
6116        &self,
6117        digests: &[TransactionDigest],
6118    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
6119        let res = self
6120            .get_checkpoint_cache()
6121            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
6122
6123        Ok(res
6124            .into_iter()
6125            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6126            .collect())
6127    }
6128
6129    #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
6130    async fn multi_get_events_by_tx_digests(
6131        &self,
6132        digests: &[TransactionDigest],
6133    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
6134        if digests.is_empty() {
6135            return Ok(vec![]);
6136        }
6137
6138        Ok(self
6139            .get_transaction_cache_reader()
6140            .multi_get_events(digests))
6141    }
6142}
6143
6144#[cfg(msim)]
6145pub mod framework_injection {
6146    use std::{
6147        cell::RefCell,
6148        collections::{BTreeMap, BTreeSet},
6149    };
6150
6151    use iota_framework::{BuiltInFramework, SystemPackage};
6152    use iota_sdk_types::ObjectId;
6153    use iota_types::base_types::AuthorityName;
6154    use move_binary_format::CompiledModule;
6155
6156    type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;
6157
6158    // Thread local cache because all simtests run in a single unique thread.
6159    thread_local! {
6160        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6161    }
6162
6163    type Framework = Vec<CompiledModule>;
6164
6165    pub type PackageUpgradeCallback =
6166        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6167
    /// How an injected package override is resolved for a given validator.
    enum PackageOverrideConfig {
        /// The same set of modules is used regardless of the validator.
        Global(Framework),
        /// The callback is asked per validator name and may return no
        /// override (`None`).
        PerValidator(PackageUpgradeCallback),
    }
6172
6173    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6174        modules
6175            .iter()
6176            .map(|m| {
6177                let mut buf = Vec::new();
6178                m.serialize_with_version(m.version, &mut buf).unwrap();
6179                buf
6180            })
6181            .collect()
6182    }
6183
6184    pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
6185        OVERRIDE.with(|bs| {
6186            bs.borrow_mut()
6187                .insert(package_id, PackageOverrideConfig::Global(modules))
6188        });
6189    }
6190
6191    pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
6192        OVERRIDE.with(|bs| {
6193            bs.borrow_mut()
6194                .insert(package_id, PackageOverrideConfig::PerValidator(func))
6195        });
6196    }
6197
6198    pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6199        OVERRIDE.with(|cfg| {
6200            cfg.borrow().get(package_id).and_then(|entry| match entry {
6201                PackageOverrideConfig::Global(framework) => {
6202                    Some(compiled_modules_to_bytes(framework))
6203                }
6204                PackageOverrideConfig::PerValidator(func) => {
6205                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
6206                }
6207            })
6208        })
6209    }
6210
6211    pub fn get_override_modules(
6212        package_id: &ObjectId,
6213        name: AuthorityName,
6214    ) -> Option<Vec<CompiledModule>> {
6215        OVERRIDE.with(|cfg| {
6216            cfg.borrow().get(package_id).and_then(|entry| match entry {
6217                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6218                PackageOverrideConfig::PerValidator(func) => func(name),
6219            })
6220        })
6221    }
6222
6223    pub fn get_override_system_package(
6224        package_id: &ObjectId,
6225        name: AuthorityName,
6226    ) -> Option<SystemPackage> {
6227        let bytes = get_override_bytes(package_id, name)?;
6228        let dependencies = if package_id.is_system_package() {
6229            BuiltInFramework::get_package_by_id(package_id)
6230                .dependencies
6231                .to_vec()
6232        } else {
6233            // Assume that entirely new injected packages depend on all existing system
6234            // packages.
6235            BuiltInFramework::all_package_ids()
6236        };
6237        Some(SystemPackage {
6238            id: *package_id,
6239            bytes,
6240            dependencies,
6241        })
6242    }
6243
6244    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6245        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
6246        let extra: Vec<ObjectId> = OVERRIDE.with(|cfg| {
6247            cfg.borrow()
6248                .keys()
6249                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6250                .collect()
6251        });
6252
6253        extra
6254            .into_iter()
6255            .map(|package| SystemPackage {
6256                id: package,
6257                bytes: get_override_bytes(&package, name).unwrap(),
6258                dependencies: BuiltInFramework::all_package_ids(),
6259            })
6260            .collect()
6261    }
6262}
6263
/// Serializable dump entry for a single object: the object itself plus the
/// id, version and digest taken from its object reference.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object id, copied from the object's reference.
    pub id: ObjectId,
    /// Object version, copied from the object's reference.
    pub version: VersionNumber,
    /// Object digest, copied from the object's reference.
    pub digest: ObjectDigest,
    /// The full object.
    pub object: Object,
}
6271
6272impl ObjDumpFormat {
6273    fn new(object: Object) -> Self {
6274        let oref = object.object_ref();
6275        Self {
6276            id: oref.object_id,
6277            version: oref.version,
6278            digest: oref.digest,
6279            object,
6280        }
6281    }
6282}
6283
/// Serializable snapshot of a transaction's execution context: the
/// transaction, epoch parameters, the computed effects with the expected
/// effects digest, and the objects involved.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// The transaction's sender-signed data.
    pub sender_signed_data: SenderSignedData,
    /// Epoch of the epoch store the dump was taken with.
    pub executed_epoch: u64,
    /// Reference gas price of that epoch.
    pub reference_gas_price: u64,
    /// Protocol version of that epoch.
    pub protocol_version: u64,
    /// Start timestamp of that epoch.
    pub epoch_start_timestamp_ms: u64,
    /// Effects that were computed locally.
    pub computed_effects: TransactionEffects,
    /// Digest the effects were expected to have.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in system packages found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Shared input objects (mutated or read-only) found in the object store.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Runtime-loaded objects found in the object store at their loaded
    /// version.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects from the effects' modified-at versions found in the object
    /// store.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages that were loaded from the database at runtime.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
6301
6302impl NodeStateDump {
6303    pub fn new(
6304        tx_digest: &TransactionDigest,
6305        effects: &TransactionEffects,
6306        expected_effects_digest: TransactionEffectsDigest,
6307        object_store: &dyn ObjectStore,
6308        epoch_store: &Arc<AuthorityPerEpochStore>,
6309        inner_temporary_store: &InnerTemporaryStore,
6310        transaction: &VerifiedExecutableTransaction,
6311    ) -> IotaResult<Self> {
6312        // Epoch info
6313        let executed_epoch = epoch_store.epoch();
6314        let reference_gas_price = epoch_store.reference_gas_price();
6315        let epoch_start_config = epoch_store.epoch_start_config();
6316        let protocol_version = epoch_store.protocol_version().as_u64();
6317        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6318
6319        // Record all system packages at this version
6320        let mut relevant_system_packages = Vec::new();
6321        for sys_package_id in BuiltInFramework::all_package_ids() {
6322            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6323                relevant_system_packages.push(ObjDumpFormat::new(w))
6324            }
6325        }
6326
6327        // Record all the shared objects
6328        let mut shared_objects = Vec::new();
6329        for kind in effects.input_shared_objects() {
6330            match kind {
6331                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6332                    if let Some(w) =
6333                        object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6334                    {
6335                        shared_objects.push(ObjDumpFormat::new(w))
6336                    }
6337                }
6338                InputSharedObject::ReadDeleted(..)
6339                | InputSharedObject::MutateDeleted(..)
6340                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6341                                                           * objects. */
6342            }
6343        }
6344
6345        // Record all loaded child objects
6346        // Child objects which are read but not mutated are not tracked anywhere else
6347        let mut loaded_child_objects = Vec::new();
6348        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6349            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6350                loaded_child_objects.push(ObjDumpFormat::new(w))
6351            }
6352        }
6353
6354        // Record all modified objects
6355        let mut modified_at_versions = Vec::new();
6356        for (id, ver) in effects.modified_at_versions() {
6357            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6358                modified_at_versions.push(ObjDumpFormat::new(w))
6359            }
6360        }
6361
6362        // Packages read at runtime, which were not previously loaded into the temoorary
6363        // store Some packages may be fetched at runtime and wont show up in
6364        // input objects
6365        let mut runtime_reads = Vec::new();
6366        for obj in inner_temporary_store
6367            .runtime_packages_loaded_from_db
6368            .values()
6369        {
6370            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6371        }
6372
6373        // All other input objects should already be in `inner_temporary_store.objects`
6374
6375        Ok(Self {
6376            tx_digest: *tx_digest,
6377            executed_epoch,
6378            reference_gas_price,
6379            epoch_start_timestamp_ms,
6380            protocol_version,
6381            relevant_system_packages,
6382            shared_objects,
6383            loaded_child_objects,
6384            modified_at_versions,
6385            runtime_reads,
6386            sender_signed_data: transaction.clone().into_message(),
6387            input_objects: inner_temporary_store
6388                .input_objects
6389                .values()
6390                .map(|o| ObjDumpFormat::new(o.clone()))
6391                .collect(),
6392            computed_effects: effects.clone(),
6393            expected_effects_digest,
6394        })
6395    }
6396
6397    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6398        let mut objects = Vec::new();
6399        objects.extend(self.relevant_system_packages.clone());
6400        objects.extend(self.shared_objects.clone());
6401        objects.extend(self.loaded_child_objects.clone());
6402        objects.extend(self.modified_at_versions.clone());
6403        objects.extend(self.runtime_reads.clone());
6404        objects.extend(self.input_objects.clone());
6405        objects
6406    }
6407
6408    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6409        let file_name = format!(
6410            "{}_{}_NODE_DUMP.json",
6411            self.tx_digest,
6412            AuthorityState::unixtime_now_ms()
6413        );
6414        let mut path = path.to_path_buf();
6415        path.push(&file_name);
6416        let mut file = File::create(path.clone())?;
6417        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6418        Ok(path)
6419    }
6420
6421    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6422        let file = File::open(path)?;
6423        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6424    }
6425}
6426
6427/// Returns the [`MoveAuthenticator`]s to execute during the pre-consensus
6428/// phase.
6429///
6430/// When `pre_consensus_sponsor_only_move_authentication` is enabled:
6431/// - For sponsored transactions: only the sponsor's [`MoveAuthenticator`] is
6432///   returned (empty if the sponsor does not use one).
6433/// - For non-sponsored transactions: all [`MoveAuthenticator`]s are returned
6434///   (currently only the sender's).
6435///
6436/// When the flag is not set, all [`MoveAuthenticator`]s are returned for
6437/// compatibility.
6438fn pre_consensus_move_authenticators<'a>(
6439    tx: &'a VerifiedTransaction,
6440    protocol_config: &ProtocolConfig,
6441) -> Vec<&'a MoveAuthenticator> {
6442    if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6443        if tx.transaction_data().is_sponsored_tx() {
6444            if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6445                vec![sponsor_move_authenticator]
6446            } else {
6447                vec![]
6448            }
6449        } else {
6450            tx.move_authenticators()
6451        }
6452    } else {
6453        tx.move_authenticators()
6454    }
6455}