iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48    key_value_store::{
49        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
50    },
51    key_value_store_metrics::KeyValueStoreMetrics,
52};
53#[cfg(msim)]
54use iota_types::committee::CommitteeTrait;
55use iota_types::{
56    IOTA_SYSTEM_ADDRESS, TypeTag,
57    authenticator_state::get_authenticator_state,
58    base_types::*,
59    committee::{Committee, EpochId, ProtocolVersion},
60    crypto::{
61        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
62        default_hash,
63    },
64    deny_list_v1::check_coin_deny_list_v1_during_signing,
65    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
66    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
67    effects::{
68        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
69        TransactionEvents, VerifiedSignedTransactionEffects,
70    },
71    error::{ExecutionError, IotaError, IotaResult, UserInputError},
72    event::{Event, EventID, SystemEpochInfoEvent},
73    executable_transaction::VerifiedExecutableTransaction,
74    execution_config_utils::to_binary_config,
75    execution_status::ExecutionStatus,
76    gas::{GasCostSummary, IotaGasStatus},
77    gas_coin::NANOS_PER_IOTA,
78    inner_temporary_store::{
79        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
80        WrittenObjects,
81    },
82    iota_system_state::{
83        IotaSystemState, IotaSystemStateTrait,
84        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
85    },
86    is_system_package,
87    layout_resolver::{LayoutResolver, into_struct_layout},
88    message_envelope::Message,
89    messages_checkpoint::{
90        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
91        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
92        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
93        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
94    },
95    messages_consensus::AuthorityCapabilitiesV1,
96    messages_grpc::{
97        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
98        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
99        TransactionStatus,
100    },
101    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
102    object::{
103        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
104        bounded_visitor::BoundedVisitor,
105    },
106    storage::{
107        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
108    },
109    supported_protocol_versions::{
110        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
111    },
112    transaction::*,
113    transaction_executor::SimulateTransactionResult,
114};
115use itertools::Itertools;
116use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
117use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
118use parking_lot::Mutex;
119use prometheus::{
120    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
121    register_histogram_vec_with_registry, register_histogram_with_registry,
122    register_int_counter_vec_with_registry, register_int_counter_with_registry,
123    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
124};
125use serde::{Deserialize, Serialize, de::DeserializeOwned};
126use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
127use tap::TapFallible;
128use tokio::{
129    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130    task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143    authority::{
144        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148        authority_store_tables::AuthorityPrunerTables,
149        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150    },
151    authority_client::NetworkAuthorityClient,
152    checkpoints::CheckpointStore,
153    congestion_tracker::CongestionTracker,
154    consensus_adapter::ConsensusAdapter,
155    epoch::committee_store::CommitteeStore,
156    execution_cache::{
157        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159        TransactionCacheRead,
160    },
161    execution_driver::execution_process,
162    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163    metrics::{LatencyObserver, RateTracker},
164    module_cache_metrics::ResolverMetrics,
165    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166    rest_index::RestIndexStore,
167    stake_aggregator::StakeAggregator,
168    state_accumulator::{AccumulatorStore, StateAccumulator},
169    subscription_handler::SubscriptionHandler,
170    transaction_input_loader::TransactionInputLoader,
171    transaction_manager::TransactionManager,
172    transaction_outputs::TransactionOutputs,
173    validator_tx_finalizer::ValidatorTxFinalizer,
174    verify_indexes::verify_indexes,
175};
176
177#[cfg(test)]
178#[path = "unit_tests/authority_tests.rs"]
179pub mod authority_tests;
180
181#[cfg(test)]
182#[path = "unit_tests/transaction_tests.rs"]
183pub mod transaction_tests;
184
185#[cfg(test)]
186#[path = "unit_tests/batch_transaction_tests.rs"]
187mod batch_transaction_tests;
188
189#[cfg(test)]
190#[path = "unit_tests/move_integration_tests.rs"]
191pub mod move_integration_tests;
192
193#[cfg(test)]
194#[path = "unit_tests/gas_tests.rs"]
195mod gas_tests;
196
197#[cfg(test)]
198#[path = "unit_tests/batch_verification_tests.rs"]
199mod batch_verification_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/coin_deny_list_tests.rs"]
203mod coin_deny_list_tests;
204
205#[cfg(test)]
206#[path = "unit_tests/auth_unit_test_utils.rs"]
207pub mod auth_unit_test_utils;
208
209pub mod authority_test_utils;
210
211pub mod authority_per_epoch_store;
212pub mod authority_per_epoch_store_pruner;
213
214pub mod authority_store_pruner;
215pub mod authority_store_tables;
216pub mod authority_store_types;
217pub mod epoch_start_configuration;
218pub mod shared_object_congestion_tracker;
219pub mod shared_object_version_manager;
220pub mod suggested_gas_price_calculator;
221pub mod test_authority_builder;
222pub mod transaction_deferral;
223
224pub(crate) mod authority_store;
225pub mod backpressure;
226
227/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
228pub struct AuthorityMetrics {
229    tx_orders: IntCounter,
230    total_certs: IntCounter,
231    total_cert_attempts: IntCounter,
232    total_effects: IntCounter,
233    pub shared_obj_tx: IntCounter,
234    sponsored_tx: IntCounter,
235    tx_already_processed: IntCounter,
236    num_input_objs: Histogram,
237    num_shared_objects: Histogram,
238    batch_size: Histogram,
239
240    authority_state_handle_transaction_latency: Histogram,
241
242    execute_certificate_latency_single_writer: Histogram,
243    execute_certificate_latency_shared_object: Histogram,
244
245    internal_execution_latency: Histogram,
246    execution_load_input_objects_latency: Histogram,
247    prepare_certificate_latency: Histogram,
248    commit_certificate_latency: Histogram,
249    db_checkpoint_latency: Histogram,
250
251    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
252    pub(crate) transaction_manager_num_missing_objects: IntGauge,
253    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
254    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
255    pub(crate) transaction_manager_num_ready: IntGauge,
256    pub(crate) transaction_manager_object_cache_size: IntGauge,
257    pub(crate) transaction_manager_object_cache_hits: IntCounter,
258    pub(crate) transaction_manager_object_cache_misses: IntCounter,
259    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
260    pub(crate) transaction_manager_package_cache_size: IntGauge,
261    pub(crate) transaction_manager_package_cache_hits: IntCounter,
262    pub(crate) transaction_manager_package_cache_misses: IntCounter,
263    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
264    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
265
266    pub(crate) execution_driver_executed_transactions: IntCounter,
267    pub(crate) execution_driver_dispatch_queue: IntGauge,
268    pub(crate) execution_queueing_delay_s: Histogram,
269    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
270    pub(crate) execution_gas_latency_ratio: Histogram,
271
272    pub(crate) skipped_consensus_txns: IntCounter,
273    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
274
275    pub(crate) authority_overload_status: IntGauge,
276    pub(crate) authority_load_shedding_percentage: IntGauge,
277
278    pub(crate) transaction_overload_sources: IntCounterVec,
279
280    /// Post processing metrics
281    post_processing_total_events_emitted: IntCounter,
282    post_processing_total_tx_indexed: IntCounter,
283    post_processing_total_tx_had_event_processed: IntCounter,
284    post_processing_total_failures: IntCounter,
285
286    /// Consensus handler metrics
287    pub consensus_handler_processed: IntCounterVec,
288    pub consensus_handler_transaction_sizes: HistogramVec,
289    pub consensus_handler_num_low_scoring_authorities: IntGauge,
290    pub consensus_handler_scores: IntGaugeVec,
291    pub consensus_handler_deferred_transactions: IntCounter,
292    pub consensus_handler_congested_transactions: IntCounter,
293    pub consensus_handler_cancelled_transactions: IntCounter,
294    pub consensus_handler_max_object_costs: IntGaugeVec,
295    pub consensus_committed_subdags: IntCounterVec,
296    pub consensus_committed_messages: IntGaugeVec,
297    pub consensus_committed_user_transactions: IntGaugeVec,
298    pub consensus_calculated_throughput: IntGauge,
299    pub consensus_calculated_throughput_profile: IntGauge,
300
301    pub limits_metrics: Arc<LimitsMetrics>,
302
303    /// bytecode verifier metrics for tracking timeouts
304    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
305
306    /// Count of zklogin signatures
307    pub zklogin_sig_count: IntCounter,
308    /// Count of multisig signatures
309    pub multisig_sig_count: IntCounter,
310
311    // Tracks recent average txn queueing delay between when it is ready for execution
312    // until it starts executing.
313    pub execution_queueing_latency: LatencyObserver,
314
315    // Tracks the rate of transactions become ready for execution in transaction manager.
316    // The need for the Mutex is that the tracker is updated in transaction manager and read
317    // in the overload_monitor. There should be low mutex contention because
318    // transaction manager is single threaded and the read rate in overload_monitor is
319    // low. In the case where transaction manager becomes multi-threaded, we can
320    // create one rate tracker per thread.
321    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
322
323    // Tracks the rate of transactions starts execution in execution driver.
324    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
325    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
326}
327
328// Override default Prom buckets for positive numbers in 0-10M range
329const POSITIVE_INT_BUCKETS: &[f64] = &[
330    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
331    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
332    7000000., 10000000.,
333];
334
335const LATENCY_SEC_BUCKETS: &[f64] = &[
336    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
337    10., 20., 30., 60., 90.,
338];
339
340// Buckets for low latency samples. Starts from 10us.
341const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
342    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
343    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
344];
345
346const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
347    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
348    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
349];
350
351/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
352pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
353
354impl AuthorityMetrics {
355    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
356        let execute_certificate_latency = register_histogram_vec_with_registry!(
357            "authority_state_execute_certificate_latency",
358            "Latency of executing certificates, including waiting for inputs",
359            &["tx_type"],
360            LATENCY_SEC_BUCKETS.to_vec(),
361            registry,
362        )
363        .unwrap();
364
365        let execute_certificate_latency_single_writer =
366            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
367        let execute_certificate_latency_shared_object =
368            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
369
370        Self {
371            tx_orders: register_int_counter_with_registry!(
372                "total_transaction_orders",
373                "Total number of transaction orders",
374                registry,
375            )
376            .unwrap(),
377            total_certs: register_int_counter_with_registry!(
378                "total_transaction_certificates",
379                "Total number of transaction certificates handled",
380                registry,
381            )
382            .unwrap(),
383            total_cert_attempts: register_int_counter_with_registry!(
384                "total_handle_certificate_attempts",
385                "Number of calls to handle_certificate",
386                registry,
387            )
388            .unwrap(),
389            // total_effects == total transactions finished
390            total_effects: register_int_counter_with_registry!(
391                "total_transaction_effects",
392                "Total number of transaction effects produced",
393                registry,
394            )
395            .unwrap(),
396
397            shared_obj_tx: register_int_counter_with_registry!(
398                "num_shared_obj_tx",
399                "Number of transactions involving shared objects",
400                registry,
401            )
402            .unwrap(),
403
404            sponsored_tx: register_int_counter_with_registry!(
405                "num_sponsored_tx",
406                "Number of sponsored transactions",
407                registry,
408            )
409            .unwrap(),
410
411            tx_already_processed: register_int_counter_with_registry!(
412                "num_tx_already_processed",
413                "Number of transaction orders already processed previously",
414                registry,
415            )
416            .unwrap(),
417            num_input_objs: register_histogram_with_registry!(
418                "num_input_objects",
419                "Distribution of number of input TX objects per TX",
420                POSITIVE_INT_BUCKETS.to_vec(),
421                registry,
422            )
423            .unwrap(),
424            num_shared_objects: register_histogram_with_registry!(
425                "num_shared_objects",
426                "Number of shared input objects per TX",
427                POSITIVE_INT_BUCKETS.to_vec(),
428                registry,
429            )
430            .unwrap(),
431            batch_size: register_histogram_with_registry!(
432                "batch_size",
433                "Distribution of size of transaction batch",
434                POSITIVE_INT_BUCKETS.to_vec(),
435                registry,
436            )
437            .unwrap(),
438            authority_state_handle_transaction_latency: register_histogram_with_registry!(
439                "authority_state_handle_transaction_latency",
440                "Latency of handling transactions",
441                LATENCY_SEC_BUCKETS.to_vec(),
442                registry,
443            )
444            .unwrap(),
445            execute_certificate_latency_single_writer,
446            execute_certificate_latency_shared_object,
447            internal_execution_latency: register_histogram_with_registry!(
448                "authority_state_internal_execution_latency",
449                "Latency of actual certificate executions",
450                LATENCY_SEC_BUCKETS.to_vec(),
451                registry,
452            )
453            .unwrap(),
454            execution_load_input_objects_latency: register_histogram_with_registry!(
455                "authority_state_execution_load_input_objects_latency",
456                "Latency of loading input objects for execution",
457                LOW_LATENCY_SEC_BUCKETS.to_vec(),
458                registry,
459            )
460            .unwrap(),
461            prepare_certificate_latency: register_histogram_with_registry!(
462                "authority_state_prepare_certificate_latency",
463                "Latency of executing certificates, before committing the results",
464                LATENCY_SEC_BUCKETS.to_vec(),
465                registry,
466            )
467            .unwrap(),
468            commit_certificate_latency: register_histogram_with_registry!(
469                "authority_state_commit_certificate_latency",
470                "Latency of committing certificate execution results",
471                LATENCY_SEC_BUCKETS.to_vec(),
472                registry,
473            )
474            .unwrap(),
475            db_checkpoint_latency: register_histogram_with_registry!(
476                "db_checkpoint_latency",
477                "Latency of checkpointing dbs",
478                LATENCY_SEC_BUCKETS.to_vec(),
479                registry,
480            ).unwrap(),
481            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
482                "transaction_manager_num_enqueued_certificates",
483                "Current number of certificates enqueued to TransactionManager",
484                &["result"],
485                registry,
486            )
487            .unwrap(),
488            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
489                "transaction_manager_num_missing_objects",
490                "Current number of missing objects in TransactionManager",
491                registry,
492            )
493            .unwrap(),
494            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
495                "transaction_manager_num_pending_certificates",
496                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
497                registry,
498            )
499            .unwrap(),
500            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
501                "transaction_manager_num_executing_certificates",
502                "Number of executing certificates, including queued and actually running certificates",
503                registry,
504            )
505            .unwrap(),
506            transaction_manager_num_ready: register_int_gauge_with_registry!(
507                "transaction_manager_num_ready",
508                "Number of ready transactions in TransactionManager",
509                registry,
510            )
511            .unwrap(),
512            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
513                "transaction_manager_object_cache_size",
514                "Current size of object-availability cache in TransactionManager",
515                registry,
516            )
517            .unwrap(),
518            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
519                "transaction_manager_object_cache_hits",
520                "Number of object-availability cache hits in TransactionManager",
521                registry,
522            )
523            .unwrap(),
524            authority_overload_status: register_int_gauge_with_registry!(
525                "authority_overload_status",
526                "Whether authority is current experiencing overload and enters load shedding mode.",
527                registry)
528            .unwrap(),
529            authority_load_shedding_percentage: register_int_gauge_with_registry!(
530                "authority_load_shedding_percentage",
531                "The percentage of transactions is shed when the authority is in load shedding mode.",
532                registry)
533            .unwrap(),
534            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
535                "transaction_manager_object_cache_misses",
536                "Number of object-availability cache misses in TransactionManager",
537                registry,
538            )
539            .unwrap(),
540            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
541                "transaction_manager_object_cache_evictions",
542                "Number of object-availability cache evictions in TransactionManager",
543                registry,
544            )
545            .unwrap(),
546            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
547                "transaction_manager_package_cache_size",
548                "Current size of package-availability cache in TransactionManager",
549                registry,
550            )
551            .unwrap(),
552            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
553                "transaction_manager_package_cache_hits",
554                "Number of package-availability cache hits in TransactionManager",
555                registry,
556            )
557            .unwrap(),
558            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
559                "transaction_manager_package_cache_misses",
560                "Number of package-availability cache misses in TransactionManager",
561                registry,
562            )
563            .unwrap(),
564            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
565                "transaction_manager_package_cache_evictions",
566                "Number of package-availability cache evictions in TransactionManager",
567                registry,
568            )
569            .unwrap(),
570            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571                "transaction_manager_transaction_queue_age_s",
572                "Time spent in waiting for transaction in the queue",
573                LATENCY_SEC_BUCKETS.to_vec(),
574                registry,
575            )
576            .unwrap(),
577            transaction_overload_sources: register_int_counter_vec_with_registry!(
578                "transaction_overload_sources",
579                "Number of times each source indicates transaction overload.",
580                &["source"],
581                registry)
582            .unwrap(),
583            execution_driver_executed_transactions: register_int_counter_with_registry!(
584                "execution_driver_executed_transactions",
585                "Cumulative number of transaction executed by execution driver",
586                registry,
587            )
588            .unwrap(),
589            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
590                "execution_driver_dispatch_queue",
591                "Number of transaction pending in execution driver dispatch queue",
592                registry,
593            )
594            .unwrap(),
595            execution_queueing_delay_s: register_histogram_with_registry!(
596                "execution_queueing_delay_s",
597                "Queueing delay between a transaction is ready for execution until it starts executing.",
598                LATENCY_SEC_BUCKETS.to_vec(),
599                registry
600            )
601            .unwrap(),
602            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
603                "prepare_cert_gas_latency_ratio",
604                "The ratio of computation gas divided by VM execution latency.",
605                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
606                registry
607            )
608            .unwrap(),
609            execution_gas_latency_ratio: register_histogram_with_registry!(
610                "execution_gas_latency_ratio",
611                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
612                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
613                registry
614            )
615            .unwrap(),
616            skipped_consensus_txns: register_int_counter_with_registry!(
617                "skipped_consensus_txns",
618                "Total number of consensus transactions skipped",
619                registry,
620            )
621            .unwrap(),
622            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
623                "skipped_consensus_txns_cache_hit",
624                "Total number of consensus transactions skipped because of local cache hit",
625                registry,
626            )
627            .unwrap(),
628            post_processing_total_events_emitted: register_int_counter_with_registry!(
629                "post_processing_total_events_emitted",
630                "Total number of events emitted in post processing",
631                registry,
632            )
633            .unwrap(),
634            post_processing_total_tx_indexed: register_int_counter_with_registry!(
635                "post_processing_total_tx_indexed",
636                "Total number of txes indexed in post processing",
637                registry,
638            )
639            .unwrap(),
640            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
641                "post_processing_total_tx_had_event_processed",
642                "Total number of txes finished event processing in post processing",
643                registry,
644            )
645            .unwrap(),
646            post_processing_total_failures: register_int_counter_with_registry!(
647                "post_processing_total_failures",
648                "Total number of failure in post processing",
649                registry,
650            )
651            .unwrap(),
652            consensus_handler_processed: register_int_counter_vec_with_registry!(
653                "consensus_handler_processed",
654                "Number of transactions processed by consensus handler",
655                &["class"],
656                registry
657            ).unwrap(),
658            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
659                "consensus_handler_transaction_sizes",
660                "Sizes of each type of transactions processed by consensus handler",
661                &["class"],
662                POSITIVE_INT_BUCKETS.to_vec(),
663                registry
664            ).unwrap(),
665            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
666                "consensus_handler_num_low_scoring_authorities",
667                "Number of low scoring authorities based on reputation scores from consensus",
668                registry
669            ).unwrap(),
670            consensus_handler_scores: register_int_gauge_vec_with_registry!(
671                "consensus_handler_scores",
672                "scores from consensus for each authority",
673                &["authority"],
674                registry,
675            ).unwrap(),
676            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
677                "consensus_handler_deferred_transactions",
678                "Number of transactions deferred by consensus handler",
679                registry,
680            ).unwrap(),
681            consensus_handler_congested_transactions: register_int_counter_with_registry!(
682                "consensus_handler_congested_transactions",
683                "Number of transactions deferred by consensus handler due to congestion",
684                registry,
685            ).unwrap(),
686            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
687                "consensus_handler_cancelled_transactions",
688                "Number of transactions cancelled by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
692                "consensus_handler_max_congestion_control_object_costs",
693                "Max object costs for congestion control in the current consensus commit",
694                &["commit_type"],
695                registry,
696            ).unwrap(),
697            consensus_committed_subdags: register_int_counter_vec_with_registry!(
698                "consensus_committed_subdags",
699                "Number of committed subdags, sliced by author",
700                &["authority"],
701                registry,
702            ).unwrap(),
703            consensus_committed_messages: register_int_gauge_vec_with_registry!(
704                "consensus_committed_messages",
705                "Total number of committed consensus messages, sliced by author",
706                &["authority"],
707                registry,
708            ).unwrap(),
709            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
710                "consensus_committed_user_transactions",
711                "Number of committed user transactions, sliced by submitter",
712                &["authority"],
713                registry,
714            ).unwrap(),
715            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
716            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
717            zklogin_sig_count: register_int_counter_with_registry!(
718                "zklogin_sig_count",
719                "Count of zkLogin signatures",
720                registry,
721            )
722            .unwrap(),
723            multisig_sig_count: register_int_counter_with_registry!(
724                "multisig_sig_count",
725                "Count of zkLogin signatures",
726                registry,
727            )
728            .unwrap(),
729            consensus_calculated_throughput: register_int_gauge_with_registry!(
730                "consensus_calculated_throughput",
731                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
732                registry,
733            ).unwrap(),
734            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
735                "consensus_calculated_throughput_profile",
736                "The current active calculated throughput profile",
737                registry
738            ).unwrap(),
739            execution_queueing_latency: LatencyObserver::new(),
740            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
741            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
742        }
743    }
744
745    /// Reset metrics that contain `hostname` as one of the labels. This is
746    /// needed to avoid retaining metrics for long-gone committee members and
747    /// only exposing metrics for the committee in the current epoch.
748    pub fn reset_on_reconfigure(&self) {
749        self.consensus_committed_messages.reset();
750        self.consensus_handler_scores.reset();
751        self.consensus_committed_user_transactions.reset();
752    }
753}
754
755/// a Trait object for `Signer` that is:
756/// - Pin, i.e. confined to one place in memory (we don't want to copy private
757///   keys).
758/// - Sync, i.e. can be safely shared between threads.
759///
760/// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair`
761pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
762
763pub struct AuthorityState {
764    // Fixed size, static, identity of the authority
765    /// The name of this authority.
766    pub name: AuthorityName,
767    /// The signature key of the authority.
768    pub secret: StableSyncAuthoritySigner,
769
770    /// The database
771    input_loader: TransactionInputLoader,
772    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
773
774    epoch_store: ArcSwap<AuthorityPerEpochStore>,
775
776    /// This lock denotes current 'execution epoch'.
777    /// Execution acquires read lock, checks certificate epoch and holds it
778    /// until all writes are complete. Reconfiguration acquires write lock,
779    /// changes the epoch and revert all transactions from previous epoch
780    /// that are executed but did not make into checkpoint.
781    execution_lock: RwLock<EpochId>,
782
783    pub indexes: Option<Arc<IndexStore>>,
784    pub rest_index: Option<Arc<RestIndexStore>>,
785
786    pub subscription_handler: Arc<SubscriptionHandler>,
787    pub checkpoint_store: Arc<CheckpointStore>,
788
789    committee_store: Arc<CommitteeStore>,
790
791    /// Manages pending certificates and their missing input objects.
792    transaction_manager: Arc<TransactionManager>,
793
794    /// Shuts down the execution task. Used only in testing.
795    #[cfg_attr(not(test), expect(unused))]
796    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
797
798    pub metrics: Arc<AuthorityMetrics>,
799    _pruner: AuthorityStorePruner,
800    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
801
802    /// Take db checkpoints of different dbs
803    db_checkpoint_config: DBCheckpointConfig,
804
805    pub config: NodeConfig,
806
807    /// Current overload status in this authority. Updated periodically.
808    pub overload_info: AuthorityOverloadInfo,
809
810    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
811
812    /// The chain identifier is derived from the digest of the genesis
813    /// checkpoint.
814    chain_identifier: ChainIdentifier,
815
816    pub(crate) congestion_tracker: Arc<CongestionTracker>,
817}
818
819/// The authority state encapsulates all state, drives execution, and ensures
820/// safety.
821///
822/// Note the authority operations can be accessed through a read ref (&) and do
823/// not require &mut. Internally a database is synchronized through a mutex
824/// lock.
825///
826/// Repeating valid commands should produce no changes and return no error.
827impl AuthorityState {
828    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
829        epoch_store.committee().authority_exists(&self.name)
830    }
831
832    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
833        epoch_store
834            .active_validators()
835            .iter()
836            .any(|a| AuthorityName::from(a) == self.name)
837    }
838
839    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
840        !self.is_committee_validator(epoch_store)
841    }
842
843    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
844        &self.committee_store
845    }
846
847    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
848        self.committee_store.clone()
849    }
850
851    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
852        &self.config.authority_overload_config
853    }
854
855    pub fn get_epoch_state_commitments(
856        &self,
857        epoch: EpochId,
858    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
859        self.checkpoint_store.get_epoch_state_commitments(epoch)
860    }
861
862    /// This is a private method and should be kept that way. It doesn't check
863    /// whether the provided transaction is a system transaction, and hence
864    /// can only be called internally.
865    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
866    async fn handle_transaction_impl(
867        &self,
868        transaction: VerifiedTransaction,
869        epoch_store: &Arc<AuthorityPerEpochStore>,
870    ) -> IotaResult<VerifiedSignedTransaction> {
871        // Ensure that validator cannot reconfigure while we are signing the tx
872        let _execution_lock = self.execution_lock_for_signing();
873
874        let tx_digest = transaction.digest();
875        let tx_data = transaction.data().transaction_data();
876
877        let input_object_kinds = tx_data.input_objects()?;
878        let receiving_objects_refs = tx_data.receiving_objects();
879
880        // Note: the deny checks may do redundant package loads but:
881        // - they only load packages when there is an active package deny map
882        // - the loads are cached anyway
883        iota_transaction_checks::deny::check_transaction_for_signing(
884            tx_data,
885            transaction.tx_signatures(),
886            &input_object_kinds,
887            &receiving_objects_refs,
888            &self.config.transaction_deny_config,
889            self.get_backing_package_store().as_ref(),
890        )?;
891
892        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
893            Some(tx_digest),
894            &input_object_kinds,
895            &receiving_objects_refs,
896            epoch_store.epoch(),
897        )?;
898
899        let (_gas_status, checked_input_objects) =
900            iota_transaction_checks::check_transaction_input(
901                epoch_store.protocol_config(),
902                epoch_store.reference_gas_price(),
903                tx_data,
904                input_objects,
905                &receiving_objects,
906                &self.metrics.bytecode_verifier_metrics,
907                &self.config.verifier_signing_config,
908            )?;
909
910        check_coin_deny_list_v1_during_signing(
911            tx_data.sender(),
912            &checked_input_objects,
913            &receiving_objects,
914            &self.get_object_store(),
915        )?;
916
917        let owned_objects = checked_input_objects.inner().filter_owned_objects();
918
919        let signed_transaction = VerifiedSignedTransaction::new(
920            epoch_store.epoch(),
921            transaction,
922            self.name,
923            &*self.secret,
924        );
925
926        // Check and write locks, to signed transaction, into the database
927        // The call to self.set_transaction_lock checks the lock is not conflicting,
928        // and returns ConflictingTransaction error in case there is a lock on a
929        // different existing transaction.
930        self.get_cache_writer().try_acquire_transaction_locks(
931            epoch_store,
932            &owned_objects,
933            signed_transaction.clone(),
934        )?;
935
936        Ok(signed_transaction)
937    }
938
939    /// Initiate a new transaction.
940    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
941    pub async fn handle_transaction(
942        &self,
943        epoch_store: &Arc<AuthorityPerEpochStore>,
944        transaction: VerifiedTransaction,
945    ) -> IotaResult<HandleTransactionResponse> {
946        let tx_digest = *transaction.digest();
947        debug!("handle_transaction");
948
949        // Ensure an idempotent answer.
950        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
951            return Ok(HandleTransactionResponse { status });
952        }
953
954        let _metrics_guard = self
955            .metrics
956            .authority_state_handle_transaction_latency
957            .start_timer();
958        self.metrics.tx_orders.inc();
959
960        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
961        match signed {
962            Ok(s) => {
963                if self.is_committee_validator(epoch_store) {
964                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
965                        let tx = s.clone();
966                        let validator_tx_finalizer = validator_tx_finalizer.clone();
967                        let cache_reader = self.get_transaction_cache_reader().clone();
968                        let epoch_store = epoch_store.clone();
969                        spawn_monitored_task!(epoch_store.within_alive_epoch(
970                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
971                        ));
972                    }
973                }
974                Ok(HandleTransactionResponse {
975                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
976                })
977            }
978            // It happens frequently that while we are checking the validity of the transaction, it
979            // has just been executed.
980            // In that case, we could still return Ok to avoid showing confusing errors.
981            Err(err) => Ok(HandleTransactionResponse {
982                status: self
983                    .get_transaction_status(&tx_digest, epoch_store)?
984                    .ok_or(err)?
985                    .1,
986            }),
987        }
988    }
989
990    pub fn check_system_overload_at_signing(&self) -> bool {
991        self.config
992            .authority_overload_config
993            .check_system_overload_at_signing
994    }
995
996    pub fn check_system_overload_at_execution(&self) -> bool {
997        self.config
998            .authority_overload_config
999            .check_system_overload_at_execution
1000    }
1001
1002    pub(crate) fn check_system_overload(
1003        &self,
1004        consensus_adapter: &Arc<ConsensusAdapter>,
1005        tx_data: &SenderSignedData,
1006        do_authority_overload_check: bool,
1007    ) -> IotaResult {
1008        if do_authority_overload_check {
1009            self.check_authority_overload(tx_data).tap_err(|_| {
1010                self.update_overload_metrics("execution_queue");
1011            })?;
1012        }
1013        self.transaction_manager
1014            .check_execution_overload(self.overload_config(), tx_data)
1015            .tap_err(|_| {
1016                self.update_overload_metrics("execution_pending");
1017            })?;
1018        consensus_adapter.check_consensus_overload().tap_err(|_| {
1019            self.update_overload_metrics("consensus");
1020        })?;
1021
1022        let pending_tx_count = self
1023            .get_cache_commit()
1024            .approximate_pending_transaction_count();
1025        if pending_tx_count
1026            > self
1027                .config
1028                .execution_cache_config
1029                .writeback_cache
1030                .backpressure_threshold_for_rpc()
1031        {
1032            return Err(IotaError::ValidatorOverloadedRetryAfter {
1033                retry_after_secs: 10,
1034            });
1035        }
1036
1037        Ok(())
1038    }
1039
1040    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1041        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1042            return Ok(());
1043        }
1044
1045        let load_shedding_percentage = self
1046            .overload_info
1047            .load_shedding_percentage
1048            .load(Ordering::Relaxed);
1049        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1050    }
1051
1052    fn update_overload_metrics(&self, source: &str) {
1053        self.metrics
1054            .transaction_overload_sources
1055            .with_label_values(&[source])
1056            .inc();
1057    }
1058
1059    /// Executes a certificate for its effects.
1060    #[instrument(level = "trace", skip_all)]
1061    pub async fn execute_certificate(
1062        &self,
1063        certificate: &VerifiedCertificate,
1064        epoch_store: &Arc<AuthorityPerEpochStore>,
1065    ) -> IotaResult<TransactionEffects> {
1066        let _metrics_guard = if certificate.contains_shared_object() {
1067            self.metrics
1068                .execute_certificate_latency_shared_object
1069                .start_timer()
1070        } else {
1071            self.metrics
1072                .execute_certificate_latency_single_writer
1073                .start_timer()
1074        };
1075        trace!("execute_certificate");
1076
1077        self.metrics.total_cert_attempts.inc();
1078
1079        if !certificate.contains_shared_object() {
1080            // Shared object transactions need to be sequenced by the consensus before
1081            // enqueueing for execution, done in
1082            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1083            // object transactions, they can be enqueued for execution immediately.
1084            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1085        }
1086
1087        // tx could be reverted when epoch ends, so we must be careful not to return a
1088        // result here after the epoch ends.
1089        epoch_store
1090            .within_alive_epoch(self.notify_read_effects(certificate))
1091            .await
1092            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1093            .and_then(|r| r)
1094    }
1095
1096    /// Internal logic to execute a certificate.
1097    ///
1098    /// Guarantees that
1099    /// - If input objects are available, return no permanent failure.
1100    /// - Execution and output commit are atomic. i.e. outputs are only written
1101    ///   to storage,
1102    /// on successful execution; crashed execution has no observable effect and
1103    /// can be retried.
1104    ///
1105    /// It is caller's responsibility to ensure input objects are available and
1106    /// locks are set. If this cannot be satisfied by the caller,
1107    /// execute_certificate() should be called instead.
1108    ///
1109    /// Should only be called within iota-core.
1110    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1111    pub fn try_execute_immediately(
1112        &self,
1113        certificate: &VerifiedExecutableTransaction,
1114        expected_effects_digest: Option<TransactionEffectsDigest>,
1115        epoch_store: &Arc<AuthorityPerEpochStore>,
1116    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1117        let _scope = monitored_scope("Execution::try_execute_immediately");
1118        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1119
1120        let tx_digest = certificate.digest();
1121
1122        // Acquire a lock to prevent concurrent executions of the same transaction.
1123        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1124
1125        // The cert could have been processed by a concurrent attempt of the same cert,
1126        // so check if the effects have already been written.
1127        if let Some(effects) = self
1128            .get_transaction_cache_reader()
1129            .try_get_executed_effects(tx_digest)?
1130        {
1131            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1132                assert_eq!(
1133                    effects.digest(),
1134                    expected_effects_digest_inner,
1135                    "Unexpected effects digest for transaction {tx_digest:?}"
1136                );
1137            }
1138            tx_guard.release();
1139            return Ok((effects, None));
1140        }
1141        let input_objects =
1142            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1143
1144        // If no expected_effects_digest was provided, try to get it from storage.
1145        // We could be re-executing a previously executed but uncommitted transaction,
1146        // perhaps after restarting with a new binary. In this situation, if
1147        // we have published an effects signature, we must be sure not to
1148        // equivocate. TODO: read from cache instead of DB
1149        let expected_effects_digest =
1150            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1151
1152        self.process_certificate(
1153            tx_guard,
1154            certificate,
1155            input_objects,
1156            expected_effects_digest,
1157            epoch_store,
1158        )
1159        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1160        .tap_ok(
1161            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1162        )
1163    }
1164
1165    pub fn read_objects_for_execution(
1166        &self,
1167        tx_lock: &CertLockGuard,
1168        certificate: &VerifiedExecutableTransaction,
1169        epoch_store: &Arc<AuthorityPerEpochStore>,
1170    ) -> IotaResult<InputObjects> {
1171        let _scope = monitored_scope("Execution::load_input_objects");
1172        let _metrics_guard = self
1173            .metrics
1174            .execution_load_input_objects_latency
1175            .start_timer();
1176        let input_objects = &certificate.data().transaction_data().input_objects()?;
1177        self.input_loader.read_objects_for_execution(
1178            epoch_store,
1179            &certificate.key(),
1180            tx_lock,
1181            input_objects,
1182            epoch_store.epoch(),
1183        )
1184    }
1185
1186    /// Test only wrapper for `try_execute_immediately()` above, useful for
1187    /// checking errors if the pre-conditions are not satisfied, and
1188    /// executing change epoch transactions.
1189    pub fn try_execute_for_test(
1190        &self,
1191        certificate: &VerifiedCertificate,
1192    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1193        let epoch_store = self.epoch_store_for_testing();
1194        let (effects, execution_error_opt) = self.try_execute_immediately(
1195            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1196            None,
1197            &epoch_store,
1198        )?;
1199        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1200        Ok((signed_effects, execution_error_opt))
1201    }
1202
1203    /// Non-fallible version of `try_execute_for_test()`.
1204    pub fn execute_for_test(
1205        &self,
1206        certificate: &VerifiedCertificate,
1207    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1208        self.try_execute_for_test(certificate)
1209            .expect("try_execute_for_test should not fail")
1210    }
1211
1212    pub async fn notify_read_effects(
1213        &self,
1214        certificate: &VerifiedCertificate,
1215    ) -> IotaResult<TransactionEffects> {
1216        self.get_transaction_cache_reader()
1217            .try_notify_read_executed_effects(&[*certificate.digest()])
1218            .await
1219            .map(|mut r| r.pop().expect("must return correct number of effects"))
1220    }
1221
1222    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1223        self.get_object_cache_reader()
1224            .try_check_owned_objects_are_live(owned_object_refs)
1225    }
1226
1227    /// This function captures the required state to debug a forked transaction.
1228    /// The dump is written to a file in dir `path`, with name prefixed by the
1229    /// transaction digest. NOTE: Since this info escapes the validator
1230    /// context, make sure not to leak any private info here
1231    pub(crate) fn debug_dump_transaction_state(
1232        &self,
1233        tx_digest: &TransactionDigest,
1234        effects: &TransactionEffects,
1235        expected_effects_digest: TransactionEffectsDigest,
1236        inner_temporary_store: &InnerTemporaryStore,
1237        certificate: &VerifiedExecutableTransaction,
1238        debug_dump_config: &StateDebugDumpConfig,
1239    ) -> IotaResult<PathBuf> {
1240        let dump_dir = debug_dump_config
1241            .dump_file_directory
1242            .as_ref()
1243            .cloned()
1244            .unwrap_or(std::env::temp_dir());
1245        let epoch_store = self.load_epoch_store_one_call_per_task();
1246
1247        NodeStateDump::new(
1248            tx_digest,
1249            effects,
1250            expected_effects_digest,
1251            self.get_object_store().as_ref(),
1252            &epoch_store,
1253            inner_temporary_store,
1254            certificate,
1255        )?
1256        .write_to_file(&dump_dir)
1257        .map_err(|e| IotaError::FileIO(e.to_string()))
1258    }
1259
1260    #[instrument(level = "trace", skip_all)]
1261    pub(crate) fn process_certificate(
1262        &self,
1263        tx_guard: CertTxGuard,
1264        certificate: &VerifiedExecutableTransaction,
1265        input_objects: InputObjects,
1266        expected_effects_digest: Option<TransactionEffectsDigest>,
1267        epoch_store: &Arc<AuthorityPerEpochStore>,
1268    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1269        let process_certificate_start_time = tokio::time::Instant::now();
1270        let digest = *certificate.digest();
1271
1272        fail_point_if!("correlated-crash-process-certificate", || {
1273            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1274                iota_simulator::task::kill_current_node(None);
1275            }
1276        });
1277
1278        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1279        // Any caller that verifies the signatures on the certificate will have already
1280        // checked the epoch. But paths that don't verify sigs (e.g. execution
1281        // from checkpoint, reading from db) present the possibility of an epoch
1282        // mismatch. If this cert is not finalzied in previous epoch, then it's
1283        // invalid.
1284        let execution_guard = match execution_guard {
1285            Ok(execution_guard) => execution_guard,
1286            Err(err) => {
1287                tx_guard.release();
1288                return Err(err);
1289            }
1290        };
1291        // Since we obtain a reference to the epoch store before taking the execution
1292        // lock, it's possible that reconfiguration has happened and they no
1293        // longer match.
1294        if *execution_guard != epoch_store.epoch() {
1295            tx_guard.release();
1296            info!("The epoch of the execution_guard doesn't match the epoch store");
1297            return Err(IotaError::WrongEpoch {
1298                expected_epoch: epoch_store.epoch(),
1299                actual_epoch: *execution_guard,
1300            });
1301        }
1302
1303        // Errors originating from prepare_certificate may be transient (failure to read
1304        // locks) or non-transient (transaction input is invalid, move vm
1305        // errors). However, all errors from this function occur before we have
1306        // written anything to the db, so we commit the tx guard and rely on the
1307        // client to retry the tx (if it was transient).
1308        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1309            &execution_guard,
1310            certificate,
1311            input_objects,
1312            epoch_store,
1313        ) {
1314            Err(e) => {
1315                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1316                tx_guard.release();
1317                return Err(e);
1318            }
1319            Ok(res) => res,
1320        };
1321
1322        if let Some(expected_effects_digest) = expected_effects_digest {
1323            if effects.digest() != expected_effects_digest {
1324                // We dont want to mask the original error, so we log it and continue.
1325                match self.debug_dump_transaction_state(
1326                    &digest,
1327                    &effects,
1328                    expected_effects_digest,
1329                    &inner_temporary_store,
1330                    certificate,
1331                    &self.config.state_debug_dump_config,
1332                ) {
1333                    Ok(out_path) => {
1334                        info!(
1335                            "Dumped node state for transaction {} to {}",
1336                            digest,
1337                            out_path.as_path().display().to_string()
1338                        );
1339                    }
1340                    Err(e) => {
1341                        error!("Error dumping state for transaction {}: {e}", digest);
1342                    }
1343                }
1344                error!(
1345                    tx_digest = ?digest,
1346                    ?expected_effects_digest,
1347                    actual_effects = ?effects,
1348                    "fork detected!"
1349                );
1350                panic!(
1351                    "Transaction {} is expected to have effects digest {}, but got {}!",
1352                    digest,
1353                    expected_effects_digest,
1354                    effects.digest(),
1355                );
1356            }
1357        }
1358
1359        fail_point!("crash");
1360
1361        self.commit_certificate(
1362            certificate,
1363            inner_temporary_store,
1364            &effects,
1365            tx_guard,
1366            execution_guard,
1367            epoch_store,
1368        )?;
1369
1370        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1371            certificate.data().transaction_data().kind()
1372        {
1373            if let Some(err) = &execution_error_opt {
1374                debug_fatal!("Authenticator state update failed: {:?}", err);
1375            }
1376            epoch_store.update_authenticator_state(auth_state);
1377
1378            // double check that the signature verifier always matches the authenticator
1379            // state
1380            if cfg!(debug_assertions) {
1381                let authenticator_state = get_authenticator_state(self.get_object_store())
1382                    .expect("Read cannot fail")
1383                    .expect("Authenticator state must exist");
1384
1385                let mut sys_jwks: Vec<_> = authenticator_state
1386                    .active_jwks
1387                    .into_iter()
1388                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1389                    .collect();
1390                let mut active_jwks: Vec<_> = epoch_store
1391                    .signature_verifier
1392                    .get_jwks()
1393                    .into_iter()
1394                    .collect();
1395                sys_jwks.sort();
1396                active_jwks.sort();
1397
1398                assert_eq!(sys_jwks, active_jwks);
1399            }
1400        }
1401
1402        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1403        if elapsed > 0.0 {
1404            self.metrics
1405                .execution_gas_latency_ratio
1406                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1407        };
1408        Ok((effects, execution_error_opt))
1409    }
1410
1411    #[instrument(level = "trace", skip_all)]
1412    fn commit_certificate(
1413        &self,
1414        certificate: &VerifiedExecutableTransaction,
1415        inner_temporary_store: InnerTemporaryStore,
1416        effects: &TransactionEffects,
1417        tx_guard: CertTxGuard,
1418        _execution_guard: ExecutionLockReadGuard<'_>,
1419        epoch_store: &Arc<AuthorityPerEpochStore>,
1420    ) -> IotaResult {
1421        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1422            monitored_scope("Execution::commit_certificate");
1423        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1424
1425        let tx_key = certificate.key();
1426        let tx_digest = certificate.digest();
1427        let input_object_count = inner_temporary_store.input_objects.len();
1428        let shared_object_count = effects.input_shared_objects().len();
1429
1430        let output_keys = inner_temporary_store.get_output_keys(effects);
1431
1432        // index certificate
1433        let _ = self
1434            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1435            .tap_err(|e| {
1436                self.metrics.post_processing_total_failures.inc();
1437                error!(?tx_digest, "tx post processing failed: {e}");
1438            });
1439
1440        // The insertion to epoch_store is not atomic with the insertion to the
1441        // perpetual store. This is OK because we insert to the epoch store
1442        // first. And during lookups we always look up in the perpetual store first.
1443        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1444
1445        // Allow testing what happens if we crash here.
1446        fail_point!("crash");
1447
1448        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1449            certificate.clone().into_unsigned(),
1450            effects.clone(),
1451            inner_temporary_store,
1452        );
1453        self.get_cache_writer()
1454            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1455
1456        if certificate.transaction_data().is_end_of_epoch_tx() {
1457            // At the end of epoch, since system packages may have been upgraded, force
1458            // reload them in the cache.
1459            self.get_object_cache_reader()
1460                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1461        }
1462
1463        // commit_certificate finished, the tx is fully committed to the store.
1464        tx_guard.commit_tx();
1465
1466        // Notifies transaction manager about transaction and output objects committed.
1467        // This provides necessary information to transaction manager to start executing
1468        // additional ready transactions.
1469        self.transaction_manager
1470            .notify_commit(tx_digest, output_keys, epoch_store);
1471
1472        self.update_metrics(certificate, input_object_count, shared_object_count);
1473
1474        Ok(())
1475    }
1476
1477    fn update_metrics(
1478        &self,
1479        certificate: &VerifiedExecutableTransaction,
1480        input_object_count: usize,
1481        shared_object_count: usize,
1482    ) {
1483        // count signature by scheme, for zklogin and multisig
1484        if certificate.has_zklogin_sig() {
1485            self.metrics.zklogin_sig_count.inc();
1486        } else if certificate.has_upgraded_multisig() {
1487            self.metrics.multisig_sig_count.inc();
1488        }
1489
1490        self.metrics.total_effects.inc();
1491        self.metrics.total_certs.inc();
1492
1493        if shared_object_count > 0 {
1494            self.metrics.shared_obj_tx.inc();
1495        }
1496
1497        if certificate.is_sponsored_tx() {
1498            self.metrics.sponsored_tx.inc();
1499        }
1500
1501        self.metrics
1502            .num_input_objs
1503            .observe(input_object_count as f64);
1504        self.metrics
1505            .num_shared_objects
1506            .observe(shared_object_count as f64);
1507        self.metrics.batch_size.observe(
1508            certificate
1509                .data()
1510                .intent_message()
1511                .value
1512                .kind()
1513                .num_commands() as f64,
1514        );
1515    }
1516
1517    /// prepare_certificate validates the transaction input, and executes the
1518    /// certificate, returning effects, output objects, events, etc.
1519    ///
1520    /// It reads state from the db (both owned and shared locks), but it has no
1521    /// side effects.
1522    ///
1523    /// It can be generally understood that a failure of prepare_certificate
1524    /// indicates a non-transient error, e.g. the transaction input is
1525    /// somehow invalid, the correct locks are not held, etc. However, this
1526    /// is not entirely true, as a transient db read error may also cause
1527    /// this function to fail.
1528    #[instrument(level = "trace", skip_all)]
1529    fn prepare_certificate(
1530        &self,
1531        _execution_guard: &ExecutionLockReadGuard<'_>,
1532        certificate: &VerifiedExecutableTransaction,
1533        input_objects: InputObjects,
1534        epoch_store: &Arc<AuthorityPerEpochStore>,
1535    ) -> IotaResult<(
1536        InnerTemporaryStore,
1537        TransactionEffects,
1538        Option<ExecutionError>,
1539    )> {
1540        let _scope = monitored_scope("Execution::prepare_certificate");
1541        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1542        let prepare_certificate_start_time = tokio::time::Instant::now();
1543
1544        // TODO: We need to move this to a more appropriate place to avoid redundant
1545        // checks.
1546        let tx_data = certificate.data().transaction_data();
1547        tx_data.validity_check(epoch_store.protocol_config())?;
1548
1549        // The cost of partially re-auditing a transaction before execution is
1550        // tolerated.
1551        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1552            certificate,
1553            input_objects,
1554            epoch_store.protocol_config(),
1555            epoch_store.reference_gas_price(),
1556        )?;
1557
1558        let owned_object_refs = input_objects.inner().filter_owned_objects();
1559        self.check_owned_locks(&owned_object_refs)?;
1560        let tx_digest = *certificate.digest();
1561        let protocol_config = epoch_store.protocol_config();
1562        let transaction_data = &certificate.data().intent_message().value;
1563        let (kind, signer, gas) = transaction_data.execution_parts();
1564
1565        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1566        let (inner_temp_store, _, mut effects, execution_error_opt) =
1567            epoch_store.executor().execute_transaction_to_effects(
1568                self.get_backing_store().as_ref(),
1569                protocol_config,
1570                self.metrics.limits_metrics.clone(),
1571                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1572                // cyclic dependency w/ iota-adapter
1573                self.config
1574                    .expensive_safety_check_config
1575                    .enable_deep_per_tx_iota_conservation_check(),
1576                self.config.certificate_deny_config.certificate_deny_set(),
1577                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1578                epoch_store
1579                    .epoch_start_config()
1580                    .epoch_data()
1581                    .epoch_start_timestamp(),
1582                input_objects,
1583                gas,
1584                gas_status,
1585                kind,
1586                signer,
1587                tx_digest,
1588                &mut None,
1589            );
1590
1591        fail_point_if!("cp_execution_nondeterminism", || {
1592            #[cfg(msim)]
1593            self.create_fail_state(certificate, epoch_store, &mut effects);
1594        });
1595
1596        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1597        if elapsed > 0.0 {
1598            self.metrics
1599                .prepare_cert_gas_latency_ratio
1600                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1601        }
1602
1603        Ok((inner_temp_store, effects, execution_error_opt.err()))
1604    }
1605
1606    pub fn prepare_certificate_for_benchmark(
1607        &self,
1608        certificate: &VerifiedExecutableTransaction,
1609        input_objects: InputObjects,
1610        epoch_store: &Arc<AuthorityPerEpochStore>,
1611    ) -> IotaResult<(
1612        InnerTemporaryStore,
1613        TransactionEffects,
1614        Option<ExecutionError>,
1615    )> {
1616        let lock = RwLock::new(epoch_store.epoch());
1617        let execution_guard = lock.try_read().unwrap();
1618
1619        self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1620    }
1621
1622    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1623    #[allow(clippy::type_complexity)]
1624    pub fn dry_exec_transaction(
1625        &self,
1626        transaction: TransactionData,
1627        transaction_digest: TransactionDigest,
1628    ) -> IotaResult<(
1629        DryRunTransactionBlockResponse,
1630        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1631        TransactionEffects,
1632        Option<ObjectID>,
1633    )> {
1634        let epoch_store = self.load_epoch_store_one_call_per_task();
1635        if !self.is_fullnode(&epoch_store) {
1636            return Err(IotaError::UnsupportedFeature {
1637                error: "dry-exec is only supported on fullnodes".to_string(),
1638            });
1639        }
1640
1641        if transaction.kind().is_system_tx() {
1642            return Err(IotaError::UnsupportedFeature {
1643                error: "dry-exec does not support system transactions".to_string(),
1644            });
1645        }
1646
1647        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1648    }
1649
1650    #[allow(clippy::type_complexity)]
1651    pub fn dry_exec_transaction_for_benchmark(
1652        &self,
1653        transaction: TransactionData,
1654        transaction_digest: TransactionDigest,
1655    ) -> IotaResult<(
1656        DryRunTransactionBlockResponse,
1657        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1658        TransactionEffects,
1659        Option<ObjectID>,
1660    )> {
1661        let epoch_store = self.load_epoch_store_one_call_per_task();
1662        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1663    }
1664
1665    #[instrument(level = "trace", skip_all)]
1666    #[allow(clippy::type_complexity)]
1667    fn dry_exec_transaction_impl(
1668        &self,
1669        epoch_store: &AuthorityPerEpochStore,
1670        transaction: TransactionData,
1671        transaction_digest: TransactionDigest,
1672    ) -> IotaResult<(
1673        DryRunTransactionBlockResponse,
1674        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1675        TransactionEffects,
1676        Option<ObjectID>,
1677    )> {
1678        // Cheap validity checks for a transaction, including input size limits.
1679        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1680
1681        let input_object_kinds = transaction.input_objects()?;
1682        let receiving_object_refs = transaction.receiving_objects();
1683
1684        iota_transaction_checks::deny::check_transaction_for_signing(
1685            &transaction,
1686            &[],
1687            &input_object_kinds,
1688            &receiving_object_refs,
1689            &self.config.transaction_deny_config,
1690            self.get_backing_package_store().as_ref(),
1691        )?;
1692
1693        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1694            // We don't want to cache this transaction since it's a dry run.
1695            None,
1696            &input_object_kinds,
1697            &receiving_object_refs,
1698            epoch_store.epoch(),
1699        )?;
1700
1701        // make a gas object if one was not provided
1702        let mut transaction = transaction;
1703        let mut gas_object_refs = transaction.gas().to_vec();
1704        let reference_gas_price = epoch_store.reference_gas_price();
1705        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1706            let sender = transaction.gas_owner();
1707            let gas_object_id = ObjectID::random();
1708            let gas_object = Object::new_move(
1709                MoveObject::new_gas_coin(
1710                    OBJECT_START_VERSION,
1711                    gas_object_id,
1712                    SIMULATION_GAS_COIN_VALUE,
1713                ),
1714                Owner::AddressOwner(sender),
1715                TransactionDigest::genesis_marker(),
1716            );
1717            let gas_object_ref = gas_object.compute_object_reference();
1718            gas_object_refs = vec![gas_object_ref];
1719            // Add gas object to transaction gas payment
1720            transaction.gas_data_mut().payment = gas_object_refs.clone();
1721            (
1722                iota_transaction_checks::check_transaction_input_with_given_gas(
1723                    epoch_store.protocol_config(),
1724                    reference_gas_price,
1725                    &transaction,
1726                    input_objects,
1727                    receiving_objects,
1728                    gas_object,
1729                    &self.metrics.bytecode_verifier_metrics,
1730                    &self.config.verifier_signing_config,
1731                )?,
1732                Some(gas_object_id),
1733            )
1734        } else {
1735            (
1736                iota_transaction_checks::check_transaction_input(
1737                    epoch_store.protocol_config(),
1738                    reference_gas_price,
1739                    &transaction,
1740                    input_objects,
1741                    &receiving_objects,
1742                    &self.metrics.bytecode_verifier_metrics,
1743                    &self.config.verifier_signing_config,
1744                )?,
1745                None,
1746            )
1747        };
1748
1749        let protocol_config = epoch_store.protocol_config();
1750        let (kind, signer, _) = transaction.execution_parts();
1751
1752        let silent = true;
1753        let executor = iota_execution::executor(protocol_config, silent, None)
1754            .expect("Creating an executor should not fail here");
1755
1756        let expensive_checks = false;
1757        let (inner_temp_store, _, effects, _execution_error) = executor
1758            .execute_transaction_to_effects(
1759                self.get_backing_store().as_ref(),
1760                protocol_config,
1761                self.metrics.limits_metrics.clone(),
1762                expensive_checks,
1763                self.config.certificate_deny_config.certificate_deny_set(),
1764                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1765                epoch_store
1766                    .epoch_start_config()
1767                    .epoch_data()
1768                    .epoch_start_timestamp(),
1769                checked_input_objects,
1770                gas_object_refs,
1771                gas_status,
1772                kind,
1773                signer,
1774                transaction_digest,
1775                &mut None,
1776            );
1777        let tx_digest = *effects.transaction_digest();
1778
1779        let module_cache =
1780            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1781
1782        let mut layout_resolver =
1783            epoch_store
1784                .executor()
1785                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1786                    &inner_temp_store,
1787                    self.get_backing_package_store(),
1788                )));
1789        // Returning empty vector here because we recalculate changes in the rpc layer.
1790        let object_changes = Vec::new();
1791
1792        // Returning empty vector here because we recalculate changes in the rpc layer.
1793        let balance_changes = Vec::new();
1794
1795        let written_with_kind = effects
1796            .created()
1797            .into_iter()
1798            .map(|(oref, _)| (oref, WriteKind::Create))
1799            .chain(
1800                effects
1801                    .unwrapped()
1802                    .into_iter()
1803                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1804            )
1805            .chain(
1806                effects
1807                    .mutated()
1808                    .into_iter()
1809                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
1810            )
1811            .map(|(oref, kind)| {
1812                let obj = inner_temp_store.written.get(&oref.0).unwrap();
1813                // TODO: Avoid clones.
1814                (oref.0, (oref, obj.clone(), kind))
1815            })
1816            .collect();
1817
1818        Ok((
1819            DryRunTransactionBlockResponse {
1820                // to avoid cloning `transaction`, fields are populated in this order
1821                suggested_gas_price: self
1822                    .congestion_tracker
1823                    .get_prediction_suggested_gas_price(&transaction),
1824                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1825                    .map_err(|e| IotaError::TransactionSerialization {
1826                        error: format!(
1827                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
1828                        ),
1829                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
1830                effects: effects.clone().try_into()?,
1831                events: IotaTransactionBlockEvents::try_from(
1832                    inner_temp_store.events.clone(),
1833                    tx_digest,
1834                    None,
1835                    layout_resolver.as_mut(),
1836                )?,
1837                object_changes,
1838                balance_changes,
1839            },
1840            written_with_kind,
1841            effects,
1842            mock_gas,
1843        ))
1844    }
1845
1846    pub fn simulate_transaction(
1847        &self,
1848        transaction: TransactionData,
1849    ) -> IotaResult<SimulateTransactionResult> {
1850        if transaction.kind().is_system_tx() {
1851            return Err(IotaError::UnsupportedFeature {
1852                error: "simulate does not support system transactions".to_string(),
1853            });
1854        }
1855
1856        let epoch_store = self.load_epoch_store_one_call_per_task();
1857        if !self.is_fullnode(&epoch_store) {
1858            return Err(IotaError::UnsupportedFeature {
1859                error: "simulate is only supported on fullnodes".to_string(),
1860            });
1861        }
1862
1863        self.simulate_transaction_impl(&epoch_store, transaction)
1864    }
1865
1866    fn simulate_transaction_impl(
1867        &self,
1868        epoch_store: &AuthorityPerEpochStore,
1869        transaction: TransactionData,
1870    ) -> IotaResult<SimulateTransactionResult> {
1871        // Cheap validity checks for a transaction, including input size limits.
1872        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1873
1874        let input_object_kinds = transaction.input_objects()?;
1875        let receiving_object_refs = transaction.receiving_objects();
1876
1877        iota_transaction_checks::deny::check_transaction_for_signing(
1878            &transaction,
1879            &[],
1880            &input_object_kinds,
1881            &receiving_object_refs,
1882            &self.config.transaction_deny_config,
1883            self.get_backing_package_store().as_ref(),
1884        )?;
1885
1886        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1887            // We don't want to cache this transaction since it's a dry run.
1888            None,
1889            &input_object_kinds,
1890            &receiving_object_refs,
1891            epoch_store.epoch(),
1892        )?;
1893        // make a gas object if one was not provided
1894        let mut transaction = transaction;
1895        let mut gas_object_refs = transaction.gas().to_vec();
1896        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1897            let sender = transaction.gas_owner();
1898            let gas_object_id = ObjectID::MAX;
1899            let gas_object = Object::new_move(
1900                MoveObject::new_gas_coin(
1901                    OBJECT_START_VERSION,
1902                    gas_object_id,
1903                    SIMULATION_GAS_COIN_VALUE,
1904                ),
1905                Owner::AddressOwner(sender),
1906                TransactionDigest::genesis_marker(),
1907            );
1908            let gas_object_ref = gas_object.compute_object_reference();
1909            gas_object_refs = vec![gas_object_ref];
1910
1911            // Add the gas object to the transaction payment.
1912            transaction.gas_data_mut().payment = gas_object_refs.clone();
1913            (
1914                iota_transaction_checks::check_transaction_input_with_given_gas(
1915                    epoch_store.protocol_config(),
1916                    epoch_store.reference_gas_price(),
1917                    &transaction,
1918                    input_objects,
1919                    receiving_objects,
1920                    gas_object,
1921                    &self.metrics.bytecode_verifier_metrics,
1922                    &self.config.verifier_signing_config,
1923                )?,
1924                Some(gas_object_id),
1925            )
1926        } else {
1927            (
1928                iota_transaction_checks::check_transaction_input(
1929                    epoch_store.protocol_config(),
1930                    epoch_store.reference_gas_price(),
1931                    &transaction,
1932                    input_objects,
1933                    &receiving_objects,
1934                    &self.metrics.bytecode_verifier_metrics,
1935                    &self.config.verifier_signing_config,
1936                )?,
1937                None,
1938            )
1939        };
1940
1941        let protocol_config = epoch_store.protocol_config();
1942        let (kind, signer, _) = transaction.execution_parts();
1943
1944        let silent = true;
1945        let executor = iota_execution::executor(protocol_config, silent, None)
1946            .expect("Creating an executor should not fail here");
1947
1948        let expensive_checks = false;
1949        let (inner_temp_store, _, effects, _execution_error) = executor
1950            .execute_transaction_to_effects(
1951                self.get_backing_store().as_ref(),
1952                protocol_config,
1953                self.metrics.limits_metrics.clone(),
1954                expensive_checks,
1955                self.config.certificate_deny_config.certificate_deny_set(),
1956                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1957                epoch_store
1958                    .epoch_start_config()
1959                    .epoch_data()
1960                    .epoch_start_timestamp(),
1961                checked_input_objects,
1962                gas_object_refs,
1963                gas_status,
1964                kind,
1965                signer,
1966                transaction.digest(),
1967                &mut None,
1968            );
1969
1970        Ok(SimulateTransactionResult {
1971            input_objects: inner_temp_store.input_objects,
1972            output_objects: inner_temp_store.written,
1973            events: effects.events_digest().map(|_| inner_temp_store.events),
1974            effects,
1975            mock_gas_id: mock_gas,
1976        })
1977    }
1978
1979    /// The object ID for gas can be any object ID, even for an uncreated object
1980    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
1981    pub async fn dev_inspect_transaction_block(
1982        &self,
1983        sender: IotaAddress,
1984        transaction_kind: TransactionKind,
1985        gas_price: Option<u64>,
1986        gas_budget: Option<u64>,
1987        gas_sponsor: Option<IotaAddress>,
1988        gas_objects: Option<Vec<ObjectRef>>,
1989        show_raw_txn_data_and_effects: Option<bool>,
1990        skip_checks: Option<bool>,
1991    ) -> IotaResult<DevInspectResults> {
1992        let epoch_store = self.load_epoch_store_one_call_per_task();
1993
1994        if !self.is_fullnode(&epoch_store) {
1995            return Err(IotaError::UnsupportedFeature {
1996                error: "dev-inspect is only supported on fullnodes".to_string(),
1997            });
1998        }
1999
2000        if transaction_kind.is_system_tx() {
2001            return Err(IotaError::UnsupportedFeature {
2002                error: "system transactions are not supported".to_string(),
2003            });
2004        }
2005
2006        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2007        let skip_checks = skip_checks.unwrap_or(true);
2008        let reference_gas_price = epoch_store.reference_gas_price();
2009        let protocol_config = epoch_store.protocol_config();
2010        let max_tx_gas = protocol_config.max_tx_gas();
2011
2012        let price = gas_price.unwrap_or(reference_gas_price);
2013        let budget = gas_budget.unwrap_or(max_tx_gas);
2014        let owner = gas_sponsor.unwrap_or(sender);
2015        // Payment might be empty here, but it's fine we'll have to deal with it later
2016        // after reading all the input objects.
2017        let payment = gas_objects.unwrap_or_default();
2018        let transaction = TransactionData::V1(TransactionDataV1 {
2019            kind: transaction_kind.clone(),
2020            sender,
2021            gas_data: GasData {
2022                payment,
2023                owner,
2024                price,
2025                budget,
2026            },
2027            expiration: TransactionExpiration::None,
2028        });
2029
2030        let raw_txn_data = if show_raw_txn_data_and_effects {
2031            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2032                error: "Failed to serialize transaction during dev inspect".to_string(),
2033            })?
2034        } else {
2035            vec![]
2036        };
2037
2038        transaction.validity_check_no_gas_check(protocol_config)?;
2039
2040        let input_object_kinds = transaction.input_objects()?;
2041        let receiving_object_refs = transaction.receiving_objects();
2042
2043        iota_transaction_checks::deny::check_transaction_for_signing(
2044            &transaction,
2045            &[],
2046            &input_object_kinds,
2047            &receiving_object_refs,
2048            &self.config.transaction_deny_config,
2049            self.get_backing_package_store().as_ref(),
2050        )?;
2051
2052        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2053            // We don't want to cache this transaction since it's a dev inspect.
2054            None,
2055            &input_object_kinds,
2056            &receiving_object_refs,
2057            epoch_store.epoch(),
2058        )?;
2059
2060        // Create and use a dummy gas object if there is no gas object provided.
2061        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2062            SIMULATION_GAS_COIN_VALUE,
2063            transaction.gas_owner(),
2064        );
2065
2066        let gas_objects = if transaction.gas().is_empty() {
2067            let gas_object_ref = dummy_gas_object.compute_object_reference();
2068            vec![gas_object_ref]
2069        } else {
2070            transaction.gas().to_vec()
2071        };
2072
2073        let (gas_status, checked_input_objects) = if skip_checks {
2074            // If we are skipping checks, then we call the check_dev_inspect_input function
2075            // which will perform only lightweight checks on the transaction
2076            // input. And if the gas field is empty, that means we will
2077            // use the dummy gas object so we need to add it to the input objects vector.
2078            if transaction.gas().is_empty() {
2079                input_objects.push(ObjectReadResult::new(
2080                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2081                    dummy_gas_object.into(),
2082                ));
2083            }
2084            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2085                protocol_config,
2086                &transaction_kind,
2087                input_objects,
2088                receiving_objects,
2089            )?;
2090            let gas_status = IotaGasStatus::new(
2091                max_tx_gas,
2092                transaction.gas_price(),
2093                reference_gas_price,
2094                protocol_config,
2095            )?;
2096
2097            (gas_status, checked_input_objects)
2098        } else {
2099            // If we are not skipping checks, then we call the check_transaction_input
2100            // function and its dummy gas variant which will perform full
2101            // fledged checks just like a real transaction execution.
2102            if transaction.gas().is_empty() {
2103                iota_transaction_checks::check_transaction_input_with_given_gas(
2104                    epoch_store.protocol_config(),
2105                    reference_gas_price,
2106                    &transaction,
2107                    input_objects,
2108                    receiving_objects,
2109                    dummy_gas_object,
2110                    &self.metrics.bytecode_verifier_metrics,
2111                    &self.config.verifier_signing_config,
2112                )?
2113            } else {
2114                iota_transaction_checks::check_transaction_input(
2115                    epoch_store.protocol_config(),
2116                    reference_gas_price,
2117                    &transaction,
2118                    input_objects,
2119                    &receiving_objects,
2120                    &self.metrics.bytecode_verifier_metrics,
2121                    &self.config.verifier_signing_config,
2122                )?
2123            }
2124        };
2125
2126        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2127            .expect("Creating an executor should not fail here");
2128        let intent_msg = IntentMessage::new(
2129            Intent {
2130                version: IntentVersion::V0,
2131                scope: IntentScope::TransactionData,
2132                app_id: AppId::Iota,
2133            },
2134            transaction,
2135        );
2136        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2137        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2138            self.get_backing_store().as_ref(),
2139            protocol_config,
2140            self.metrics.limits_metrics.clone(),
2141            // expensive checks
2142            false,
2143            self.config.certificate_deny_config.certificate_deny_set(),
2144            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2145            epoch_store
2146                .epoch_start_config()
2147                .epoch_data()
2148                .epoch_start_timestamp(),
2149            checked_input_objects,
2150            gas_objects,
2151            gas_status,
2152            transaction_kind,
2153            sender,
2154            transaction_digest,
2155            skip_checks,
2156        );
2157
2158        let raw_effects = if show_raw_txn_data_and_effects {
2159            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2160                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2161            })?
2162        } else {
2163            vec![]
2164        };
2165
2166        let mut layout_resolver =
2167            epoch_store
2168                .executor()
2169                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2170                    &inner_temp_store,
2171                    self.get_backing_package_store(),
2172                )));
2173
2174        DevInspectResults::new(
2175            effects,
2176            inner_temp_store.events.clone(),
2177            execution_result,
2178            raw_txn_data,
2179            raw_effects,
2180            layout_resolver.as_mut(),
2181        )
2182    }
2183
2184    // Only used for testing because of how epoch store is loaded.
2185    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2186        let epoch_store = self.epoch_store_for_testing();
2187        Ok(epoch_store.reference_gas_price())
2188    }
2189
2190    #[instrument(level = "trace", skip_all)]
2191    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2192        self.get_transaction_cache_reader()
2193            .try_is_tx_already_executed(digest)
2194    }
2195
2196    /// Non-fallible version of `try_is_tx_already_executed`.
2197    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2198        self.try_is_tx_already_executed(digest)
2199            .expect("storage access failed")
2200    }
2201
2202    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2203    #[instrument(level = "debug", skip_all, err)]
2204    fn index_tx(
2205        &self,
2206        indexes: &IndexStore,
2207        digest: &TransactionDigest,
2208        // TODO: index_tx really just need the transaction data here.
2209        cert: &VerifiedExecutableTransaction,
2210        effects: &TransactionEffects,
2211        events: &TransactionEvents,
2212        timestamp_ms: u64,
2213        tx_coins: Option<TxCoins>,
2214        written: &WrittenObjects,
2215        inner_temporary_store: &InnerTemporaryStore,
2216    ) -> IotaResult<u64> {
2217        let changes = self
2218            .process_object_index(effects, written, inner_temporary_store)
2219            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2220
2221        indexes.index_tx(
2222            cert.data().intent_message().value.sender(),
2223            cert.data()
2224                .intent_message()
2225                .value
2226                .input_objects()?
2227                .iter()
2228                .map(|o| o.object_id()),
2229            effects
2230                .all_changed_objects()
2231                .into_iter()
2232                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2233            cert.data()
2234                .intent_message()
2235                .value
2236                .move_calls()
2237                .into_iter()
2238                .map(|(package, module, function)| {
2239                    (*package, module.to_owned(), function.to_owned())
2240                }),
2241            events,
2242            changes,
2243            digest,
2244            timestamp_ms,
2245            tx_coins,
2246        )
2247    }
2248
2249    #[cfg(msim)]
2250    fn create_fail_state(
2251        &self,
2252        certificate: &VerifiedExecutableTransaction,
2253        epoch_store: &Arc<AuthorityPerEpochStore>,
2254        effects: &mut TransactionEffects,
2255    ) {
2256        use std::cell::RefCell;
2257        thread_local! {
2258            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2259        }
2260        if !certificate.data().intent_message().value.is_system_tx() {
2261            let committee = epoch_store.committee();
2262            let cur_stake = (**committee).weight(&self.name);
2263            if cur_stake > 0 {
2264                FAIL_STATE.with_borrow_mut(|fail_state| {
2265                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2266                    if fail_state.0 < committee.validity_threshold() {
2267                        fail_state.0 += cur_stake;
2268                        fail_state.1.insert(self.name);
2269                    }
2270
2271                    if fail_state.1.contains(&self.name) {
2272                        info!("cp_exec failing tx");
2273                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2274                    }
2275                });
2276            }
2277        }
2278    }
2279
2280    fn process_object_index(
2281        &self,
2282        effects: &TransactionEffects,
2283        written: &WrittenObjects,
2284        inner_temporary_store: &InnerTemporaryStore,
2285    ) -> IotaResult<ObjectIndexChanges> {
2286        let epoch_store = self.load_epoch_store_one_call_per_task();
2287        let mut layout_resolver =
2288            epoch_store
2289                .executor()
2290                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2291                    inner_temporary_store,
2292                    self.get_backing_package_store(),
2293                )));
2294
2295        let modified_at_version = effects
2296            .modified_at_versions()
2297            .into_iter()
2298            .collect::<HashMap<_, _>>();
2299
2300        let tx_digest = effects.transaction_digest();
2301        let mut deleted_owners = vec![];
2302        let mut deleted_dynamic_fields = vec![];
2303        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2304            let old_version = modified_at_version.get(&id).unwrap();
2305            // When we process the index, the latest object hasn't been written yet so
2306            // the old object must be present.
2307            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2308                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2309            ) {
2310                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2311                Owner::ObjectOwner(object_id) => {
2312                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2313                }
2314                _ => {}
2315            }
2316        }
2317
2318        let mut new_owners = vec![];
2319        let mut new_dynamic_fields = vec![];
2320
2321        for (oref, owner, kind) in effects.all_changed_objects() {
2322            let id = &oref.0;
2323            // For mutated objects, retrieve old owner and delete old index if there is a
2324            // owner change.
2325            if let WriteKind::Mutate = kind {
2326                let Some(old_version) = modified_at_version.get(id) else {
2327                    panic!(
2328                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2329                    );
2330                };
2331                // When we process the index, the latest object hasn't been written yet so
2332                // the old object must be present.
2333                let Some(old_object) = self
2334                    .get_object_store()
2335                    .try_get_object_by_key(id, *old_version)?
2336                else {
2337                    panic!(
2338                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2339                    );
2340                };
2341                if old_object.owner != owner {
2342                    match old_object.owner {
2343                        Owner::AddressOwner(addr) => {
2344                            deleted_owners.push((addr, *id));
2345                        }
2346                        Owner::ObjectOwner(object_id) => {
2347                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2348                        }
2349                        _ => {}
2350                    }
2351                }
2352            }
2353
2354            match owner {
2355                Owner::AddressOwner(addr) => {
2356                    // TODO: We can remove the object fetching after we added ObjectType to
2357                    // TransactionEffects
2358                    let new_object = written.get(id).unwrap_or_else(
2359                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2360                    );
2361                    assert_eq!(
2362                        new_object.version(),
2363                        oref.1,
2364                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2365                        tx_digest,
2366                        id,
2367                        new_object.version(),
2368                        oref.1
2369                    );
2370
2371                    let type_ = new_object
2372                        .type_()
2373                        .map(|type_| ObjectType::Struct(type_.clone()))
2374                        .unwrap_or(ObjectType::Package);
2375
2376                    new_owners.push((
2377                        (addr, *id),
2378                        ObjectInfo {
2379                            object_id: *id,
2380                            version: oref.1,
2381                            digest: oref.2,
2382                            type_,
2383                            owner,
2384                            previous_transaction: *effects.transaction_digest(),
2385                        },
2386                    ));
2387                }
2388                Owner::ObjectOwner(owner) => {
2389                    let new_object = written.get(id).unwrap_or_else(
2390                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2391                    );
2392                    assert_eq!(
2393                        new_object.version(),
2394                        oref.1,
2395                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2396                        tx_digest,
2397                        id,
2398                        new_object.version(),
2399                        oref.1
2400                    );
2401
2402                    let Some(df_info) = self
2403                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2404                        .unwrap_or_else(|e| {
2405                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2406                            None
2407                        }
2408                    )
2409                        else {
2410                            // Skip indexing for non dynamic field objects.
2411                            continue;
2412                        };
2413                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2414                }
2415                _ => {}
2416            }
2417        }
2418
2419        Ok(ObjectIndexChanges {
2420            deleted_owners,
2421            deleted_dynamic_fields,
2422            new_owners,
2423            new_dynamic_fields,
2424        })
2425    }
2426
2427    fn try_create_dynamic_field_info(
2428        &self,
2429        o: &Object,
2430        written: &WrittenObjects,
2431        resolver: &mut dyn LayoutResolver,
2432        get_latest_object_version: bool,
2433    ) -> IotaResult<Option<DynamicFieldInfo>> {
2434        // Skip if not a move object
2435        let Some(move_object) = o.data.try_as_move().cloned() else {
2436            return Ok(None);
2437        };
2438
2439        // We only index dynamic field objects
2440        if !move_object.type_().is_dynamic_field() {
2441            return Ok(None);
2442        }
2443
2444        let layout = resolver
2445            .get_annotated_layout(&move_object.type_().clone().into())?
2446            .into_layout();
2447
2448        let field =
2449            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2450                IotaError::ObjectDeserialization {
2451                    error: e.to_string(),
2452                }
2453            })?;
2454
2455        let type_ = field.kind;
2456        let name_type: TypeTag = field.name_layout.into();
2457        let bcs_name = field.name_bytes.to_owned();
2458
2459        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2460            .map_err(|e| {
2461                warn!("{e}");
2462                IotaError::ObjectDeserialization {
2463                    error: e.to_string(),
2464                }
2465            })?;
2466
2467        let name = DynamicFieldName {
2468            type_: name_type,
2469            value: IotaMoveValue::from(name_value).to_json_value(),
2470        };
2471
2472        let value_metadata = field.value_metadata().map_err(|e| {
2473            warn!("{e}");
2474            IotaError::ObjectDeserialization {
2475                error: e.to_string(),
2476            }
2477        })?;
2478
2479        Ok(Some(match value_metadata {
2480            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2481                name,
2482                bcs_name,
2483                type_,
2484                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2485                object_id: o.id(),
2486                version: o.version(),
2487                digest: o.digest(),
2488            },
2489
2490            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2491                // Find the actual object from storage using the object id obtained from the
2492                // wrapper.
2493
2494                // Try to find the object in the written objects first.
2495                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2496                    (
2497                        object.version(),
2498                        object.digest(),
2499                        object.data.type_().unwrap().clone(),
2500                    )
2501                } else {
2502                    // If not found, try to find it in the database.
2503                    let object = if get_latest_object_version {
2504                        // Loading genesis object could meet a condition that the version of the
2505                        // genesis object is behind the one in the snapshot.
2506                        // In this case, the object can not be found with get_object_by_key, we need
2507                        // to use get_object instead. Since get_object is a heavier operation, we
2508                        // only allow to use it for genesis.
2509                        // reference: https://github.com/iotaledger/iota/issues/7267
2510                        self.get_object_store()
2511                            .try_get_object(&object_id)?
2512                            .ok_or_else(|| UserInputError::ObjectNotFound {
2513                                object_id,
2514                                version: Some(o.version()),
2515                            })?
2516                    } else {
2517                        // Non-genesis object should be in the database with the given version.
2518                        self.get_object_store()
2519                            .try_get_object_by_key(&object_id, o.version())?
2520                            .ok_or_else(|| UserInputError::ObjectNotFound {
2521                                object_id,
2522                                version: Some(o.version()),
2523                            })?
2524                    };
2525
2526                    (
2527                        object.version(),
2528                        object.digest(),
2529                        object.data.type_().unwrap().clone(),
2530                    )
2531                };
2532
2533                DynamicFieldInfo {
2534                    name,
2535                    bcs_name,
2536                    type_,
2537                    object_type: object_type.to_string(),
2538                    object_id,
2539                    version,
2540                    digest,
2541                }
2542            }
2543        }))
2544    }
2545
2546    #[instrument(level = "trace", skip_all, err)]
2547    fn post_process_one_tx(
2548        &self,
2549        certificate: &VerifiedExecutableTransaction,
2550        effects: &TransactionEffects,
2551        inner_temporary_store: &InnerTemporaryStore,
2552        epoch_store: &Arc<AuthorityPerEpochStore>,
2553    ) -> IotaResult {
2554        if self.indexes.is_none() {
2555            return Ok(());
2556        }
2557
2558        let tx_digest = certificate.digest();
2559        let timestamp_ms = Self::unixtime_now_ms();
2560        let events = &inner_temporary_store.events;
2561        let written = &inner_temporary_store.written;
2562        let tx_coins =
2563            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2564
2565        // Index tx
2566        if let Some(indexes) = &self.indexes {
2567            let _ = self
2568                .index_tx(
2569                    indexes.as_ref(),
2570                    tx_digest,
2571                    certificate,
2572                    effects,
2573                    events,
2574                    timestamp_ms,
2575                    tx_coins,
2576                    written,
2577                    inner_temporary_store,
2578                )
2579                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2580                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2581                .expect("Indexing tx should not fail");
2582
2583            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2584            let events = self.make_transaction_block_events(
2585                events.clone(),
2586                *tx_digest,
2587                timestamp_ms,
2588                epoch_store,
2589                inner_temporary_store,
2590            )?;
2591            // Emit events
2592            self.subscription_handler
2593                .process_tx(certificate.data().transaction_data(), &effects, &events)
2594                .tap_ok(|_| {
2595                    self.metrics
2596                        .post_processing_total_tx_had_event_processed
2597                        .inc()
2598                })
2599                .tap_err(|e| {
2600                    warn!(
2601                        ?tx_digest,
2602                        "Post processing - Couldn't process events for tx: {}", e
2603                    )
2604                })?;
2605
2606            self.metrics
2607                .post_processing_total_events_emitted
2608                .inc_by(events.data.len() as u64);
2609        };
2610        Ok(())
2611    }
2612
2613    fn make_transaction_block_events(
2614        &self,
2615        transaction_events: TransactionEvents,
2616        digest: TransactionDigest,
2617        timestamp_ms: u64,
2618        epoch_store: &Arc<AuthorityPerEpochStore>,
2619        inner_temporary_store: &InnerTemporaryStore,
2620    ) -> IotaResult<IotaTransactionBlockEvents> {
2621        let mut layout_resolver =
2622            epoch_store
2623                .executor()
2624                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2625                    inner_temporary_store,
2626                    self.get_backing_package_store(),
2627                )));
2628        IotaTransactionBlockEvents::try_from(
2629            transaction_events,
2630            digest,
2631            Some(timestamp_ms),
2632            layout_resolver.as_mut(),
2633        )
2634    }
2635
2636    pub fn unixtime_now_ms() -> u64 {
2637        let now = SystemTime::now()
2638            .duration_since(UNIX_EPOCH)
2639            .expect("Time went backwards")
2640            .as_millis();
2641        u64::try_from(now).expect("Travelling in time machine")
2642    }
2643
2644    #[instrument(level = "trace", skip_all)]
2645    pub async fn handle_transaction_info_request(
2646        &self,
2647        request: TransactionInfoRequest,
2648    ) -> IotaResult<TransactionInfoResponse> {
2649        let epoch_store = self.load_epoch_store_one_call_per_task();
2650        let (transaction, status) = self
2651            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2652            .ok_or(IotaError::TransactionNotFound {
2653                digest: request.transaction_digest,
2654            })?;
2655        Ok(TransactionInfoResponse {
2656            transaction,
2657            status,
2658        })
2659    }
2660
2661    #[instrument(level = "trace", skip_all)]
2662    pub async fn handle_object_info_request(
2663        &self,
2664        request: ObjectInfoRequest,
2665    ) -> IotaResult<ObjectInfoResponse> {
2666        let epoch_store = self.load_epoch_store_one_call_per_task();
2667
2668        let requested_object_seq = match request.request_kind {
2669            ObjectInfoRequestKind::LatestObjectInfo => {
2670                let (_, seq, _) = self
2671                    .try_get_object_or_tombstone(request.object_id)
2672                    .await?
2673                    .ok_or_else(|| {
2674                        IotaError::from(UserInputError::ObjectNotFound {
2675                            object_id: request.object_id,
2676                            version: None,
2677                        })
2678                    })?;
2679                seq
2680            }
2681            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2682        };
2683
2684        let object = self
2685            .get_object_store()
2686            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2687            .ok_or_else(|| {
2688                IotaError::from(UserInputError::ObjectNotFound {
2689                    object_id: request.object_id,
2690                    version: Some(requested_object_seq),
2691                })
2692            })?;
2693
2694        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2695            (request.generate_layout, object.data.try_as_move())
2696        {
2697            Some(into_struct_layout(
2698                epoch_store
2699                    .executor()
2700                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2701                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2702            )?)
2703        } else {
2704            None
2705        };
2706
2707        let lock = if !object.is_address_owned() {
2708            // Only address owned objects have locks.
2709            None
2710        } else {
2711            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2712                .await?
2713                .map(|s| s.into_inner())
2714        };
2715
2716        Ok(ObjectInfoResponse {
2717            object,
2718            layout,
2719            lock_for_debugging: lock,
2720        })
2721    }
2722
2723    #[instrument(level = "trace", skip_all)]
2724    pub fn handle_checkpoint_request(
2725        &self,
2726        request: &CheckpointRequest,
2727    ) -> IotaResult<CheckpointResponse> {
2728        let summary = if request.certified {
2729            let summary = match request.sequence_number {
2730                Some(seq) => self
2731                    .checkpoint_store
2732                    .get_checkpoint_by_sequence_number(seq)?,
2733                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2734            }
2735            .map(|v| v.into_inner());
2736            summary.map(CheckpointSummaryResponse::Certified)
2737        } else {
2738            let summary = match request.sequence_number {
2739                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2740                None => self
2741                    .checkpoint_store
2742                    .get_latest_locally_computed_checkpoint()?,
2743            };
2744            summary.map(CheckpointSummaryResponse::Pending)
2745        };
2746        let contents = match &summary {
2747            Some(s) => self
2748                .checkpoint_store
2749                .get_checkpoint_contents(&s.content_digest())?,
2750            None => None,
2751        };
2752        Ok(CheckpointResponse {
2753            checkpoint: summary,
2754            contents,
2755        })
2756    }
2757
2758    fn check_protocol_version(
2759        supported_protocol_versions: SupportedProtocolVersions,
2760        current_version: ProtocolVersion,
2761    ) {
2762        info!("current protocol version is now {:?}", current_version);
2763        info!("supported versions are: {:?}", supported_protocol_versions);
2764        if !supported_protocol_versions.is_version_supported(current_version) {
2765            let msg = format!(
2766                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2767            );
2768
2769            error!("{}", msg);
2770            eprintln!("{msg}");
2771
2772            #[cfg(not(msim))]
2773            std::process::exit(1);
2774
2775            #[cfg(msim)]
2776            iota_simulator::task::shutdown_current_node();
2777        }
2778    }
2779
2780    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
2781    pub async fn new(
2782        name: AuthorityName,
2783        secret: StableSyncAuthoritySigner,
2784        supported_protocol_versions: SupportedProtocolVersions,
2785        store: Arc<AuthorityStore>,
2786        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2787        epoch_store: Arc<AuthorityPerEpochStore>,
2788        committee_store: Arc<CommitteeStore>,
2789        indexes: Option<Arc<IndexStore>>,
2790        rest_index: Option<Arc<RestIndexStore>>,
2791        checkpoint_store: Arc<CheckpointStore>,
2792        prometheus_registry: &Registry,
2793        genesis_objects: &[Object],
2794        db_checkpoint_config: &DBCheckpointConfig,
2795        config: NodeConfig,
2796        archive_readers: ArchiveReaderBalancer,
2797        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2798        chain_identifier: ChainIdentifier,
2799        pruner_db: Option<Arc<AuthorityPrunerTables>>,
2800    ) -> Arc<Self> {
2801        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2802
2803        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2804        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2805        let transaction_manager = Arc::new(TransactionManager::new(
2806            execution_cache_trait_pointers.object_cache_reader.clone(),
2807            execution_cache_trait_pointers
2808                .transaction_cache_reader
2809                .clone(),
2810            &epoch_store,
2811            tx_ready_certificates,
2812            metrics.clone(),
2813        ));
2814        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2815
2816        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2817            epoch_store.get_parent_path(),
2818            &config.authority_store_pruning_config,
2819        );
2820        let _pruner = AuthorityStorePruner::new(
2821            store.perpetual_tables.clone(),
2822            checkpoint_store.clone(),
2823            rest_index.clone(),
2824            config.authority_store_pruning_config.clone(),
2825            epoch_store.committee().authority_exists(&name),
2826            epoch_store.epoch_start_state().epoch_duration_ms(),
2827            prometheus_registry,
2828            archive_readers,
2829            pruner_db,
2830        );
2831        let input_loader =
2832            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2833        let epoch = epoch_store.epoch();
2834        let rgp = epoch_store.reference_gas_price();
2835        let state = Arc::new(AuthorityState {
2836            name,
2837            secret,
2838            execution_lock: RwLock::new(epoch),
2839            epoch_store: ArcSwap::new(epoch_store.clone()),
2840            input_loader,
2841            execution_cache_trait_pointers,
2842            indexes,
2843            rest_index,
2844            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2845            checkpoint_store,
2846            committee_store,
2847            transaction_manager,
2848            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2849            metrics,
2850            _pruner,
2851            _authority_per_epoch_pruner,
2852            db_checkpoint_config: db_checkpoint_config.clone(),
2853            config,
2854            overload_info: AuthorityOverloadInfo::default(),
2855            validator_tx_finalizer,
2856            chain_identifier,
2857            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
2858        });
2859
2860        // Start a task to execute ready certificates.
2861        let authority_state = Arc::downgrade(&state);
2862        spawn_monitored_task!(execution_process(
2863            authority_state,
2864            rx_ready_certificates,
2865            rx_execution_shutdown,
2866        ));
2867
2868        // TODO: This doesn't belong to the constructor of AuthorityState.
2869        state
2870            .create_owner_index_if_empty(genesis_objects, &epoch_store)
2871            .expect("Error indexing genesis objects.");
2872
2873        state
2874    }
2875
2876    // TODO: Consolidate our traits to reduce the number of methods here.
2877    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2878        &self.execution_cache_trait_pointers.object_cache_reader
2879    }
2880
2881    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2882        &self.execution_cache_trait_pointers.transaction_cache_reader
2883    }
2884
2885    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2886        &self.execution_cache_trait_pointers.cache_writer
2887    }
2888
2889    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2890        &self.execution_cache_trait_pointers.backing_store
2891    }
2892
2893    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2894        &self.execution_cache_trait_pointers.backing_package_store
2895    }
2896
2897    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2898        &self.execution_cache_trait_pointers.object_store
2899    }
2900
2901    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2902        &self.execution_cache_trait_pointers.reconfig_api
2903    }
2904
2905    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2906        &self.execution_cache_trait_pointers.accumulator_store
2907    }
2908
2909    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2910        &self.execution_cache_trait_pointers.checkpoint_cache
2911    }
2912
2913    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2914        &self.execution_cache_trait_pointers.state_sync_store
2915    }
2916
2917    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2918        &self.execution_cache_trait_pointers.cache_commit
2919    }
2920
2921    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2922        self.execution_cache_trait_pointers
2923            .testing_api
2924            .database_for_testing()
2925    }
2926
2927    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2928        &self,
2929        config: NodeConfig,
2930        metrics: Arc<AuthorityStorePruningMetrics>,
2931    ) -> anyhow::Result<()> {
2932        let archive_readers =
2933            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2934        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2935            &self.database_for_testing().perpetual_tables,
2936            &self.checkpoint_store,
2937            self.rest_index.as_deref(),
2938            None,
2939            config.authority_store_pruning_config,
2940            metrics,
2941            archive_readers,
2942            EPOCH_DURATION_MS_FOR_TESTING,
2943        )
2944        .await
2945    }
2946
2947    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2948        &self.transaction_manager
2949    }
2950
2951    /// Adds transactions / certificates to transaction manager for ordered
2952    /// execution.
2953    pub fn enqueue_transactions_for_execution(
2954        &self,
2955        txns: Vec<VerifiedExecutableTransaction>,
2956        epoch_store: &Arc<AuthorityPerEpochStore>,
2957    ) {
2958        self.transaction_manager.enqueue(txns, epoch_store)
2959    }
2960
2961    /// Adds certificates to transaction manager for ordered execution.
2962    pub fn enqueue_certificates_for_execution(
2963        &self,
2964        certs: Vec<VerifiedCertificate>,
2965        epoch_store: &Arc<AuthorityPerEpochStore>,
2966    ) {
2967        self.transaction_manager
2968            .enqueue_certificates(certs, epoch_store)
2969    }
2970
2971    pub fn enqueue_with_expected_effects_digest(
2972        &self,
2973        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2974        epoch_store: &AuthorityPerEpochStore,
2975    ) {
2976        self.transaction_manager
2977            .enqueue_with_expected_effects_digest(certs, epoch_store)
2978    }
2979
2980    fn create_owner_index_if_empty(
2981        &self,
2982        genesis_objects: &[Object],
2983        epoch_store: &Arc<AuthorityPerEpochStore>,
2984    ) -> IotaResult {
2985        let Some(index_store) = &self.indexes else {
2986            return Ok(());
2987        };
2988        if !index_store.is_empty() {
2989            return Ok(());
2990        }
2991
2992        let mut new_owners = vec![];
2993        let mut new_dynamic_fields = vec![];
2994        let mut layout_resolver = epoch_store
2995            .executor()
2996            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
2997        for o in genesis_objects.iter() {
2998            match o.owner {
2999                Owner::AddressOwner(addr) => new_owners.push((
3000                    (addr, o.id()),
3001                    ObjectInfo::new(&o.compute_object_reference(), o),
3002                )),
3003                Owner::ObjectOwner(object_id) => {
3004                    let id = o.id();
3005                    let Some(info) = self.try_create_dynamic_field_info(
3006                        o,
3007                        &BTreeMap::new(),
3008                        layout_resolver.as_mut(),
3009                        true,
3010                    )?
3011                    else {
3012                        continue;
3013                    };
3014                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3015                }
3016                _ => {}
3017            }
3018        }
3019
3020        index_store.insert_genesis_objects(ObjectIndexChanges {
3021            deleted_owners: vec![],
3022            deleted_dynamic_fields: vec![],
3023            new_owners,
3024            new_dynamic_fields,
3025        })
3026    }
3027
3028    /// Attempts to acquire execution lock for an executable transaction.
3029    /// Returns the lock if the transaction is matching current executed epoch
3030    /// Returns None otherwise
3031    pub fn execution_lock_for_executable_transaction(
3032        &self,
3033        transaction: &VerifiedExecutableTransaction,
3034    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3035        let lock = self
3036            .execution_lock
3037            .try_read()
3038            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3039        if *lock == transaction.auth_sig().epoch() {
3040            Ok(lock)
3041        } else {
3042            Err(IotaError::WrongEpoch {
3043                expected_epoch: *lock,
3044                actual_epoch: transaction.auth_sig().epoch(),
3045            })
3046        }
3047    }
3048
3049    /// Acquires the execution lock for the duration of a transaction signing
3050    /// request. This prevents reconfiguration from starting until we are
3051    /// finished handling the signing request. Otherwise, in-memory lock
3052    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3053    /// while we are attempting to acquire locks for the transaction.
3054    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3055        self.execution_lock
3056            .try_read()
3057            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3058    }
3059
3060    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3061        self.execution_lock.write().await
3062    }
3063
3064    #[instrument(level = "error", skip_all)]
3065    pub async fn reconfigure(
3066        &self,
3067        cur_epoch_store: &AuthorityPerEpochStore,
3068        supported_protocol_versions: SupportedProtocolVersions,
3069        new_committee: Committee,
3070        epoch_start_configuration: EpochStartConfiguration,
3071        accumulator: Arc<StateAccumulator>,
3072        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3073        epoch_supply_change: i64,
3074        epoch_last_checkpoint: CheckpointSequenceNumber,
3075    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3076        Self::check_protocol_version(
3077            supported_protocol_versions,
3078            epoch_start_configuration
3079                .epoch_start_state()
3080                .protocol_version(),
3081        );
3082        self.metrics.reset_on_reconfigure();
3083        self.committee_store.insert_new_committee(&new_committee)?;
3084
3085        // Wait until no transactions are being executed.
3086        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3087
3088        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3089        cur_epoch_store.epoch_terminated().await;
3090
3091        let highest_locally_built_checkpoint_seq = self
3092            .checkpoint_store
3093            .get_latest_locally_computed_checkpoint()?
3094            .map(|c| *c.sequence_number())
3095            .unwrap_or(0);
3096
3097        assert!(
3098            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3099            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3100        );
3101        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3102            // if we built the last checkpoint locally (as opposed to receiving it from a
3103            // peer), then all shared_version_assignments except the one for the
3104            // ChangeEpoch transaction should have been removed
3105            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3106            // Note that while 1 is the typical value, 0 is possible if the node restarts
3107            // after committing the last checkpoint but before reconfiguring.
3108            if num_shared_version_assignments > 1 {
3109                // If this happens in prod, we have a memory leak, but not a correctness issue.
3110                debug_fatal!(
3111                    "all shared_version_assignments should have been removed \
3112                    (num_shared_version_assignments: {num_shared_version_assignments})"
3113                );
3114            }
3115        }
3116
3117        // Safe to reconfigure now. No transactions are being executed,
3118        // and no epoch-specific tasks are running.
3119
3120        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
3121        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
3122        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3123            .await?;
3124        self.get_reconfig_api()
3125            .clear_state_end_of_epoch(&execution_lock);
3126        self.check_system_consistency(
3127            cur_epoch_store,
3128            accumulator,
3129            expensive_safety_check_config,
3130            epoch_supply_change,
3131        )?;
3132        self.get_reconfig_api()
3133            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3134        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3135            if self
3136                .db_checkpoint_config
3137                .perform_db_checkpoints_at_epoch_end
3138            {
3139                let checkpoint_indexes = self
3140                    .db_checkpoint_config
3141                    .perform_index_db_checkpoints_at_epoch_end
3142                    .unwrap_or(false);
3143                let current_epoch = cur_epoch_store.epoch();
3144                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3145                self.checkpoint_all_dbs(
3146                    &epoch_checkpoint_path,
3147                    cur_epoch_store,
3148                    checkpoint_indexes,
3149                )?;
3150            }
3151        }
3152
3153        self.get_reconfig_api()
3154            .reconfigure_cache(&epoch_start_configuration)
3155            .await;
3156
3157        let new_epoch = new_committee.epoch;
3158        let new_epoch_store = self
3159            .reopen_epoch_db(
3160                cur_epoch_store,
3161                new_committee,
3162                epoch_start_configuration,
3163                expensive_safety_check_config,
3164                epoch_last_checkpoint,
3165            )
3166            .await?;
3167        assert_eq!(new_epoch_store.epoch(), new_epoch);
3168        self.transaction_manager.reconfigure(new_epoch);
3169        *execution_lock = new_epoch;
3170        // drop execution_lock after epoch store was updated
3171        // see also assert in AuthorityState::process_certificate
3172        // on the epoch store and execution lock epoch match
3173        Ok(new_epoch_store)
3174    }
3175
3176    /// Advance the epoch store to the next epoch for testing only.
3177    /// This only manually sets all the places where we have the epoch number.
3178    /// It doesn't properly reconfigure the node, hence should be only used for
3179    /// testing.
3180    pub async fn reconfigure_for_testing(&self) {
3181        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3182        let epoch_store = self.epoch_store_for_testing().clone();
3183        let protocol_config = epoch_store.protocol_config().clone();
3184        // The current protocol config used in the epoch store may have been overridden
3185        // and diverged from the protocol config definitions. That override may
3186        // have now been dropped when the initial guard was dropped. We reapply
3187        // the override before creating the new epoch store, to make sure that
3188        // the new epoch store has the same protocol config as the current one.
3189        // Since this is for testing only, we mostly like to keep the protocol config
3190        // the same across epochs.
3191        let _guard =
3192            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3193        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3194            self.get_backing_package_store().clone(),
3195            self.get_object_store().clone(),
3196            &self.config.expensive_safety_check_config,
3197            self.checkpoint_store
3198                .get_epoch_last_checkpoint(epoch_store.epoch())
3199                .unwrap()
3200                .map(|c| *c.sequence_number())
3201                .unwrap_or_default(),
3202        );
3203        let new_epoch = new_epoch_store.epoch();
3204        self.transaction_manager.reconfigure(new_epoch);
3205        self.epoch_store.store(new_epoch_store);
3206        epoch_store.epoch_terminated().await;
3207        *execution_lock = new_epoch;
3208    }
3209
3210    #[instrument(level = "error", skip_all)]
3211    fn check_system_consistency(
3212        &self,
3213        cur_epoch_store: &AuthorityPerEpochStore,
3214        accumulator: Arc<StateAccumulator>,
3215        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3216        epoch_supply_change: i64,
3217    ) -> IotaResult<()> {
3218        info!(
3219            "Performing iota conservation consistency check for epoch {}",
3220            cur_epoch_store.epoch()
3221        );
3222
3223        if cfg!(debug_assertions) {
3224            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3225        }
3226
3227        self.get_reconfig_api()
3228            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3229
3230        // check for root state hash consistency with live object set
3231        if expensive_safety_check_config.enable_state_consistency_check() {
3232            info!(
3233                "Performing state consistency check for epoch {}",
3234                cur_epoch_store.epoch()
3235            );
3236            self.expensive_check_is_consistent_state(
3237                accumulator,
3238                cur_epoch_store,
3239                cfg!(debug_assertions), // panic in debug mode only
3240            );
3241        }
3242
3243        if expensive_safety_check_config.enable_secondary_index_checks() {
3244            if let Some(indexes) = self.indexes.clone() {
3245                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3246                    .expect("secondary indexes are inconsistent");
3247            }
3248        }
3249
3250        Ok(())
3251    }
3252
3253    fn expensive_check_is_consistent_state(
3254        &self,
3255        accumulator: Arc<StateAccumulator>,
3256        cur_epoch_store: &AuthorityPerEpochStore,
3257        panic: bool,
3258    ) {
3259        let live_object_set_hash = accumulator.digest_live_object_set();
3260
3261        let root_state_hash: ECMHLiveObjectSetDigest = self
3262            .get_accumulator_store()
3263            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3264            .expect("Retrieving root state hash cannot fail")
3265            .expect("Root state hash for epoch must exist")
3266            .1
3267            .digest()
3268            .into();
3269
3270        let is_inconsistent = root_state_hash != live_object_set_hash;
3271        if is_inconsistent {
3272            if panic {
3273                panic!(
3274                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3275                );
3276            } else {
3277                error!(
3278                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3279                    root_state_hash, live_object_set_hash
3280                );
3281            }
3282        } else {
3283            info!("State consistency check passed");
3284        }
3285
3286        if !panic {
3287            accumulator.set_inconsistent_state(is_inconsistent);
3288        }
3289    }
3290
3291    pub fn current_epoch_for_testing(&self) -> EpochId {
3292        self.epoch_store_for_testing().epoch()
3293    }
3294
3295    #[instrument(level = "error", skip_all)]
3296    pub fn checkpoint_all_dbs(
3297        &self,
3298        checkpoint_path: &Path,
3299        cur_epoch_store: &AuthorityPerEpochStore,
3300        checkpoint_indexes: bool,
3301    ) -> IotaResult {
3302        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3303        let current_epoch = cur_epoch_store.epoch();
3304
3305        if checkpoint_path.exists() {
3306            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3307            return Ok(());
3308        }
3309
3310        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3311        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3312
3313        if checkpoint_path_tmp.exists() {
3314            fs::remove_dir_all(&checkpoint_path_tmp)
3315                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3316        }
3317
3318        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3319        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3320
3321        // NOTE: Do not change the order of invoking these checkpoint calls
3322        // We want to snapshot checkpoint db first to not race with state sync
3323        self.checkpoint_store
3324            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3325
3326        self.get_reconfig_api()
3327            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3328
3329        self.committee_store
3330            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3331
3332        if checkpoint_indexes {
3333            if let Some(indexes) = self.indexes.as_ref() {
3334                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3335            }
3336        }
3337
3338        fs::rename(checkpoint_path_tmp, checkpoint_path)
3339            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3340        Ok(())
3341    }
3342
3343    /// Load the current epoch store. This can change during reconfiguration. To
3344    /// ensure that we never end up accessing different epoch stores in a
3345    /// single task, we need to make sure that this is called once per task.
3346    /// Each call needs to be carefully audited to ensure it is
3347    /// the case. This also means we should minimize the number of call-sites.
3348    /// Only call it when there is no way to obtain it from somewhere else.
3349    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3350        self.epoch_store.load()
3351    }
3352
3353    // Load the epoch store, should be used in tests only.
3354    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3355        self.load_epoch_store_one_call_per_task()
3356    }
3357
3358    pub fn clone_committee_for_testing(&self) -> Committee {
3359        Committee::clone(self.epoch_store_for_testing().committee())
3360    }
3361
3362    #[instrument(level = "trace", skip_all)]
3363    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3364        self.get_object_store()
3365            .try_get_object(object_id)
3366            .map_err(Into::into)
3367    }
3368
3369    /// Non-fallible version of `try_get_object`.
3370    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3371        self.try_get_object(object_id)
3372            .await
3373            .expect("storage access failed")
3374    }
3375
3376    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3377        Ok(self
3378            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3379            .await?
3380            .expect("framework object should always exist")
3381            .compute_object_reference())
3382    }
3383
3384    // This function is only used for testing.
3385    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3386        self.get_object_cache_reader()
3387            .try_get_iota_system_state_object_unsafe()
3388    }
3389
3390    #[instrument(level = "trace", skip_all)]
3391    pub fn get_checkpoint_by_sequence_number(
3392        &self,
3393        sequence_number: CheckpointSequenceNumber,
3394    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3395        Ok(self
3396            .checkpoint_store
3397            .get_checkpoint_by_sequence_number(sequence_number)?)
3398    }
3399
3400    #[instrument(level = "trace", skip_all)]
3401    pub fn get_transaction_checkpoint_for_tests(
3402        &self,
3403        digest: &TransactionDigest,
3404        epoch_store: &AuthorityPerEpochStore,
3405    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3406        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3407        let Some(checkpoint) = checkpoint else {
3408            return Ok(None);
3409        };
3410        let checkpoint = self
3411            .checkpoint_store
3412            .get_checkpoint_by_sequence_number(checkpoint)?;
3413        Ok(checkpoint)
3414    }
3415
3416    #[instrument(level = "trace", skip_all)]
3417    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3418        Ok(
3419            match self
3420                .get_object_cache_reader()
3421                .try_get_latest_object_or_tombstone(*object_id)?
3422            {
3423                Some((_, ObjectOrTombstone::Object(object))) => {
3424                    let layout = self.get_object_layout(&object)?;
3425                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3426                }
3427                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3428                None => ObjectRead::NotExists(*object_id),
3429            },
3430        )
3431    }
3432
3433    /// Chain Identifier is the digest of the genesis checkpoint.
3434    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3435        self.chain_identifier
3436    }
3437
3438    #[instrument(level = "trace", skip_all)]
3439    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3440    where
3441        T: DeserializeOwned,
3442    {
3443        let o = self.get_object_read(object_id)?.into_object()?;
3444        if let Some(move_object) = o.data.try_as_move() {
3445            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3446                IotaError::ObjectDeserialization {
3447                    error: format!("{e}"),
3448                }
3449            })?)
3450        } else {
3451            Err(IotaError::ObjectDeserialization {
3452                error: format!("Provided object : [{object_id}] is not a Move object."),
3453            })
3454        }
3455    }
3456
3457    /// This function aims to serve rpc reads on past objects and
3458    /// we don't expect it to be called for other purposes.
3459    /// Depending on the object pruning policies that will be enforced in the
3460    /// future there is no software-level guarantee/SLA to retrieve an object
3461    /// with an old version even if it exists/existed.
3462    #[instrument(level = "trace", skip_all)]
3463    pub fn get_past_object_read(
3464        &self,
3465        object_id: &ObjectID,
3466        version: SequenceNumber,
3467    ) -> IotaResult<PastObjectRead> {
3468        // Firstly we see if the object ever existed by getting its latest data
3469        let Some(obj_ref) = self
3470            .get_object_cache_reader()
3471            .try_get_latest_object_ref_or_tombstone(*object_id)?
3472        else {
3473            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3474        };
3475
3476        if version > obj_ref.1 {
3477            return Ok(PastObjectRead::VersionTooHigh {
3478                object_id: *object_id,
3479                asked_version: version,
3480                latest_version: obj_ref.1,
3481            });
3482        }
3483
3484        if version < obj_ref.1 {
3485            // Read past objects
3486            return Ok(match self.read_object_at_version(object_id, version)? {
3487                Some((object, layout)) => {
3488                    let obj_ref = object.compute_object_reference();
3489                    PastObjectRead::VersionFound(obj_ref, object, layout)
3490                }
3491
3492                None => PastObjectRead::VersionNotFound(*object_id, version),
3493            });
3494        }
3495
3496        if !obj_ref.2.is_alive() {
3497            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3498        }
3499
3500        match self.read_object_at_version(object_id, obj_ref.1)? {
3501            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3502            None => {
3503                error!(
3504                    "Object with in parent_entry is missing from object store, datastore is \
3505                     inconsistent",
3506                );
3507                Err(UserInputError::ObjectNotFound {
3508                    object_id: *object_id,
3509                    version: Some(obj_ref.1),
3510                }
3511                .into())
3512            }
3513        }
3514    }
3515
3516    #[instrument(level = "trace", skip_all)]
3517    fn read_object_at_version(
3518        &self,
3519        object_id: &ObjectID,
3520        version: SequenceNumber,
3521    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3522        let Some(object) = self
3523            .get_object_cache_reader()
3524            .try_get_object_by_key(object_id, version)?
3525        else {
3526            return Ok(None);
3527        };
3528
3529        let layout = self.get_object_layout(&object)?;
3530        Ok(Some((object, layout)))
3531    }
3532
3533    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3534        let layout = object
3535            .data
3536            .try_as_move()
3537            .map(|object| {
3538                into_struct_layout(
3539                    self.load_epoch_store_one_call_per_task()
3540                        .executor()
3541                        // TODO(cache) - must read through cache
3542                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3543                        .get_annotated_layout(&object.type_().clone().into())?,
3544                )
3545            })
3546            .transpose()?;
3547        Ok(layout)
3548    }
3549
3550    fn get_owner_at_version(
3551        &self,
3552        object_id: &ObjectID,
3553        version: SequenceNumber,
3554    ) -> IotaResult<Owner> {
3555        self.get_object_store()
3556            .try_get_object_by_key(object_id, version)?
3557            .ok_or_else(|| {
3558                IotaError::from(UserInputError::ObjectNotFound {
3559                    object_id: *object_id,
3560                    version: Some(version),
3561                })
3562            })
3563            .map(|o| o.owner)
3564    }
3565
3566    #[instrument(level = "trace", skip_all)]
3567    pub fn get_owner_objects(
3568        &self,
3569        owner: IotaAddress,
3570        // If `Some`, the query will start from the next item after the specified cursor
3571        cursor: Option<ObjectID>,
3572        limit: usize,
3573        filter: Option<IotaObjectDataFilter>,
3574    ) -> IotaResult<Vec<ObjectInfo>> {
3575        if let Some(indexes) = &self.indexes {
3576            indexes.get_owner_objects(owner, cursor, limit, filter)
3577        } else {
3578            Err(IotaError::IndexStoreNotAvailable)
3579        }
3580    }
3581
3582    #[instrument(level = "trace", skip_all)]
3583    pub fn get_owned_coins_iterator_with_cursor(
3584        &self,
3585        owner: IotaAddress,
3586        // If `Some`, the query will start from the next item after the specified cursor
3587        cursor: (String, ObjectID),
3588        limit: usize,
3589        one_coin_type_only: bool,
3590    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3591        if let Some(indexes) = &self.indexes {
3592            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3593        } else {
3594            Err(IotaError::IndexStoreNotAvailable)
3595        }
3596    }
3597
3598    #[instrument(level = "trace", skip_all)]
3599    pub fn get_owner_objects_iterator(
3600        &self,
3601        owner: IotaAddress,
3602        // If `Some`, the query will start from the next item after the specified cursor
3603        cursor: Option<ObjectID>,
3604        filter: Option<IotaObjectDataFilter>,
3605    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3606        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3607        if let Some(indexes) = &self.indexes {
3608            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3609        } else {
3610            Err(IotaError::IndexStoreNotAvailable)
3611        }
3612    }
3613
3614    #[instrument(level = "trace", skip_all)]
3615    pub async fn get_move_objects<T>(
3616        &self,
3617        owner: IotaAddress,
3618        type_: MoveObjectType,
3619    ) -> IotaResult<Vec<T>>
3620    where
3621        T: DeserializeOwned,
3622    {
3623        let object_ids = self
3624            .get_owner_objects_iterator(owner, None, None)?
3625            .filter(|o| match &o.type_ {
3626                ObjectType::Struct(s) => &type_ == s,
3627                ObjectType::Package => false,
3628            })
3629            .map(|info| ObjectKey(info.object_id, info.version))
3630            .collect::<Vec<_>>();
3631        let mut move_objects = vec![];
3632
3633        let objects = self
3634            .get_object_store()
3635            .try_multi_get_objects_by_key(&object_ids)?;
3636
3637        for (o, id) in objects.into_iter().zip(object_ids) {
3638            let object = o.ok_or_else(|| {
3639                IotaError::from(UserInputError::ObjectNotFound {
3640                    object_id: id.0,
3641                    version: Some(id.1),
3642                })
3643            })?;
3644            let move_object = object.data.try_as_move().ok_or_else(|| {
3645                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3646            })?;
3647            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3648                IotaError::ObjectDeserialization {
3649                    error: format!("{e}"),
3650                }
3651            })?);
3652        }
3653        Ok(move_objects)
3654    }
3655
3656    #[instrument(level = "trace", skip_all)]
3657    pub fn get_dynamic_fields(
3658        &self,
3659        owner: ObjectID,
3660        // If `Some`, the query will start from the next item after the specified cursor
3661        cursor: Option<ObjectID>,
3662        limit: usize,
3663    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3664        Ok(self
3665            .get_dynamic_fields_iterator(owner, cursor)?
3666            .take(limit)
3667            .collect::<Result<Vec<_>, _>>()?)
3668    }
3669
3670    fn get_dynamic_fields_iterator(
3671        &self,
3672        owner: ObjectID,
3673        // If `Some`, the query will start from the next item after the specified cursor
3674        cursor: Option<ObjectID>,
3675    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3676    {
3677        if let Some(indexes) = &self.indexes {
3678            indexes.get_dynamic_fields_iterator(owner, cursor)
3679        } else {
3680            Err(IotaError::IndexStoreNotAvailable)
3681        }
3682    }
3683
3684    #[instrument(level = "trace", skip_all)]
3685    pub fn get_dynamic_field_object_id(
3686        &self,
3687        owner: ObjectID,
3688        name_type: TypeTag,
3689        name_bcs_bytes: &[u8],
3690    ) -> IotaResult<Option<ObjectID>> {
3691        if let Some(indexes) = &self.indexes {
3692            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3693        } else {
3694            Err(IotaError::IndexStoreNotAvailable)
3695        }
3696    }
3697
3698    #[instrument(level = "trace", skip_all)]
3699    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3700        Ok(self.get_indexes()?.next_sequence_number())
3701    }
3702
3703    #[instrument(level = "trace", skip_all)]
3704    pub async fn get_executed_transaction_and_effects(
3705        &self,
3706        digest: TransactionDigest,
3707        kv_store: Arc<TransactionKeyValueStore>,
3708    ) -> IotaResult<(Transaction, TransactionEffects)> {
3709        let transaction = kv_store.get_tx(digest).await?;
3710        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3711        Ok((transaction, effects))
3712    }
3713
3714    #[instrument(level = "trace", skip_all)]
3715    pub fn multi_get_checkpoint_by_sequence_number(
3716        &self,
3717        sequence_numbers: &[CheckpointSequenceNumber],
3718    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3719        Ok(self
3720            .checkpoint_store
3721            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3722    }
3723
3724    #[instrument(level = "trace", skip_all)]
3725    pub fn get_transaction_events(
3726        &self,
3727        digest: &TransactionEventsDigest,
3728    ) -> IotaResult<TransactionEvents> {
3729        self.get_transaction_cache_reader()
3730            .try_get_events(digest)?
3731            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3732    }
3733
3734    pub fn get_transaction_input_objects(
3735        &self,
3736        effects: &TransactionEffects,
3737    ) -> anyhow::Result<Vec<Object>> {
3738        let input_object_keys = effects
3739            .modified_at_versions()
3740            .into_iter()
3741            .map(|(object_id, version)| ObjectKey(object_id, version))
3742            .collect::<Vec<_>>();
3743
3744        let input_objects = self
3745            .get_object_store()
3746            .try_multi_get_objects_by_key(&input_object_keys)?
3747            .into_iter()
3748            .enumerate()
3749            .map(|(idx, maybe_object)| {
3750                maybe_object.ok_or_else(|| {
3751                    anyhow::anyhow!(
3752                        "missing input object key {:?} from tx {}",
3753                        input_object_keys[idx],
3754                        effects.transaction_digest()
3755                    )
3756                })
3757            })
3758            .collect::<anyhow::Result<Vec<_>>>()?;
3759        Ok(input_objects)
3760    }
3761
3762    pub fn get_transaction_output_objects(
3763        &self,
3764        effects: &TransactionEffects,
3765    ) -> anyhow::Result<Vec<Object>> {
3766        let output_object_keys = effects
3767            .all_changed_objects()
3768            .into_iter()
3769            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3770            .collect::<Vec<_>>();
3771
3772        let output_objects = self
3773            .get_object_store()
3774            .try_multi_get_objects_by_key(&output_object_keys)?
3775            .into_iter()
3776            .enumerate()
3777            .map(|(idx, maybe_object)| {
3778                maybe_object.ok_or_else(|| {
3779                    anyhow::anyhow!(
3780                        "missing output object key {:?} from tx {}",
3781                        output_object_keys[idx],
3782                        effects.transaction_digest()
3783                    )
3784                })
3785            })
3786            .collect::<anyhow::Result<Vec<_>>>()?;
3787        Ok(output_objects)
3788    }
3789
3790    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3791        match &self.indexes {
3792            Some(i) => Ok(i.clone()),
3793            None => Err(IotaError::UnsupportedFeature {
3794                error: "extended object indexing is not enabled on this server".into(),
3795            }),
3796        }
3797    }
3798
3799    pub async fn get_transactions_for_tests(
3800        self: &Arc<Self>,
3801        filter: Option<TransactionFilter>,
3802        cursor: Option<TransactionDigest>,
3803        limit: Option<usize>,
3804        reverse: bool,
3805    ) -> IotaResult<Vec<TransactionDigest>> {
3806        let metrics = KeyValueStoreMetrics::new_for_tests();
3807        let kv_store = Arc::new(TransactionKeyValueStore::new(
3808            "rocksdb",
3809            metrics,
3810            self.clone(),
3811        ));
3812        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3813            .await
3814    }
3815
3816    #[instrument(level = "trace", skip_all)]
3817    pub async fn get_transactions(
3818        &self,
3819        kv_store: &Arc<TransactionKeyValueStore>,
3820        filter: Option<TransactionFilter>,
3821        // If `Some`, the query will start from the next item after the specified cursor
3822        cursor: Option<TransactionDigest>,
3823        limit: Option<usize>,
3824        reverse: bool,
3825    ) -> IotaResult<Vec<TransactionDigest>> {
3826        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3827            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3828            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3829            if reverse {
3830                let iter = iter
3831                    .rev()
3832                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3833                    .skip(usize::from(cursor.is_some()));
3834                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3835            } else {
3836                let iter = iter
3837                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3838                    .skip(usize::from(cursor.is_some()));
3839                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3840            }
3841        }
3842        self.get_indexes()?
3843            .get_transactions(filter, cursor, limit, reverse)
3844    }
3845
3846    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3847        &self.checkpoint_store
3848    }
3849
3850    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3851        self.get_checkpoint_store()
3852            .get_highest_executed_checkpoint_seq_number()?
3853            .ok_or(IotaError::UserInput {
3854                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3855            })
3856    }
3857
3858    #[cfg(msim)]
3859    pub fn get_highest_pruned_checkpoint_for_testing(
3860        &self,
3861    ) -> IotaResult<CheckpointSequenceNumber> {
3862        self.database_for_testing()
3863            .perpetual_tables
3864            .get_highest_pruned_checkpoint()
3865    }
3866
3867    #[instrument(level = "trace", skip_all)]
3868    pub fn get_checkpoint_summary_by_sequence_number(
3869        &self,
3870        sequence_number: CheckpointSequenceNumber,
3871    ) -> IotaResult<CheckpointSummary> {
3872        let verified_checkpoint = self
3873            .get_checkpoint_store()
3874            .get_checkpoint_by_sequence_number(sequence_number)?;
3875        match verified_checkpoint {
3876            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3877            None => Err(IotaError::UserInput {
3878                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3879            }),
3880        }
3881    }
3882
3883    #[instrument(level = "trace", skip_all)]
3884    pub fn get_checkpoint_summary_by_digest(
3885        &self,
3886        digest: CheckpointDigest,
3887    ) -> IotaResult<CheckpointSummary> {
3888        let verified_checkpoint = self
3889            .get_checkpoint_store()
3890            .get_checkpoint_by_digest(&digest)?;
3891        match verified_checkpoint {
3892            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3893            None => Err(IotaError::UserInput {
3894                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3895            }),
3896        }
3897    }
3898
3899    #[instrument(level = "trace", skip_all)]
3900    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3901        if is_system_package(package_id) {
3902            return self.find_genesis_txn_digest();
3903        }
3904        Ok(self
3905            .get_object_read(&package_id)?
3906            .into_object()?
3907            .previous_transaction)
3908    }
3909
3910    #[instrument(level = "trace", skip_all)]
3911    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3912        let summary = self
3913            .get_verified_checkpoint_by_sequence_number(0)?
3914            .into_message();
3915        let content = self.get_checkpoint_contents(summary.content_digest)?;
3916        let genesis_transaction = content.enumerate_transactions(&summary).next();
3917        Ok(genesis_transaction
3918            .ok_or(IotaError::UserInput {
3919                error: UserInputError::GenesisTransactionNotFound,
3920            })?
3921            .1
3922            .transaction)
3923    }
3924
3925    #[instrument(level = "trace", skip_all)]
3926    pub fn get_verified_checkpoint_by_sequence_number(
3927        &self,
3928        sequence_number: CheckpointSequenceNumber,
3929    ) -> IotaResult<VerifiedCheckpoint> {
3930        let verified_checkpoint = self
3931            .get_checkpoint_store()
3932            .get_checkpoint_by_sequence_number(sequence_number)?;
3933        match verified_checkpoint {
3934            Some(verified_checkpoint) => Ok(verified_checkpoint),
3935            None => Err(IotaError::UserInput {
3936                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3937            }),
3938        }
3939    }
3940
3941    #[instrument(level = "trace", skip_all)]
3942    pub fn get_verified_checkpoint_summary_by_digest(
3943        &self,
3944        digest: CheckpointDigest,
3945    ) -> IotaResult<VerifiedCheckpoint> {
3946        let verified_checkpoint = self
3947            .get_checkpoint_store()
3948            .get_checkpoint_by_digest(&digest)?;
3949        match verified_checkpoint {
3950            Some(verified_checkpoint) => Ok(verified_checkpoint),
3951            None => Err(IotaError::UserInput {
3952                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3953            }),
3954        }
3955    }
3956
3957    #[instrument(level = "trace", skip_all)]
3958    pub fn get_checkpoint_contents(
3959        &self,
3960        digest: CheckpointContentsDigest,
3961    ) -> IotaResult<CheckpointContents> {
3962        self.get_checkpoint_store()
3963            .get_checkpoint_contents(&digest)?
3964            .ok_or(IotaError::UserInput {
3965                error: UserInputError::CheckpointContentsNotFound(digest),
3966            })
3967    }
3968
3969    #[instrument(level = "trace", skip_all)]
3970    pub fn get_checkpoint_contents_by_sequence_number(
3971        &self,
3972        sequence_number: CheckpointSequenceNumber,
3973    ) -> IotaResult<CheckpointContents> {
3974        let verified_checkpoint = self
3975            .get_checkpoint_store()
3976            .get_checkpoint_by_sequence_number(sequence_number)?;
3977        match verified_checkpoint {
3978            Some(verified_checkpoint) => {
3979                let content_digest = verified_checkpoint.into_inner().content_digest;
3980                self.get_checkpoint_contents(content_digest)
3981            }
3982            None => Err(IotaError::UserInput {
3983                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3984            }),
3985        }
3986    }
3987
3988    #[instrument(level = "trace", skip_all)]
3989    pub async fn query_events(
3990        &self,
3991        kv_store: &Arc<TransactionKeyValueStore>,
3992        query: EventFilter,
3993        // If `Some`, the query will start from the next item after the specified cursor
3994        cursor: Option<EventID>,
3995        limit: usize,
3996        descending: bool,
3997    ) -> IotaResult<Vec<IotaEvent>> {
3998        let index_store = self.get_indexes()?;
3999
4000        // Get the tx_num from tx_digest
4001        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4002            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4003                IotaError::TransactionNotFound {
4004                    digest: cursor.tx_digest,
4005                },
4006            )?;
4007            (tx_seq, cursor.event_seq as usize)
4008        } else if descending {
4009            (u64::MAX, usize::MAX)
4010        } else {
4011            (0, 0)
4012        };
4013
4014        let limit = limit + 1;
4015        let mut event_keys = match query {
4016            EventFilter::All(filters) => {
4017                if filters.is_empty() {
4018                    index_store.all_events(tx_num, event_num, limit, descending)?
4019                } else {
4020                    return Err(IotaError::UserInput {
4021                        error: UserInputError::Unsupported(
4022                            "This query type does not currently support filter combinations"
4023                                .to_string(),
4024                        ),
4025                    });
4026                }
4027            }
4028            EventFilter::Transaction(digest) => {
4029                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4030            }
4031            EventFilter::MoveModule { package, module } => {
4032                let module_id = ModuleId::new(package.into(), module);
4033                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4034            }
4035            EventFilter::MoveEventType(struct_name) => index_store
4036                .events_by_move_event_struct_name(
4037                    &struct_name,
4038                    tx_num,
4039                    event_num,
4040                    limit,
4041                    descending,
4042                )?,
4043            EventFilter::Sender(sender) => {
4044                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4045            }
4046            EventFilter::TimeRange {
4047                start_time,
4048                end_time,
4049            } => index_store
4050                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4051            EventFilter::MoveEventModule { package, module } => index_store
4052                .events_by_move_event_module(
4053                    &ModuleId::new(package.into(), module),
4054                    tx_num,
4055                    event_num,
4056                    limit,
4057                    descending,
4058                )?,
4059            // not using "_ =>" because we want to make sure we remember to add new variants here
4060            EventFilter::Package(_)
4061            | EventFilter::MoveEventField { .. }
4062            | EventFilter::Any(_)
4063            | EventFilter::And(_, _)
4064            | EventFilter::Or(_, _) => {
4065                return Err(IotaError::UserInput {
4066                    error: UserInputError::Unsupported(
4067                        "This query type is not supported by the full node.".to_string(),
4068                    ),
4069                });
4070            }
4071        };
4072
4073        // skip one event if exclusive cursor is provided,
4074        // otherwise truncate to the original limit.
4075        if cursor.is_some() {
4076            if !event_keys.is_empty() {
4077                event_keys.remove(0);
4078            }
4079        } else {
4080            event_keys.truncate(limit - 1);
4081        }
4082
4083        // get the unique set of digests from the event_keys
4084        let transaction_digests = event_keys
4085            .iter()
4086            .map(|(_, digest, _, _)| *digest)
4087            .collect::<HashSet<_>>()
4088            .into_iter()
4089            .collect::<Vec<_>>();
4090
4091        let events = kv_store
4092            .multi_get_events_by_tx_digests(&transaction_digests)
4093            .await?;
4094
4095        let events_map: HashMap<_, _> =
4096            transaction_digests.iter().zip(events.into_iter()).collect();
4097
4098        let stored_events = event_keys
4099            .into_iter()
4100            .map(|k| {
4101                (
4102                    k,
4103                    events_map
4104                        .get(&k.1)
4105                        .expect("fetched digest is missing")
4106                        .clone()
4107                        .and_then(|e| e.data.get(k.2).cloned()),
4108                )
4109            })
4110            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4111                event
4112                    .map(|e| (e, tx_digest, event_seq, timestamp))
4113                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4114            })
4115            .collect::<Result<Vec<_>, _>>()?;
4116
4117        let epoch_store = self.load_epoch_store_one_call_per_task();
4118        let backing_store = self.get_backing_package_store().as_ref();
4119        let mut layout_resolver = epoch_store
4120            .executor()
4121            .type_layout_resolver(Box::new(backing_store));
4122        let mut events = vec![];
4123        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4124            events.push(IotaEvent::try_from(
4125                e.clone(),
4126                tx_digest,
4127                event_seq as u64,
4128                Some(timestamp),
4129                layout_resolver.get_annotated_layout(&e.type_)?,
4130            )?)
4131        }
4132        Ok(events)
4133    }
4134
4135    pub async fn insert_genesis_object(&self, object: Object) {
4136        self.get_reconfig_api()
4137            .try_insert_genesis_object(object)
4138            .expect("Cannot insert genesis object")
4139    }
4140
4141    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4142        futures::future::join_all(
4143            objects
4144                .iter()
4145                .map(|o| self.insert_genesis_object(o.clone())),
4146        )
4147        .await;
4148    }
4149
4150    /// Make a status response for a transaction
4151    #[instrument(level = "trace", skip_all)]
4152    pub fn get_transaction_status(
4153        &self,
4154        transaction_digest: &TransactionDigest,
4155        epoch_store: &Arc<AuthorityPerEpochStore>,
4156    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4157        // TODO: In the case of read path, we should not have to re-sign the effects.
4158        if let Some(effects) =
4159            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4160        {
4161            if let Some(transaction) = self
4162                .get_transaction_cache_reader()
4163                .try_get_transaction_block(transaction_digest)?
4164            {
4165                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4166                let events = if let Some(digest) = effects.events_digest() {
4167                    self.get_transaction_events(digest)?
4168                } else {
4169                    TransactionEvents::default()
4170                };
4171                return Ok(Some((
4172                    (*transaction).clone().into_message(),
4173                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4174                )));
4175            } else {
4176                // The read of effects and read of transaction are not atomic. It's possible
4177                // that we reverted the transaction (during epoch change) in
4178                // between the above two reads, and we end up having effects but
4179                // not transaction. In this case, we just fall through.
4180                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4181            }
4182        }
4183        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4184            self.metrics.tx_already_processed.inc();
4185            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4186            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4187        } else {
4188            Ok(None)
4189        }
4190    }
4191
4192    /// Get the signed effects of the given transaction. If the effects was
4193    /// signed in a previous epoch, re-sign it so that the caller is able to
4194    /// form a cert of the effects in the current epoch.
4195    #[instrument(level = "trace", skip_all)]
4196    pub fn get_signed_effects_and_maybe_resign(
4197        &self,
4198        transaction_digest: &TransactionDigest,
4199        epoch_store: &Arc<AuthorityPerEpochStore>,
4200    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4201        let effects = self
4202            .get_transaction_cache_reader()
4203            .try_get_executed_effects(transaction_digest)?;
4204        match effects {
4205            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4206            None => Ok(None),
4207        }
4208    }
4209
4210    #[instrument(level = "trace", skip_all)]
4211    pub(crate) fn sign_effects(
4212        &self,
4213        effects: TransactionEffects,
4214        epoch_store: &Arc<AuthorityPerEpochStore>,
4215    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4216        let tx_digest = *effects.transaction_digest();
4217        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4218            Some(sig) if sig.epoch == epoch_store.epoch() => {
4219                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4220            }
4221            _ => {
4222                // If the transaction was executed in previous epochs, the validator will
4223                // re-sign the effects with new current epoch so that a client is always able to
4224                // obtain an effects certificate at the current epoch.
4225                //
4226                // Why is this necessary? Consider the following case:
4227                // - assume there are 4 validators
4228                // - Quorum driver gets 2 signed effects before reconfig halt
4229                // - The tx makes it into final checkpoint.
4230                // - 2 validators go away and are replaced in the new epoch.
4231                // - The new epoch begins.
4232                // - The quorum driver cannot complete the partial effects cert from the
4233                //   previous epoch, because it may not be able to reach either of the 2 former
4234                //   validators.
4235                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4236                //   the new epoch, the QD can make a new effects cert and return it to the
4237                //   client.
4238                //
4239                // This is a considered a short-term workaround. Eventually, Quorum Driver
4240                // should be able to return either an effects certificate, -or-
4241                // a proof of inclusion in a checkpoint. In the case above, the
4242                // Quorum Driver would return a proof of inclusion in the final
4243                // checkpoint, and this code would no longer be necessary.
4244                debug!(
4245                    ?tx_digest,
4246                    epoch=?epoch_store.epoch(),
4247                    "Re-signing the effects with the current epoch"
4248                );
4249
4250                let sig = AuthoritySignInfo::new(
4251                    epoch_store.epoch(),
4252                    &effects,
4253                    Intent::iota_app(IntentScope::TransactionEffects),
4254                    self.name,
4255                    &*self.secret,
4256                );
4257
4258                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4259
4260                epoch_store.insert_effects_digest_and_signature(
4261                    &tx_digest,
4262                    effects.digest(),
4263                    &sig,
4264                )?;
4265
4266                effects
4267            }
4268        };
4269
4270        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4271            signed_effects,
4272        ))
4273    }
4274
4275    // Returns coin objects for indexing for fullnode if indexing is enabled.
4276    #[instrument(level = "trace", skip_all)]
4277    fn fullnode_only_get_tx_coins_for_indexing(
4278        &self,
4279        inner_temporary_store: &InnerTemporaryStore,
4280        epoch_store: &Arc<AuthorityPerEpochStore>,
4281    ) -> Option<TxCoins> {
4282        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4283            return None;
4284        }
4285        let written_coin_objects = inner_temporary_store
4286            .written
4287            .iter()
4288            .filter_map(|(k, v)| {
4289                if v.is_coin() {
4290                    Some((*k, v.clone()))
4291                } else {
4292                    None
4293                }
4294            })
4295            .collect();
4296        let input_coin_objects = inner_temporary_store
4297            .input_objects
4298            .iter()
4299            .filter_map(|(k, v)| {
4300                if v.is_coin() {
4301                    Some((*k, v.clone()))
4302                } else {
4303                    None
4304                }
4305            })
4306            .collect::<ObjectMap>();
4307        Some((input_coin_objects, written_coin_objects))
4308    }
4309
4310    /// Get the TransactionEnvelope that currently locks the given object, if
4311    /// any. Since object locks are only valid for one epoch, we also need
4312    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4313    /// no lock records for the given object can be found.
4314    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4315    /// object record is at a different version.
4316    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4317    /// certain transaction. Returns None if the a lock record is
4318    /// initialized for the given ObjectRef but not yet locked by any
4319    /// transaction,     or cannot find the transaction in transaction
4320    /// table, because of data race etc.
4321    #[instrument(level = "trace", skip_all)]
4322    pub async fn get_transaction_lock(
4323        &self,
4324        object_ref: &ObjectRef,
4325        epoch_store: &AuthorityPerEpochStore,
4326    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4327        let lock_info = self
4328            .get_object_cache_reader()
4329            .try_get_lock(*object_ref, epoch_store)?;
4330        let lock_info = match lock_info {
4331            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4332                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4333                    provided_obj_ref: *object_ref,
4334                    current_version: locked_ref.1,
4335                }
4336                .into());
4337            }
4338            ObjectLockStatus::Initialized => {
4339                return Ok(None);
4340            }
4341            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4342        };
4343
4344        epoch_store.get_signed_transaction(&lock_info)
4345    }
4346
4347    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4348        self.get_object_cache_reader().try_get_objects(objects)
4349    }
4350
4351    /// Non-fallible version of `try_get_objects`.
4352    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4353        self.try_get_objects(objects)
4354            .await
4355            .expect("storage access failed")
4356    }
4357
4358    pub async fn try_get_object_or_tombstone(
4359        &self,
4360        object_id: ObjectID,
4361    ) -> IotaResult<Option<ObjectRef>> {
4362        self.get_object_cache_reader()
4363            .try_get_latest_object_ref_or_tombstone(object_id)
4364    }
4365
4366    /// Non-fallible version of `try_get_object_or_tombstone`.
4367    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4368        self.try_get_object_or_tombstone(object_id)
4369            .await
4370            .expect("storage access failed")
4371    }
4372
4373    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4374    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4375    /// upgrade.
4376    ///
4377    /// This method can be used to dynamic adjust the amount of buffer. If set
4378    /// to 0, the upgrade will go through with only 2f+1 votes.
4379    ///
4380    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4381    /// should have the same value), or you risk halting the chain.
4382    pub fn set_override_protocol_upgrade_buffer_stake(
4383        &self,
4384        expected_epoch: EpochId,
4385        buffer_stake_bps: u64,
4386    ) -> IotaResult {
4387        let epoch_store = self.load_epoch_store_one_call_per_task();
4388        let actual_epoch = epoch_store.epoch();
4389        if actual_epoch != expected_epoch {
4390            return Err(IotaError::WrongEpoch {
4391                expected_epoch,
4392                actual_epoch,
4393            });
4394        }
4395
4396        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4397    }
4398
4399    pub fn clear_override_protocol_upgrade_buffer_stake(
4400        &self,
4401        expected_epoch: EpochId,
4402    ) -> IotaResult {
4403        let epoch_store = self.load_epoch_store_one_call_per_task();
4404        let actual_epoch = epoch_store.epoch();
4405        if actual_epoch != expected_epoch {
4406            return Err(IotaError::WrongEpoch {
4407                expected_epoch,
4408                actual_epoch,
4409            });
4410        }
4411
4412        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4413    }
4414
4415    /// Get the set of system packages that are compiled in to this build, if
4416    /// those packages are compatible with the current versions of those
4417    /// packages on-chain.
4418    pub async fn get_available_system_packages(
4419        &self,
4420        binary_config: &BinaryConfig,
4421    ) -> Vec<ObjectRef> {
4422        let mut results = vec![];
4423
4424        let system_packages = BuiltInFramework::iter_system_packages();
4425
4426        // Add extra framework packages during simtest
4427        #[cfg(msim)]
4428        let extra_packages = framework_injection::get_extra_packages(self.name);
4429        #[cfg(msim)]
4430        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4431
4432        for system_package in system_packages {
4433            let modules = system_package.modules().to_vec();
4434            // In simtests, we could override the current built-in framework packages.
4435            #[cfg(msim)]
4436            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4437                .unwrap_or(modules);
4438
4439            let Some(obj_ref) = iota_framework::compare_system_package(
4440                &self.get_object_store(),
4441                &system_package.id,
4442                &modules,
4443                system_package.dependencies.to_vec(),
4444                binary_config,
4445            )
4446            .await
4447            else {
4448                return vec![];
4449            };
4450            results.push(obj_ref);
4451        }
4452
4453        results
4454    }
4455
4456    /// Return the new versions, module bytes, and dependencies for the packages
4457    /// that have been committed to for a framework upgrade, in
4458    /// `system_packages`.  Loads the module contents from the binary, and
4459    /// performs the following checks:
4460    ///
4461    /// - Whether its contents matches what is on-chain already, in which case
4462    ///   no upgrade is required, and its contents are omitted from the output.
4463    /// - Whether the contents in the binary can form a package whose digest
4464    ///   matches the input, meaning the framework will be upgraded, and this
4465    ///   authority can satisfy that upgrade, in which case the contents are
4466    ///   included in the output.
4467    ///
4468    /// If the current version of the framework can't be loaded, the binary does
4469    /// not contain the bytes for that framework ID, or the resulting
4470    /// package fails the digest check, `None` is returned indicating that
4471    /// this authority cannot run the upgrade that the network voted on.
4472    async fn get_system_package_bytes(
4473        &self,
4474        system_packages: Vec<ObjectRef>,
4475        binary_config: &BinaryConfig,
4476    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4477        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4478        let objects = self.get_objects(&ids).await;
4479
4480        let mut res = Vec::with_capacity(system_packages.len());
4481        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4482            let prev_transaction = match object {
4483                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4484                    // Skip this one because it doesn't need to be upgraded.
4485                    info!("Framework {} does not need updating", system_package_ref.0);
4486                    continue;
4487                }
4488
4489                Some(cur_object) => cur_object.previous_transaction,
4490                None => TransactionDigest::genesis_marker(),
4491            };
4492
4493            #[cfg(msim)]
4494            let SystemPackage {
4495                id: _,
4496                bytes,
4497                dependencies,
4498            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4499                .unwrap_or_else(|| {
4500                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4501                });
4502
4503            #[cfg(not(msim))]
4504            let SystemPackage {
4505                id: _,
4506                bytes,
4507                dependencies,
4508            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4509
4510            let modules: Vec<_> = bytes
4511                .iter()
4512                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4513                .collect();
4514
4515            let new_object = Object::new_system_package(
4516                &modules,
4517                system_package_ref.1,
4518                dependencies.clone(),
4519                prev_transaction,
4520            );
4521
4522            let new_ref = new_object.compute_object_reference();
4523            if new_ref != system_package_ref {
4524                error!(
4525                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4526                );
4527                return None;
4528            }
4529
4530            res.push((system_package_ref.1, bytes, dependencies));
4531        }
4532
4533        Some(res)
4534    }
4535
4536    /// Returns the new protocol version and system packages that the network
4537    /// has voted to upgrade to. If the proposed protocol version is not
4538    /// supported, None is returned.
4539    fn is_protocol_version_supported_v1(
4540        proposed_protocol_version: ProtocolVersion,
4541        committee: &Committee,
4542        capabilities: Vec<AuthorityCapabilitiesV1>,
4543        mut buffer_stake_bps: u64,
4544    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4545        if buffer_stake_bps > 10000 {
4546            warn!("clamping buffer_stake_bps to 10000");
4547            buffer_stake_bps = 10000;
4548        }
4549
4550        // For each validator, gather the protocol version and system packages that it
4551        // would like to upgrade to in the next epoch.
4552        let mut desired_upgrades: Vec<_> = capabilities
4553            .into_iter()
4554            .filter_map(|mut cap| {
4555                // A validator that lists no packages is voting against any change at all.
4556                if cap.available_system_packages.is_empty() {
4557                    return None;
4558                }
4559
4560                cap.available_system_packages.sort();
4561
4562                info!(
4563                    "validator {:?} supports {:?} with system packages: {:?}",
4564                    cap.authority.concise(),
4565                    cap.supported_protocol_versions,
4566                    cap.available_system_packages,
4567                );
4568
4569                // A validator that only supports the current protocol version is also voting
4570                // against any change, because framework upgrades always require a protocol
4571                // version bump.
4572                cap.supported_protocol_versions
4573                    .get_version_digest(proposed_protocol_version)
4574                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4575            })
4576            .collect();
4577
4578        // There can only be one set of votes that have a majority, find one if it
4579        // exists.
4580        desired_upgrades.sort();
4581        desired_upgrades
4582            .into_iter()
4583            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4584            .into_iter()
4585            .find_map(|((digest, packages), group)| {
4586                // should have been filtered out earlier.
4587                assert!(!packages.is_empty());
4588
4589                let mut stake_aggregator: StakeAggregator<(), true> =
4590                    StakeAggregator::new(Arc::new(committee.clone()));
4591
4592                for (_, _, authority) in group {
4593                    stake_aggregator.insert_generic(authority, ());
4594                }
4595
4596                let total_votes = stake_aggregator.total_votes();
4597                let quorum_threshold = committee.quorum_threshold();
4598                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4599
4600                info!(
4601                    protocol_config_digest = ?digest,
4602                    ?total_votes,
4603                    ?quorum_threshold,
4604                    ?buffer_stake_bps,
4605                    ?effective_threshold,
4606                    ?proposed_protocol_version,
4607                    ?packages,
4608                    "support for upgrade"
4609                );
4610
4611                let has_support = total_votes >= effective_threshold;
4612                has_support.then_some((proposed_protocol_version, digest, packages))
4613            })
4614    }
4615
4616    /// Selects the highest supported protocol version and system packages that
4617    /// the network has voted to upgrade to. If no upgrade is supported,
4618    /// returns the current protocol version and system packages.
4619    fn choose_protocol_version_and_system_packages_v1(
4620        current_protocol_version: ProtocolVersion,
4621        current_protocol_digest: Digest,
4622        committee: &Committee,
4623        capabilities: Vec<AuthorityCapabilitiesV1>,
4624        buffer_stake_bps: u64,
4625    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4626        let mut next_protocol_version = current_protocol_version;
4627        let mut system_packages = vec![];
4628        let mut protocol_version_digest = current_protocol_digest;
4629
4630        // Finds the highest supported protocol version and system packages by
4631        // incrementing the proposed protocol version by one until no further
4632        // upgrades are supported.
4633        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4634            next_protocol_version + 1,
4635            committee,
4636            capabilities.clone(),
4637            buffer_stake_bps,
4638        ) {
4639            next_protocol_version = version;
4640            protocol_version_digest = digest;
4641            system_packages = packages;
4642        }
4643
4644        (
4645            next_protocol_version,
4646            protocol_version_digest,
4647            system_packages,
4648        )
4649    }
4650
4651    /// Returns the indices of validators that support the given protocol
4652    /// version and digest. This includes both committee and non-committee
4653    /// validators based on their capabilities. Uses active validators
4654    /// instead of committee indices.
4655    fn get_validators_supporting_protocol_version(
4656        target_protocol_version: ProtocolVersion,
4657        target_digest: Digest,
4658        active_validators: &[AuthorityPublicKey],
4659        capabilities: &[AuthorityCapabilitiesV1],
4660    ) -> Vec<u64> {
4661        let mut eligible_validators = Vec::new();
4662
4663        for capability in capabilities {
4664            // Check if this validator supports the target protocol version and digest
4665            if let Some(digest) = capability
4666                .supported_protocol_versions
4667                .get_version_digest(target_protocol_version)
4668            {
4669                if digest == target_digest {
4670                    // Find the validator's index in the active validators list
4671                    if let Some(index) = active_validators
4672                        .iter()
4673                        .position(|name| AuthorityName::from(name) == capability.authority)
4674                    {
4675                        eligible_validators.push(index as u64);
4676                    }
4677                }
4678            }
4679        }
4680
4681        // Sort indices for deterministic behavior
4682        eligible_validators.sort();
4683        eligible_validators
4684    }
4685
4686    /// Calculates the sum of weights for eligible validators that are part of
4687    /// the committee. Takes the indices from
4688    /// get_validators_supporting_protocol_version and maps them back
4689    /// to committee members to get their weights.
4690    fn calculate_eligible_validators_weight(
4691        eligible_validator_indices: &[u64],
4692        active_validators: &[AuthorityPublicKey],
4693        committee: &Committee,
4694    ) -> u64 {
4695        let mut total_weight = 0u64;
4696
4697        for &index in eligible_validator_indices {
4698            let authority_pubkey = &active_validators[index as usize];
4699            // Check if this validator is in the committee and get their weight
4700            if let Some((_, weight)) = committee
4701                .members()
4702                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4703            {
4704                total_weight += weight;
4705            }
4706        }
4707
4708        total_weight
4709    }
4710
4711    #[instrument(level = "debug", skip_all)]
4712    fn create_authenticator_state_tx(
4713        &self,
4714        epoch_store: &Arc<AuthorityPerEpochStore>,
4715    ) -> Option<EndOfEpochTransactionKind> {
4716        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4717            info!("authenticator state transactions not enabled");
4718            return None;
4719        }
4720
4721        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4722        let tx = if authenticator_state_exists {
4723            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4724            let min_epoch =
4725                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4726            let authenticator_obj_initial_shared_version = epoch_store
4727                .epoch_start_config()
4728                .authenticator_obj_initial_shared_version()
4729                .expect("initial version must exist");
4730
4731            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4732                min_epoch,
4733                authenticator_obj_initial_shared_version,
4734            );
4735
4736            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4737
4738            tx
4739        } else {
4740            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4741            info!("Creating AuthenticatorStateCreate tx");
4742            tx
4743        };
4744        Some(tx)
4745    }
4746
4747    /// Creates and execute the advance epoch transaction to effects without
4748    /// committing it to the database. The effects of the change epoch tx
4749    /// are only written to the database after a certified checkpoint has been
4750    /// formed and executed by CheckpointExecutor.
4751    ///
4752    /// When a framework upgraded has been decided on, but the validator does
4753    /// not have the new versions of the packages locally, the validator
4754    /// cannot form the ChangeEpochTx. In this case it returns Err,
4755    /// indicating that the checkpoint builder should give up trying to make the
4756    /// final checkpoint. As long as the network is able to create a certified
4757    /// checkpoint (which should be ensured by the capabilities vote), it
4758    /// will arrive via state sync and be executed by CheckpointExecutor.
4759    #[instrument(level = "error", skip_all)]
4760    pub async fn create_and_execute_advance_epoch_tx(
4761        &self,
4762        epoch_store: &Arc<AuthorityPerEpochStore>,
4763        gas_cost_summary: &GasCostSummary,
4764        checkpoint: CheckpointSequenceNumber,
4765        epoch_start_timestamp_ms: CheckpointTimestamp,
4766    ) -> anyhow::Result<(
4767        IotaSystemState,
4768        Option<SystemEpochInfoEvent>,
4769        TransactionEffects,
4770    )> {
4771        let mut txns = Vec::new();
4772
4773        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4774            txns.push(tx);
4775        }
4776
4777        let next_epoch = epoch_store.epoch() + 1;
4778
4779        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4780        let authority_capabilities = epoch_store
4781            .get_capabilities_v1()
4782            .expect("read capabilities from db cannot fail");
4783        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4784            Self::choose_protocol_version_and_system_packages_v1(
4785                epoch_store.protocol_version(),
4786                SupportedProtocolVersionsWithHashes::protocol_config_digest(
4787                    epoch_store.protocol_config(),
4788                ),
4789                epoch_store.committee(),
4790                authority_capabilities.clone(),
4791                buffer_stake_bps,
4792            );
4793
4794        // since system packages are created during the current epoch, they should abide
4795        // by the rules of the current epoch, including the current epoch's max
4796        // Move binary format version
4797        let config = epoch_store.protocol_config();
4798        let binary_config = to_binary_config(config);
4799        let Some(next_epoch_system_package_bytes) = self
4800            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4801            .await
4802        else {
4803            error!(
4804                "upgraded system packages {:?} are not locally available, cannot create \
4805                ChangeEpochTx. validator binary must be upgraded to the correct version!",
4806                next_epoch_system_packages
4807            );
4808            // the checkpoint builder will keep retrying forever when it hits this error.
4809            // Eventually, one of two things will happen:
4810            // - The operator will upgrade this binary to one that has the new packages
4811            //   locally, and this function will succeed.
4812            // - The final checkpoint will be certified by other validators, we will receive
4813            //   it via state sync, and execute it. This will upgrade the framework
4814            //   packages, reconfigure, and most likely shut down in the new epoch (this
4815            //   validator likely doesn't support the new protocol version, or else it
4816            //   should have had the packages.)
4817            bail!("missing system packages: cannot form ChangeEpochTx");
4818        };
4819
4820        // Use ChangeEpochV3 when the feature flag is enabled and ChangeEpochV2
4821        // requirements are met
4822
4823        if config.select_committee_from_eligible_validators() {
4824            // Get the list of eligible validators that support the target protocol version
4825            let active_validators = epoch_store.epoch_start_state().get_active_validators();
4826
4827            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
4828
4829            // Use validators supporting the target protocol version as eligible validators
4830            // in the next version if select_committee_supporting_next_epoch_version feature
4831            // flag is set to true.
4832            if config.select_committee_supporting_next_epoch_version() {
4833                eligible_active_validators = Self::get_validators_supporting_protocol_version(
4834                    next_epoch_protocol_version,
4835                    next_epoch_protocol_digest,
4836                    &active_validators,
4837                    &authority_capabilities,
4838                );
4839
4840                // Calculate the total weight of eligible validators in the committee
4841                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
4842                    &eligible_active_validators,
4843                    &active_validators,
4844                    epoch_store.committee(),
4845                );
4846
4847                // Safety check: ensure eligible validators have enough stake
4848                // Use the same effective threshold calculation that was used to decide the
4849                // protocol version
4850                let committee = epoch_store.committee();
4851                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4852
4853                if eligible_validators_weight < effective_threshold {
4854                    error!(
4855                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
4856                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
4857                    );
4858                    // Pass all active validator indices as eligible validators
4859                    // to perform selection among all of them.
4860                    eligible_active_validators = (0..active_validators.len() as u64).collect();
4861                }
4862            }
4863
4864            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
4865                next_epoch,
4866                next_epoch_protocol_version,
4867                gas_cost_summary.storage_cost,
4868                gas_cost_summary.computation_cost,
4869                gas_cost_summary.computation_cost_burned,
4870                gas_cost_summary.storage_rebate,
4871                gas_cost_summary.non_refundable_storage_fee,
4872                epoch_start_timestamp_ms,
4873                next_epoch_system_package_bytes,
4874                eligible_active_validators,
4875            ));
4876        } else if config.protocol_defined_base_fee()
4877            && config.max_committee_members_count_as_option().is_some()
4878        {
4879            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
4880                next_epoch,
4881                next_epoch_protocol_version,
4882                gas_cost_summary.storage_cost,
4883                gas_cost_summary.computation_cost,
4884                gas_cost_summary.computation_cost_burned,
4885                gas_cost_summary.storage_rebate,
4886                gas_cost_summary.non_refundable_storage_fee,
4887                epoch_start_timestamp_ms,
4888                next_epoch_system_package_bytes,
4889            ));
4890        } else {
4891            txns.push(EndOfEpochTransactionKind::new_change_epoch(
4892                next_epoch,
4893                next_epoch_protocol_version,
4894                gas_cost_summary.storage_cost,
4895                gas_cost_summary.computation_cost,
4896                gas_cost_summary.storage_rebate,
4897                gas_cost_summary.non_refundable_storage_fee,
4898                epoch_start_timestamp_ms,
4899                next_epoch_system_package_bytes,
4900            ));
4901        }
4902
4903        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
4904
4905        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
4906            tx.clone(),
4907            epoch_store.epoch(),
4908            checkpoint,
4909        );
4910
4911        let tx_digest = executable_tx.digest();
4912
4913        info!(
4914            ?next_epoch,
4915            ?next_epoch_protocol_version,
4916            ?next_epoch_system_packages,
4917            computation_cost=?gas_cost_summary.computation_cost,
4918            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
4919            storage_cost=?gas_cost_summary.storage_cost,
4920            storage_rebate=?gas_cost_summary.storage_rebate,
4921            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
4922            ?tx_digest,
4923            "Creating advance epoch transaction"
4924        );
4925
4926        fail_point_async!("change_epoch_tx_delay");
4927        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
4928
4929        // The tx could have been executed by state sync already - if so simply return
4930        // an error. The checkpoint builder will shortly be terminated by
4931        // reconfiguration anyway.
4932        if self
4933            .get_transaction_cache_reader()
4934            .try_is_tx_already_executed(tx_digest)?
4935        {
4936            warn!("change epoch tx has already been executed via state sync");
4937            bail!("change epoch tx has already been executed via state sync",);
4938        }
4939
4940        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
4941
4942        // We must manually assign the shared object versions to the transaction before
4943        // executing it. This is because we do not sequence end-of-epoch
4944        // transactions through consensus.
4945        epoch_store.assign_shared_object_versions_idempotent(
4946            self.get_object_cache_reader().as_ref(),
4947            std::slice::from_ref(&executable_tx),
4948        )?;
4949
4950        let input_objects =
4951            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
4952
4953        let (temporary_store, effects, _execution_error_opt) =
4954            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
4955        let system_obj = get_iota_system_state(&temporary_store.written)
4956            .expect("change epoch tx must write to system object");
4957        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
4958        let system_epoch_info_event = temporary_store
4959            .events
4960            .data
4961            .into_iter()
4962            .find(|event| event.is_system_epoch_info_event())
4963            .map(SystemEpochInfoEvent::from);
4964        // The system epoch info event can be `None` in case if the `advance_epoch`
4965        // Move function call failed and was executed in the safe mode.
4966        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
4967
4968        // We must write tx and effects to the state sync tables so that state sync is
4969        // able to deliver to the transaction to CheckpointExecutor after it is
4970        // included in a certified checkpoint.
4971        self.get_state_sync_store()
4972            .try_insert_transaction_and_effects(&tx, &effects)
4973            .map_err(|err| {
4974                let err: anyhow::Error = err.into();
4975                err
4976            })?;
4977
4978        info!(
4979            "Effects summary of the change epoch transaction: {:?}",
4980            effects.summary_for_debug()
4981        );
4982        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
4983        // The change epoch transaction cannot fail to execute.
4984        assert!(effects.status().is_ok());
4985        Ok((system_obj, system_epoch_info_event, effects))
4986    }
4987
4988    /// This function is called at the very end of the epoch.
4989    /// This step is required before updating new epoch in the db and calling
4990    /// reopen_epoch_db.
4991    #[instrument(level = "error", skip_all)]
4992    async fn revert_uncommitted_epoch_transactions(
4993        &self,
4994        epoch_store: &AuthorityPerEpochStore,
4995    ) -> IotaResult {
4996        {
4997            let state = epoch_store.get_reconfig_state_write_lock_guard();
4998            if state.should_accept_user_certs() {
4999                // Need to change this so that consensus adapter do not accept certificates from
5000                // user. This can happen if our local validator did not initiate
5001                // epoch change locally, but 2f+1 nodes already concluded the
5002                // epoch.
5003                //
5004                // This lock is essentially a barrier for
5005                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5006                // after this block
5007                epoch_store.close_user_certs(state);
5008            }
5009            // lock is dropped here
5010        }
5011        let pending_certificates = epoch_store.pending_consensus_certificates();
5012        info!(
5013            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5014            pending_certificates.len(),
5015            pending_certificates,
5016        );
5017        for digest in pending_certificates {
5018            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5019                info!(
5020                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5021                    digest
5022                );
5023                continue;
5024            }
5025            info!("Reverting {:?} at the end of epoch", digest);
5026            epoch_store.revert_executed_transaction(&digest)?;
5027            self.get_reconfig_api().try_revert_state_update(&digest)?;
5028        }
5029        info!("All uncommitted local transactions reverted");
5030        Ok(())
5031    }
5032
5033    #[instrument(level = "error", skip_all)]
5034    async fn reopen_epoch_db(
5035        &self,
5036        cur_epoch_store: &AuthorityPerEpochStore,
5037        new_committee: Committee,
5038        epoch_start_configuration: EpochStartConfiguration,
5039        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5040        epoch_last_checkpoint: CheckpointSequenceNumber,
5041    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5042        let new_epoch = new_committee.epoch;
5043        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5044        assert_eq!(
5045            epoch_start_configuration.epoch_start_state().epoch(),
5046            new_committee.epoch
5047        );
5048        fail_point!("before-open-new-epoch-store");
5049        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5050            self.name,
5051            new_committee,
5052            epoch_start_configuration,
5053            self.get_backing_package_store().clone(),
5054            self.get_object_store().clone(),
5055            expensive_safety_check_config,
5056            cur_epoch_store.get_chain_identifier(),
5057            epoch_last_checkpoint,
5058        );
5059        self.epoch_store.store(new_epoch_store.clone());
5060        Ok(new_epoch_store)
5061    }
5062
5063    #[cfg(test)]
5064    pub(crate) fn iter_live_object_set_for_testing(
5065        &self,
5066    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5067        self.get_accumulator_store()
5068            .iter_cached_live_object_set_for_testing()
5069    }
5070
5071    #[cfg(test)]
5072    pub(crate) fn shutdown_execution_for_test(&self) {
5073        self.tx_execution_shutdown
5074            .lock()
5075            .take()
5076            .unwrap()
5077            .send(())
5078            .unwrap();
5079    }
5080
5081    /// NOTE: this function is only to be used for fuzzing and testing. Never
5082    /// use in prod
5083    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5084        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5085        self.get_object_cache_reader()
5086            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5087        self.get_reconfig_api()
5088            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5089    }
5090}
5091
5092pub struct RandomnessRoundReceiver {
5093    authority_state: Arc<AuthorityState>,
5094    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5095}
5096
5097impl RandomnessRoundReceiver {
5098    pub fn spawn(
5099        authority_state: Arc<AuthorityState>,
5100        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5101    ) -> JoinHandle<()> {
5102        let rrr = RandomnessRoundReceiver {
5103            authority_state,
5104            randomness_rx,
5105        };
5106        spawn_monitored_task!(rrr.run())
5107    }
5108
5109    async fn run(mut self) {
5110        info!("RandomnessRoundReceiver event loop started");
5111
5112        loop {
5113            tokio::select! {
5114                maybe_recv = self.randomness_rx.recv() => {
5115                    if let Some((epoch, round, bytes)) = maybe_recv {
5116                        self.handle_new_randomness(epoch, round, bytes);
5117                    } else {
5118                        break;
5119                    }
5120                },
5121            }
5122        }
5123
5124        info!("RandomnessRoundReceiver event loop ended");
5125    }
5126
5127    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5128    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5129        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5130        if epoch_store.epoch() != epoch {
5131            warn!(
5132                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5133                epoch_store.epoch()
5134            );
5135            return;
5136        }
5137        let transaction = VerifiedTransaction::new_randomness_state_update(
5138            epoch,
5139            round,
5140            bytes,
5141            epoch_store
5142                .epoch_start_config()
5143                .randomness_obj_initial_shared_version(),
5144        );
5145        debug!(
5146            "created randomness state update transaction with digest: {:?}",
5147            transaction.digest()
5148        );
5149        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5150        let digest = *transaction.digest();
5151
5152        // Randomness state updates contain the full bls signature for the random round,
5153        // which cannot necessarily be reconstructed again later. Therefore we must
5154        // immediately persist this transaction. If we crash before its outputs
5155        // are committed, this ensures we will be able to re-execute it.
5156        self.authority_state
5157            .get_cache_commit()
5158            .persist_transaction(&transaction);
5159
5160        // Send transaction to TransactionManager for execution.
5161        self.authority_state
5162            .transaction_manager()
5163            .enqueue(vec![transaction], &epoch_store);
5164
5165        let authority_state = self.authority_state.clone();
5166        spawn_monitored_task!(async move {
5167            // Wait for transaction execution in a separate task, to avoid deadlock in case
5168            // of out-of-order randomness generation. (Each
5169            // RandomnessStateUpdate depends on the output of the
5170            // RandomnessStateUpdate from the previous round.)
5171            //
5172            // We set a very long timeout so that in case this gets stuck for some reason,
5173            // the validator will eventually crash rather than continuing in a
5174            // zombie mode.
5175            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5176            let result = tokio::time::timeout(
5177                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5178                authority_state
5179                    .get_transaction_cache_reader()
5180                    .try_notify_read_executed_effects(&[digest]),
5181            )
5182            .await;
5183            let result = match result {
5184                Ok(result) => result,
5185                Err(_) => {
5186                    if cfg!(debug_assertions) {
5187                        // Crash on randomness update execution timeout in debug builds.
5188                        panic!(
5189                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5190                        );
5191                    }
5192                    warn!(
5193                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5194                    );
5195                    // Continue waiting as long as necessary in non-debug builds.
5196                    authority_state
5197                        .get_transaction_cache_reader()
5198                        .try_notify_read_executed_effects(&[digest])
5199                        .await
5200                }
5201            };
5202
5203            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5204            let effects = effects.pop().expect("should return effects");
5205            if *effects.status() != ExecutionStatus::Success {
5206                fatal!(
5207                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5208                );
5209            }
5210            debug!(
5211                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5212            );
5213        });
5214    }
5215}
5216
5217#[async_trait]
5218impl TransactionKeyValueStoreTrait for AuthorityState {
5219    async fn multi_get(
5220        &self,
5221        transaction_keys: &[TransactionDigest],
5222        effects_keys: &[TransactionDigest],
5223    ) -> IotaResult<KVStoreTransactionData> {
5224        let txns = if !transaction_keys.is_empty() {
5225            self.get_transaction_cache_reader()
5226                .try_multi_get_transaction_blocks(transaction_keys)?
5227                .into_iter()
5228                .map(|t| t.map(|t| (*t).clone().into_inner()))
5229                .collect()
5230        } else {
5231            vec![]
5232        };
5233
5234        let fx = if !effects_keys.is_empty() {
5235            self.get_transaction_cache_reader()
5236                .try_multi_get_executed_effects(effects_keys)?
5237        } else {
5238            vec![]
5239        };
5240
5241        Ok((txns, fx))
5242    }
5243
5244    async fn multi_get_checkpoints(
5245        &self,
5246        checkpoint_summaries: &[CheckpointSequenceNumber],
5247        checkpoint_contents: &[CheckpointSequenceNumber],
5248        checkpoint_summaries_by_digest: &[CheckpointDigest],
5249    ) -> IotaResult<(
5250        Vec<Option<CertifiedCheckpointSummary>>,
5251        Vec<Option<CheckpointContents>>,
5252        Vec<Option<CertifiedCheckpointSummary>>,
5253    )> {
5254        // TODO: use multi-get methods if it ever becomes important (unlikely)
5255        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5256        let store = self.get_checkpoint_store();
5257        for seq in checkpoint_summaries {
5258            let checkpoint = store
5259                .get_checkpoint_by_sequence_number(*seq)?
5260                .map(|c| c.into_inner());
5261
5262            summaries.push(checkpoint);
5263        }
5264
5265        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5266        for seq in checkpoint_contents {
5267            let checkpoint = store
5268                .get_checkpoint_by_sequence_number(*seq)?
5269                .and_then(|summary| {
5270                    store
5271                        .get_checkpoint_contents(&summary.content_digest)
5272                        .expect("db read cannot fail")
5273                });
5274            contents.push(checkpoint);
5275        }
5276
5277        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5278        for digest in checkpoint_summaries_by_digest {
5279            let checkpoint = store
5280                .get_checkpoint_by_digest(digest)?
5281                .map(|c| c.into_inner());
5282            summaries_by_digest.push(checkpoint);
5283        }
5284
5285        Ok((summaries, contents, summaries_by_digest))
5286    }
5287
5288    async fn get_transaction_perpetual_checkpoint(
5289        &self,
5290        digest: TransactionDigest,
5291    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5292        self.get_checkpoint_cache()
5293            .try_get_transaction_perpetual_checkpoint(&digest)
5294            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5295    }
5296
5297    async fn get_object(
5298        &self,
5299        object_id: ObjectID,
5300        version: VersionNumber,
5301    ) -> IotaResult<Option<Object>> {
5302        self.get_object_cache_reader()
5303            .try_get_object_by_key(&object_id, version)
5304    }
5305
5306    async fn multi_get_transactions_perpetual_checkpoints(
5307        &self,
5308        digests: &[TransactionDigest],
5309    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5310        let res = self
5311            .get_checkpoint_cache()
5312            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5313
5314        Ok(res
5315            .into_iter()
5316            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5317            .collect())
5318    }
5319
5320    #[instrument(skip(self))]
5321    async fn multi_get_events_by_tx_digests(
5322        &self,
5323        digests: &[TransactionDigest],
5324    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5325        if digests.is_empty() {
5326            return Ok(vec![]);
5327        }
5328        let events_digests: Vec<_> = self
5329            .get_transaction_cache_reader()
5330            .try_multi_get_executed_effects(digests)?
5331            .into_iter()
5332            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5333            .collect();
5334        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5335        let mut events = self
5336            .get_transaction_cache_reader()
5337            .try_multi_get_events(&non_empty_events)?
5338            .into_iter();
5339        Ok(events_digests
5340            .into_iter()
5341            .map(|ev| ev.and_then(|_| events.next()?))
5342            .collect())
5343    }
5344}
5345
5346#[cfg(msim)]
5347pub mod framework_injection {
5348    use std::{
5349        cell::RefCell,
5350        collections::{BTreeMap, BTreeSet},
5351    };
5352
5353    use iota_framework::{BuiltInFramework, SystemPackage};
5354    use iota_types::{
5355        base_types::{AuthorityName, ObjectID},
5356        is_system_package,
5357    };
5358    use move_binary_format::CompiledModule;
5359
5360    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5361
5362    // Thread local cache because all simtests run in a single unique thread.
5363    thread_local! {
5364        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5365    }
5366
5367    type Framework = Vec<CompiledModule>;
5368
5369    pub type PackageUpgradeCallback =
5370        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5371
5372    enum PackageOverrideConfig {
5373        Global(Framework),
5374        PerValidator(PackageUpgradeCallback),
5375    }
5376
5377    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5378        modules
5379            .iter()
5380            .map(|m| {
5381                let mut buf = Vec::new();
5382                m.serialize_with_version(m.version, &mut buf).unwrap();
5383                buf
5384            })
5385            .collect()
5386    }
5387
5388    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5389        OVERRIDE.with(|bs| {
5390            bs.borrow_mut()
5391                .insert(package_id, PackageOverrideConfig::Global(modules))
5392        });
5393    }
5394
5395    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5396        OVERRIDE.with(|bs| {
5397            bs.borrow_mut()
5398                .insert(package_id, PackageOverrideConfig::PerValidator(func))
5399        });
5400    }
5401
5402    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5403        OVERRIDE.with(|cfg| {
5404            cfg.borrow().get(package_id).and_then(|entry| match entry {
5405                PackageOverrideConfig::Global(framework) => {
5406                    Some(compiled_modules_to_bytes(framework))
5407                }
5408                PackageOverrideConfig::PerValidator(func) => {
5409                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
5410                }
5411            })
5412        })
5413    }
5414
5415    pub fn get_override_modules(
5416        package_id: &ObjectID,
5417        name: AuthorityName,
5418    ) -> Option<Vec<CompiledModule>> {
5419        OVERRIDE.with(|cfg| {
5420            cfg.borrow().get(package_id).and_then(|entry| match entry {
5421                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5422                PackageOverrideConfig::PerValidator(func) => func(name),
5423            })
5424        })
5425    }
5426
5427    pub fn get_override_system_package(
5428        package_id: &ObjectID,
5429        name: AuthorityName,
5430    ) -> Option<SystemPackage> {
5431        let bytes = get_override_bytes(package_id, name)?;
5432        let dependencies = if is_system_package(*package_id) {
5433            BuiltInFramework::get_package_by_id(package_id)
5434                .dependencies
5435                .to_vec()
5436        } else {
5437            // Assume that entirely new injected packages depend on all existing system
5438            // packages.
5439            BuiltInFramework::all_package_ids()
5440        };
5441        Some(SystemPackage {
5442            id: *package_id,
5443            bytes,
5444            dependencies,
5445        })
5446    }
5447
5448    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5449        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5450        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5451            cfg.borrow()
5452                .keys()
5453                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5454                .collect()
5455        });
5456
5457        extra
5458            .into_iter()
5459            .map(|package| SystemPackage {
5460                id: package,
5461                bytes: get_override_bytes(&package, name).unwrap(),
5462                dependencies: BuiltInFramework::all_package_ids(),
5463            })
5464            .collect()
5465    }
5466}
5467
5468#[derive(Debug, Serialize, Deserialize, Clone)]
5469pub struct ObjDumpFormat {
5470    pub id: ObjectID,
5471    pub version: VersionNumber,
5472    pub digest: ObjectDigest,
5473    pub object: Object,
5474}
5475
5476impl ObjDumpFormat {
5477    fn new(object: Object) -> Self {
5478        let oref = object.compute_object_reference();
5479        Self {
5480            id: oref.0,
5481            version: oref.1,
5482            digest: oref.2,
5483            object,
5484        }
5485    }
5486}
5487
5488#[derive(Debug, Serialize, Deserialize, Clone)]
5489pub struct NodeStateDump {
5490    pub tx_digest: TransactionDigest,
5491    pub sender_signed_data: SenderSignedData,
5492    pub executed_epoch: u64,
5493    pub reference_gas_price: u64,
5494    pub protocol_version: u64,
5495    pub epoch_start_timestamp_ms: u64,
5496    pub computed_effects: TransactionEffects,
5497    pub expected_effects_digest: TransactionEffectsDigest,
5498    pub relevant_system_packages: Vec<ObjDumpFormat>,
5499    pub shared_objects: Vec<ObjDumpFormat>,
5500    pub loaded_child_objects: Vec<ObjDumpFormat>,
5501    pub modified_at_versions: Vec<ObjDumpFormat>,
5502    pub runtime_reads: Vec<ObjDumpFormat>,
5503    pub input_objects: Vec<ObjDumpFormat>,
5504}
5505
5506impl NodeStateDump {
5507    pub fn new(
5508        tx_digest: &TransactionDigest,
5509        effects: &TransactionEffects,
5510        expected_effects_digest: TransactionEffectsDigest,
5511        object_store: &dyn ObjectStore,
5512        epoch_store: &Arc<AuthorityPerEpochStore>,
5513        inner_temporary_store: &InnerTemporaryStore,
5514        certificate: &VerifiedExecutableTransaction,
5515    ) -> IotaResult<Self> {
5516        // Epoch info
5517        let executed_epoch = epoch_store.epoch();
5518        let reference_gas_price = epoch_store.reference_gas_price();
5519        let epoch_start_config = epoch_store.epoch_start_config();
5520        let protocol_version = epoch_store.protocol_version().as_u64();
5521        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5522
5523        // Record all system packages at this version
5524        let mut relevant_system_packages = Vec::new();
5525        for sys_package_id in BuiltInFramework::all_package_ids() {
5526            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5527                relevant_system_packages.push(ObjDumpFormat::new(w))
5528            }
5529        }
5530
5531        // Record all the shared objects
5532        let mut shared_objects = Vec::new();
5533        for kind in effects.input_shared_objects() {
5534            match kind {
5535                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5536                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5537                        shared_objects.push(ObjDumpFormat::new(w))
5538                    }
5539                }
5540                InputSharedObject::ReadDeleted(..)
5541                | InputSharedObject::MutateDeleted(..)
5542                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5543                                                           * objects. */
5544            }
5545        }
5546
5547        // Record all loaded child objects
5548        // Child objects which are read but not mutated are not tracked anywhere else
5549        let mut loaded_child_objects = Vec::new();
5550        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5551            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5552                loaded_child_objects.push(ObjDumpFormat::new(w))
5553            }
5554        }
5555
5556        // Record all modified objects
5557        let mut modified_at_versions = Vec::new();
5558        for (id, ver) in effects.modified_at_versions() {
5559            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5560                modified_at_versions.push(ObjDumpFormat::new(w))
5561            }
5562        }
5563
5564        // Packages read at runtime, which were not previously loaded into the temoorary
5565        // store Some packages may be fetched at runtime and wont show up in
5566        // input objects
5567        let mut runtime_reads = Vec::new();
5568        for obj in inner_temporary_store
5569            .runtime_packages_loaded_from_db
5570            .values()
5571        {
5572            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5573        }
5574
5575        // All other input objects should already be in `inner_temporary_store.objects`
5576
5577        Ok(Self {
5578            tx_digest: *tx_digest,
5579            executed_epoch,
5580            reference_gas_price,
5581            epoch_start_timestamp_ms,
5582            protocol_version,
5583            relevant_system_packages,
5584            shared_objects,
5585            loaded_child_objects,
5586            modified_at_versions,
5587            runtime_reads,
5588            sender_signed_data: certificate.clone().into_message(),
5589            input_objects: inner_temporary_store
5590                .input_objects
5591                .values()
5592                .map(|o| ObjDumpFormat::new(o.clone()))
5593                .collect(),
5594            computed_effects: effects.clone(),
5595            expected_effects_digest,
5596        })
5597    }
5598
5599    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5600        let mut objects = Vec::new();
5601        objects.extend(self.relevant_system_packages.clone());
5602        objects.extend(self.shared_objects.clone());
5603        objects.extend(self.loaded_child_objects.clone());
5604        objects.extend(self.modified_at_versions.clone());
5605        objects.extend(self.runtime_reads.clone());
5606        objects.extend(self.input_objects.clone());
5607        objects
5608    }
5609
5610    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5611        let file_name = format!(
5612            "{}_{}_NODE_DUMP.json",
5613            self.tx_digest,
5614            AuthorityState::unixtime_now_ms()
5615        );
5616        let mut path = path.to_path_buf();
5617        path.push(&file_name);
5618        let mut file = File::create(path.clone())?;
5619        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5620        Ok(path)
5621    }
5622
5623    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5624        let file = File::open(path)?;
5625        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5626    }
5627}