// iota_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    account_abstraction::{
59        account::AuthenticatorFunctionRefV1Key,
60        authenticator_function::{
61            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62            AuthenticatorFunctionRefV1,
63        },
64    },
65    authenticator_state::get_authenticator_state,
66    base_types::*,
67    committee::{Committee, EpochId, ProtocolVersion},
68    crypto::{
69        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70        default_hash,
71    },
72    deny_list_v1::check_coin_deny_list_v1_during_signing,
73    digests::{ChainIdentifier, Digest},
74    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75    effects::{
76        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77        TransactionEvents, VerifiedSignedTransactionEffects,
78    },
79    error::{ExecutionError, IotaError, IotaResult, UserInputError},
80    event::{Event, EventID, SystemEpochInfoEvent},
81    executable_transaction::VerifiedExecutableTransaction,
82    execution_config_utils::to_binary_config,
83    execution_status::ExecutionStatus,
84    fp_ensure,
85    gas::{GasCostSummary, IotaGasStatus},
86    gas_coin::NANOS_PER_IOTA,
87    inner_temporary_store::{
88        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89        WrittenObjects,
90    },
91    iota_system_state::{
92        IotaSystemState, IotaSystemStateTrait,
93        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94    },
95    is_system_package,
96    layout_resolver::{LayoutResolver, into_struct_layout},
97    message_envelope::Message,
98    messages_checkpoint::{
99        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103    },
104    messages_consensus::AuthorityCapabilitiesV1,
105    messages_grpc::{
106        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108        TransactionStatus,
109    },
110    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111    move_authenticator::MoveAuthenticator,
112    object::{
113        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114        bounded_visitor::BoundedVisitor,
115    },
116    storage::{
117        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118    },
119    supported_protocol_versions::{
120        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121    },
122    transaction::*,
123    transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoints::CheckpointStore,
162    congestion_tracker::CongestionTracker,
163    consensus_adapter::ConsensusAdapter,
164    epoch::committee_store::CommitteeStore,
165    execution_cache::{
166        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168        TransactionCacheRead,
169    },
170    execution_driver::execution_process,
171    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172    metrics::{LatencyObserver, RateTracker},
173    module_cache_metrics::ResolverMetrics,
174    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175    rest_index::RestIndexStore,
176    stake_aggregator::StakeAggregator,
177    state_accumulator::{AccumulatorStore, StateAccumulator},
178    subscription_handler::SubscriptionHandler,
179    transaction_input_loader::TransactionInputLoader,
180    transaction_manager::TransactionManager,
181    transaction_outputs::TransactionOutputs,
182    validator_tx_finalizer::ValidatorTxFinalizer,
183    verify_indexes::verify_indexes,
184};
185
// Unit-test modules live under `unit_tests/`; the `#[path]` attribute points
// the module system at those files, and `#[cfg(test)]` compiles them only in
// test builds.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

// Shared helpers for the test modules above; `pub` so sibling test modules
// can reach them.
#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Test utilities that are compiled unconditionally (usable by other crates).
pub mod authority_test_utils;

// Authority submodules: per-epoch state and its pruner.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

// Storage, pruning, epoch-start configuration and congestion machinery.
mod authority_store_migrations;
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

pub(crate) mod authority_store;
pub mod backpressure;
236
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
    // Transaction-flow counters; the registered metric names and help strings
    // are defined in `AuthorityMetrics::new`.
    tx_orders: IntCounter,
    total_certs: IntCounter,
    total_cert_attempts: IntCounter,
    // total_effects == total transactions finished
    total_effects: IntCounter,
    pub shared_obj_tx: IntCounter,
    sponsored_tx: IntCounter,
    // Transaction orders already processed previously.
    tx_already_processed: IntCounter,
    // Per-transaction distributions: input objects, shared input objects, and
    // transaction batch size (all use `POSITIVE_INT_BUCKETS`).
    num_input_objs: Histogram,
    num_shared_objects: Histogram,
    batch_size: Histogram,

    // Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    // End-to-end certificate execution latency (including waiting for
    // inputs), split by transaction type; both are label views of the same
    // `authority_state_execute_certificate_latency` histogram vec.
    execute_certificate_latency_single_writer: Histogram,
    execute_certificate_latency_shared_object: Histogram,

    // Latencies of the execution pipeline stages: actual execution, loading
    // input objects, preparing (pre-commit) and committing certificates, and
    // checkpointing the databases.
    internal_execution_latency: Histogram,
    execution_load_input_objects_latency: Histogram,
    prepare_certificate_latency: Histogram,
    commit_certificate_latency: Histogram,
    db_checkpoint_latency: Histogram,

    // TransactionManager bookkeeping: enqueued/missing/pending/executing/ready
    // certificate counts plus object- and package-availability cache
    // statistics and queue age.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    pub(crate) transaction_manager_num_ready: IntGauge,
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution-driver throughput and queueing, plus gas/latency ratios
    // (computation gas divided by execution latency; see
    // `GAS_LATENCY_RATIO_BUCKETS`).
    pub(crate) execution_driver_executed_transactions: IntCounter,
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    pub(crate) execution_queueing_delay_s: Histogram,
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    pub(crate) execution_gas_latency_ratio: Histogram,

    // Consensus transactions skipped, and how many of those skips were
    // resolved from the local cache.
    pub(crate) skipped_consensus_txns: IntCounter,
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    // Overload state: whether the authority is in load-shedding mode, and the
    // percentage of transactions currently being shed.
    pub(crate) authority_overload_status: IntGauge,
    pub(crate) authority_load_shedding_percentage: IntGauge,

    // Number of times each source indicated transaction overload
    // (labelled by "source").
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Post processing metrics
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    /// Consensus handler metrics
    pub consensus_handler_processed: IntCounterVec,
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    pub consensus_handler_scores: IntGaugeVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_max_object_costs: IntGaugeVec,
    pub consensus_committed_subdags: IntCounterVec,
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_handler_leader_round: IntGauge,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,

    // Shared limits metrics (registered in `LimitsMetrics::new`).
    pub limits_metrics: Arc<LimitsMetrics>,

    /// bytecode verifier metrics for tracking timeouts
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks recent average txn queueing delay between when it is ready for execution
    // until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions become ready for execution in transaction manager.
    // The need for the Mutex is that the tracker is updated in transaction manager and read
    // in the overload_monitor. There should be low mutex contention because
    // transaction manager is single threaded and the read rate in overload_monitor is
    // low. In the case where transaction manager becomes multi-threaded, we can
    // create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate of transactions starts execution in execution driver.
    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
338
// Override default Prom buckets for positive numbers in 0-10M range
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];

// General-purpose latency buckets in seconds, spanning 0.5ms to 90s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];

// Buckets for low latency samples. Starts from 10us.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];

// Buckets for gas/latency ratio histograms (computation gas divided by
// execution latency); used by `prepare_cert_gas_latency_ratio` and
// `execution_gas_latency_ratio` in `AuthorityMetrics`.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];

/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
364
365impl AuthorityMetrics {
366    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
367        let execute_certificate_latency = register_histogram_vec_with_registry!(
368            "authority_state_execute_certificate_latency",
369            "Latency of executing certificates, including waiting for inputs",
370            &["tx_type"],
371            LATENCY_SEC_BUCKETS.to_vec(),
372            registry,
373        )
374        .unwrap();
375
376        let execute_certificate_latency_single_writer =
377            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
378        let execute_certificate_latency_shared_object =
379            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
380
381        Self {
382            tx_orders: register_int_counter_with_registry!(
383                "total_transaction_orders",
384                "Total number of transaction orders",
385                registry,
386            )
387            .unwrap(),
388            total_certs: register_int_counter_with_registry!(
389                "total_transaction_certificates",
390                "Total number of transaction certificates handled",
391                registry,
392            )
393            .unwrap(),
394            total_cert_attempts: register_int_counter_with_registry!(
395                "total_handle_certificate_attempts",
396                "Number of calls to handle_certificate",
397                registry,
398            )
399            .unwrap(),
400            // total_effects == total transactions finished
401            total_effects: register_int_counter_with_registry!(
402                "total_transaction_effects",
403                "Total number of transaction effects produced",
404                registry,
405            )
406            .unwrap(),
407
408            shared_obj_tx: register_int_counter_with_registry!(
409                "num_shared_obj_tx",
410                "Number of transactions involving shared objects",
411                registry,
412            )
413            .unwrap(),
414
415            sponsored_tx: register_int_counter_with_registry!(
416                "num_sponsored_tx",
417                "Number of sponsored transactions",
418                registry,
419            )
420            .unwrap(),
421
422            tx_already_processed: register_int_counter_with_registry!(
423                "num_tx_already_processed",
424                "Number of transaction orders already processed previously",
425                registry,
426            )
427            .unwrap(),
428            num_input_objs: register_histogram_with_registry!(
429                "num_input_objects",
430                "Distribution of number of input TX objects per TX",
431                POSITIVE_INT_BUCKETS.to_vec(),
432                registry,
433            )
434            .unwrap(),
435            num_shared_objects: register_histogram_with_registry!(
436                "num_shared_objects",
437                "Number of shared input objects per TX",
438                POSITIVE_INT_BUCKETS.to_vec(),
439                registry,
440            )
441            .unwrap(),
442            batch_size: register_histogram_with_registry!(
443                "batch_size",
444                "Distribution of size of transaction batch",
445                POSITIVE_INT_BUCKETS.to_vec(),
446                registry,
447            )
448            .unwrap(),
449            authority_state_handle_transaction_latency: register_histogram_with_registry!(
450                "authority_state_handle_transaction_latency",
451                "Latency of handling transactions",
452                LATENCY_SEC_BUCKETS.to_vec(),
453                registry,
454            )
455            .unwrap(),
456            execute_certificate_latency_single_writer,
457            execute_certificate_latency_shared_object,
458            internal_execution_latency: register_histogram_with_registry!(
459                "authority_state_internal_execution_latency",
460                "Latency of actual certificate executions",
461                LATENCY_SEC_BUCKETS.to_vec(),
462                registry,
463            )
464            .unwrap(),
465            execution_load_input_objects_latency: register_histogram_with_registry!(
466                "authority_state_execution_load_input_objects_latency",
467                "Latency of loading input objects for execution",
468                LOW_LATENCY_SEC_BUCKETS.to_vec(),
469                registry,
470            )
471            .unwrap(),
472            prepare_certificate_latency: register_histogram_with_registry!(
473                "authority_state_prepare_certificate_latency",
474                "Latency of executing certificates, before committing the results",
475                LATENCY_SEC_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            commit_certificate_latency: register_histogram_with_registry!(
480                "authority_state_commit_certificate_latency",
481                "Latency of committing certificate execution results",
482                LATENCY_SEC_BUCKETS.to_vec(),
483                registry,
484            )
485            .unwrap(),
486            db_checkpoint_latency: register_histogram_with_registry!(
487                "db_checkpoint_latency",
488                "Latency of checkpointing dbs",
489                LATENCY_SEC_BUCKETS.to_vec(),
490                registry,
491            ).unwrap(),
492            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
493                "transaction_manager_num_enqueued_certificates",
494                "Current number of certificates enqueued to TransactionManager",
495                &["result"],
496                registry,
497            )
498            .unwrap(),
499            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
500                "transaction_manager_num_missing_objects",
501                "Current number of missing objects in TransactionManager",
502                registry,
503            )
504            .unwrap(),
505            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
506                "transaction_manager_num_pending_certificates",
507                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
508                registry,
509            )
510            .unwrap(),
511            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
512                "transaction_manager_num_executing_certificates",
513                "Number of executing certificates, including queued and actually running certificates",
514                registry,
515            )
516            .unwrap(),
517            transaction_manager_num_ready: register_int_gauge_with_registry!(
518                "transaction_manager_num_ready",
519                "Number of ready transactions in TransactionManager",
520                registry,
521            )
522            .unwrap(),
523            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
524                "transaction_manager_object_cache_size",
525                "Current size of object-availability cache in TransactionManager",
526                registry,
527            )
528            .unwrap(),
529            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
530                "transaction_manager_object_cache_hits",
531                "Number of object-availability cache hits in TransactionManager",
532                registry,
533            )
534            .unwrap(),
535            authority_overload_status: register_int_gauge_with_registry!(
536                "authority_overload_status",
537                "Whether authority is current experiencing overload and enters load shedding mode.",
538                registry)
539            .unwrap(),
540            authority_load_shedding_percentage: register_int_gauge_with_registry!(
541                "authority_load_shedding_percentage",
542                "The percentage of transactions is shed when the authority is in load shedding mode.",
543                registry)
544            .unwrap(),
545            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
546                "transaction_manager_object_cache_misses",
547                "Number of object-availability cache misses in TransactionManager",
548                registry,
549            )
550            .unwrap(),
551            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
552                "transaction_manager_object_cache_evictions",
553                "Number of object-availability cache evictions in TransactionManager",
554                registry,
555            )
556            .unwrap(),
557            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
558                "transaction_manager_package_cache_size",
559                "Current size of package-availability cache in TransactionManager",
560                registry,
561            )
562            .unwrap(),
563            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
564                "transaction_manager_package_cache_hits",
565                "Number of package-availability cache hits in TransactionManager",
566                registry,
567            )
568            .unwrap(),
569            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
570                "transaction_manager_package_cache_misses",
571                "Number of package-availability cache misses in TransactionManager",
572                registry,
573            )
574            .unwrap(),
575            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
576                "transaction_manager_package_cache_evictions",
577                "Number of package-availability cache evictions in TransactionManager",
578                registry,
579            )
580            .unwrap(),
581            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
582                "transaction_manager_transaction_queue_age_s",
583                "Time spent in waiting for transaction in the queue",
584                LATENCY_SEC_BUCKETS.to_vec(),
585                registry,
586            )
587            .unwrap(),
588            transaction_overload_sources: register_int_counter_vec_with_registry!(
589                "transaction_overload_sources",
590                "Number of times each source indicates transaction overload.",
591                &["source"],
592                registry)
593            .unwrap(),
594            execution_driver_executed_transactions: register_int_counter_with_registry!(
595                "execution_driver_executed_transactions",
596                "Cumulative number of transaction executed by execution driver",
597                registry,
598            )
599            .unwrap(),
600            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
601                "execution_driver_dispatch_queue",
602                "Number of transaction pending in execution driver dispatch queue",
603                registry,
604            )
605            .unwrap(),
606            execution_queueing_delay_s: register_histogram_with_registry!(
607                "execution_queueing_delay_s",
608                "Queueing delay between a transaction is ready for execution until it starts executing.",
609                LATENCY_SEC_BUCKETS.to_vec(),
610                registry
611            )
612            .unwrap(),
613            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
614                "prepare_cert_gas_latency_ratio",
615                "The ratio of computation gas divided by VM execution latency.",
616                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617                registry
618            )
619            .unwrap(),
620            execution_gas_latency_ratio: register_histogram_with_registry!(
621                "execution_gas_latency_ratio",
622                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
623                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624                registry
625            )
626            .unwrap(),
627            skipped_consensus_txns: register_int_counter_with_registry!(
628                "skipped_consensus_txns",
629                "Total number of consensus transactions skipped",
630                registry,
631            )
632            .unwrap(),
633            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
634                "skipped_consensus_txns_cache_hit",
635                "Total number of consensus transactions skipped because of local cache hit",
636                registry,
637            )
638            .unwrap(),
639            post_processing_total_events_emitted: register_int_counter_with_registry!(
640                "post_processing_total_events_emitted",
641                "Total number of events emitted in post processing",
642                registry,
643            )
644            .unwrap(),
645            post_processing_total_tx_indexed: register_int_counter_with_registry!(
646                "post_processing_total_tx_indexed",
647                "Total number of txes indexed in post processing",
648                registry,
649            )
650            .unwrap(),
651            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
652                "post_processing_total_tx_had_event_processed",
653                "Total number of txes finished event processing in post processing",
654                registry,
655            )
656            .unwrap(),
657            post_processing_total_failures: register_int_counter_with_registry!(
658                "post_processing_total_failures",
659                "Total number of failure in post processing",
660                registry,
661            )
662            .unwrap(),
663            consensus_handler_processed: register_int_counter_vec_with_registry!(
664                "consensus_handler_processed",
665                "Number of transactions processed by consensus handler",
666                &["class"],
667                registry
668            ).unwrap(),
669            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
670                "consensus_handler_transaction_sizes",
671                "Sizes of each type of transactions processed by consensus handler",
672                &["class"],
673                POSITIVE_INT_BUCKETS.to_vec(),
674                registry
675            ).unwrap(),
676            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
677                "consensus_handler_num_low_scoring_authorities",
678                "Number of low scoring authorities based on reputation scores from consensus",
679                registry
680            ).unwrap(),
681            consensus_handler_scores: register_int_gauge_vec_with_registry!(
682                "consensus_handler_scores",
683                "scores from consensus for each authority",
684                &["authority"],
685                registry,
686            ).unwrap(),
687            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
688                "consensus_handler_deferred_transactions",
689                "Number of transactions deferred by consensus handler",
690                registry,
691            ).unwrap(),
692            consensus_handler_congested_transactions: register_int_counter_with_registry!(
693                "consensus_handler_congested_transactions",
694                "Number of transactions deferred by consensus handler due to congestion",
695                registry,
696            ).unwrap(),
697            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
698                "consensus_handler_cancelled_transactions",
699                "Number of transactions cancelled by consensus handler",
700                registry,
701            ).unwrap(),
702            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
703                "consensus_handler_max_congestion_control_object_costs",
704                "Max object costs for congestion control in the current consensus commit",
705                &["commit_type"],
706                registry,
707            ).unwrap(),
708            consensus_committed_subdags: register_int_counter_vec_with_registry!(
709                "consensus_committed_subdags",
710                "Number of committed subdags, sliced by leader",
711                &["authority"],
712                registry,
713            ).unwrap(),
714            consensus_committed_messages: register_int_gauge_vec_with_registry!(
715                "consensus_committed_messages",
716                "Total number of committed consensus messages, sliced by author",
717                &["authority"],
718                registry,
719            ).unwrap(),
720            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
721                "consensus_committed_user_transactions",
722                "Number of committed user transactions, sliced by submitter",
723                &["authority"],
724                registry,
725            ).unwrap(),
726            consensus_handler_leader_round: register_int_gauge_with_registry!(
727                "consensus_handler_leader_round",
728                "The leader round of the current consensus output being processed in the consensus handler",
729                registry,
730            ).unwrap(),
731            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733            zklogin_sig_count: register_int_counter_with_registry!(
734                "zklogin_sig_count",
735                "Count of zkLogin signatures",
736                registry,
737            )
738            .unwrap(),
739            multisig_sig_count: register_int_counter_with_registry!(
740                "multisig_sig_count",
741                "Count of zkLogin signatures",
742                registry,
743            )
744            .unwrap(),
745            consensus_calculated_throughput: register_int_gauge_with_registry!(
746                "consensus_calculated_throughput",
747                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748                registry,
749            ).unwrap(),
750            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751                "consensus_calculated_throughput_profile",
752                "The current active calculated throughput profile",
753                registry
754            ).unwrap(),
755            execution_queueing_latency: LatencyObserver::new(),
756            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758        }
759    }
760
761    /// Reset metrics that contain `hostname` as one of the labels. This is
762    /// needed to avoid retaining metrics for long-gone committee members and
763    /// only exposing metrics for the committee in the current epoch.
764    pub fn reset_on_reconfigure(&self) {
765        self.consensus_committed_messages.reset();
766        self.consensus_handler_scores.reset();
767        self.consensus_committed_user_transactions.reset();
768    }
769}
770
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy
///   private keys).
/// - `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair`
// NOTE(review): the alias wraps `Pin<Arc<…>>`, not `Pin<Box<…>>`, so
// `Arc::pin(keypair)` would be the matching constructor — confirm against
// call sites before changing the doc above.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects for signing and execution
    /// (see `read_objects_for_execution`).
    input_loader: TransactionInputLoader,
    /// Trait-object handles to the execution caches (object/package stores,
    /// transaction cache, cache writer, ...).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store; the `ArcSwap` allows it to be replaced atomically
    /// when the epoch changes.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and revert all transactions from previous epoch
    /// that are executed but did not make into checkpoint.
    execution_lock: RwLock<EpochId>,

    // Optional secondary index store; present only when indexing is enabled.
    pub indexes: Option<Arc<IndexStore>>,
    // Optional index store backing the REST API; present only when enabled.
    pub rest_index: Option<Arc<RestIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Store for checkpoints and per-epoch checkpoint state commitments
    /// (see `get_epoch_state_commitments`).
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Store of committees across epochs.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Prometheus metrics for this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // Underscore-prefixed: kept only so the pruner lives as long as the state.
    _pruner: AuthorityStorePruner,
    // Underscore-prefixed: kept only so the per-epoch pruner lives as long as
    // the state.
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration this authority was started with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When present, committee validators use it to track signed transactions
    /// in the background (see `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
834
835/// The authority state encapsulates all state, drives execution, and ensures
836/// safety.
837///
838/// Note the authority operations can be accessed through a read ref (&) and do
839/// not require &mut. Internally a database is synchronized through a mutex
840/// lock.
841///
842/// Repeating valid commands should produce no changes and return no error.
843impl AuthorityState {
844    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
845        epoch_store.committee().authority_exists(&self.name)
846    }
847
848    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
849        epoch_store
850            .active_validators()
851            .iter()
852            .any(|a| AuthorityName::from(a) == self.name)
853    }
854
    /// A node is treated as a fullnode exactly when it is not part of the
    /// current committee (the inverse of `is_committee_validator`).
    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
        !self.is_committee_validator(epoch_store)
    }
858
    /// Returns a shared reference to the store of committees across epochs.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
862
863    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
864        self.committee_store.clone()
865    }
866
    /// Returns the authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
870
    /// Returns the checkpoint state commitments recorded for `epoch`, if any,
    /// by delegating to the checkpoint store.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
877
    /// This is a private method and should be kept that way. It doesn't check
    /// whether the provided transaction is a system transaction, and hence
    /// can only be called internally.
    ///
    /// On success, owned-object locks are acquired for the signed transaction
    /// and the signed transaction is returned. Errors are returned for deny
    /// list hits, invalid inputs, a failed `MoveAuthenticator` execution, or
    /// conflicting owned-object locks.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Ensure that validator cannot reconfigure while we are signing the tx
        let _execution_lock = self.execution_lock_for_signing()?;

        let protocol_config = epoch_store.protocol_config();
        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch = epoch_store.epoch();

        let tx_data = transaction.data().transaction_data();

        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &transaction.input_objects()?,
            &tx_data.receiving_objects(),
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load all transaction-related input objects.
        // Authenticator input objects and the account object are loaded in the same
        // call if there is a sender `MoveAuthenticator` signature present in the
        // transaction.
        let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
            self.read_objects_for_signing(&transaction, epoch)?;

        // Get the sender `MoveAuthenticator`, if any.
        // Only one `MoveAuthenticator` signature is possible, since it is not
        // implemented for the sponsor at the moment.
        let move_authenticator = transaction.sender_move_authenticator();

        // Check the inputs for signing.
        // If there is a sender `MoveAuthenticator` signature, its input objects and the
        // account object are also checked and must be provided.
        // It is also checked if there is enough gas to execute the transaction and its
        // authenticators.
        let (
            gas_status,
            tx_checked_input_objects,
            auth_checked_input_objects,
            authenticator_function_ref,
        ) = self.check_transaction_inputs_for_signing(
            protocol_config,
            reference_gas_price,
            tx_data,
            tx_input_objects,
            &tx_receiving_objects,
            move_authenticator,
            auth_input_objects,
            account_object,
        )?;

        // Abort signing if the coin deny list check fails for any of the
        // checked input, receiving, or authenticator objects.
        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &tx_checked_input_objects,
            &tx_receiving_objects,
            &auth_checked_input_objects,
            &self.get_object_store(),
        )?;

        if let Some(move_authenticator) = move_authenticator {
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            let auth_checked_input_objects = auth_checked_input_objects
                .expect("MoveAuthenticator input objects must be provided");
            let authenticator_function_ref = authenticator_function_ref
                .expect("AuthenticatorFunctionRefV1 object must be provided");

            let (kind, signer, gas_data) = tx_data.execution_parts();

            // Execute the Move authenticator.
            let validation_result = epoch_store.executor().authenticate_transaction(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                gas_data,
                gas_status,
                move_authenticator.to_owned(),
                authenticator_function_ref,
                auth_checked_input_objects,
                kind,
                signer,
                transaction.digest().to_owned(),
                &mut None,
            );

            // A failed authenticator run rejects the transaction.
            if let Err(validation_error) = validation_result {
                return Err(IotaError::MoveAuthenticatorExecutionFailure {
                    error: validation_error.to_string(),
                });
            }
        }

        // Only owned objects are locked to the signed transaction below.
        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

        let signed_transaction =
            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

        // Check and write locks, to signed transaction, into the database
        // The call to self.set_transaction_lock checks the lock is not conflicting,
        // and returns ConflictingTransaction error in case there is a lock on a
        // different existing transaction.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
1006
    /// Initiate a new transaction.
    ///
    /// Idempotent: if the transaction already has a recorded status, that
    /// status is returned immediately. Otherwise the transaction is validated
    /// and signed via `handle_transaction_impl`. If signing fails but the
    /// transaction turns out to have a status anyway (e.g. executed
    /// concurrently), that status is returned instead of the error.
    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
    pub async fn handle_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> IotaResult<HandleTransactionResponse> {
        let tx_digest = *transaction.digest();
        debug!("handle_transaction");

        // Ensure an idempotent answer.
        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
            return Ok(HandleTransactionResponse { status });
        }

        let _metrics_guard = self
            .metrics
            .authority_state_handle_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
        match signed {
            Ok(s) => {
                if self.is_committee_validator(epoch_store) {
                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
                        let tx = s.clone();
                        let validator_tx_finalizer = validator_tx_finalizer.clone();
                        let cache_reader = self.get_transaction_cache_reader().clone();
                        let epoch_store = epoch_store.clone();
                        // Spawn a background task, bounded by the epoch's
                        // lifetime, that tracks the signed transaction.
                        spawn_monitored_task!(epoch_store.within_alive_epoch(
                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
                        ));
                    }
                }
                Ok(HandleTransactionResponse {
                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
                })
            }
            // It happens frequently that while we are checking the validity of the transaction, it
            // has just been executed.
            // In that case, we could still return Ok to avoid showing confusing errors.
            Err(err) => Ok(HandleTransactionResponse {
                status: self
                    .get_transaction_status(&tx_digest, epoch_store)?
                    .ok_or(err)?
                    .1,
            }),
        }
    }
1057
    /// Whether system overload checks should run when signing transactions,
    /// as configured in the authority overload config.
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1063
    /// Whether system overload checks should run at execution time, as
    /// configured in the authority overload config.
    pub fn check_system_overload_at_execution(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_execution
    }
1069
    /// Runs the system overload checks in order: the authority overload
    /// monitor (only when `do_authority_overload_check` is set), the
    /// transaction manager's execution overload check, the consensus
    /// adapter's overload check, and finally the approximate count of
    /// transactions pending commit. Returns the first error encountered;
    /// each rejection source is recorded in the overload metrics.
    pub(crate) fn check_system_overload(
        &self,
        consensus_adapter: &Arc<ConsensusAdapter>,
        tx_data: &SenderSignedData,
        do_authority_overload_check: bool,
    ) -> IotaResult {
        if do_authority_overload_check {
            self.check_authority_overload(tx_data).tap_err(|_| {
                self.update_overload_metrics("execution_queue");
            })?;
        }
        self.transaction_manager
            .check_execution_overload(self.overload_config(), tx_data)
            .tap_err(|_| {
                self.update_overload_metrics("execution_pending");
            })?;
        consensus_adapter.check_consensus_overload().tap_err(|_| {
            self.update_overload_metrics("consensus");
        })?;

        // Backpressure: reject with a retry hint when the commit cache reports
        // too many pending transactions.
        let pending_tx_count = self
            .get_cache_commit()
            .approximate_pending_transaction_count();
        if pending_tx_count
            > self
                .config
                .execution_cache_config
                .writeback_cache
                .backpressure_threshold_for_rpc()
        {
            return Err(IotaError::ValidatorOverloadedRetryAfter {
                retry_after_secs: 10,
            });
        }

        Ok(())
    }
1107
1108    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1109        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1110            return Ok(());
1111        }
1112
1113        let load_shedding_percentage = self
1114            .overload_info
1115            .load_shedding_percentage
1116            .load(Ordering::Relaxed);
1117        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1118    }
1119
1120    fn update_overload_metrics(&self, source: &str) {
1121        self.metrics
1122            .transaction_overload_sources
1123            .with_label_values(&[source])
1124            .inc();
1125    }
1126
1127    /// Executes a certificate for its effects.
1128    #[instrument(level = "trace", skip_all)]
1129    pub async fn execute_certificate(
1130        &self,
1131        certificate: &VerifiedCertificate,
1132        epoch_store: &Arc<AuthorityPerEpochStore>,
1133    ) -> IotaResult<TransactionEffects> {
1134        let _metrics_guard = if certificate.contains_shared_object() {
1135            self.metrics
1136                .execute_certificate_latency_shared_object
1137                .start_timer()
1138        } else {
1139            self.metrics
1140                .execute_certificate_latency_single_writer
1141                .start_timer()
1142        };
1143        trace!("execute_certificate");
1144
1145        self.metrics.total_cert_attempts.inc();
1146
1147        if !certificate.contains_shared_object() {
1148            // Shared object transactions need to be sequenced by the consensus before
1149            // enqueueing for execution, done in
1150            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1151            // object transactions, they can be enqueued for execution immediately.
1152            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1153        }
1154
1155        // tx could be reverted when epoch ends, so we must be careful not to return a
1156        // result here after the epoch ends.
1157        epoch_store
1158            .within_alive_epoch(self.notify_read_effects(certificate))
1159            .await
1160            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1161            .and_then(|r| r)
1162    }
1163
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic. i.e. outputs are only written
    ///   to storage,
    /// on successful execution; crashed execution has no observable effect and
    /// can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and
    /// locks are set. If this cannot be satisfied by the caller,
    /// execute_certificate() should be called instead.
    ///
    /// Should only be called within iota-core.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
    pub fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // Acquire a lock to prevent concurrent executions of the same transaction.
        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;

        // The cert could have been processed by a concurrent attempt of the same cert,
        // so check if the effects have already been written.
        if let Some(effects) = self
            .get_transaction_cache_reader()
            .try_get_executed_effects(tx_digest)?
        {
            // If a specific digest was expected, a mismatch here would be a
            // fork — assert rather than return.
            if let Some(expected_effects_digest_inner) = expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest_inner,
                    "Unexpected effects digest for transaction {tx_digest:?}"
                );
            }
            // Release (rather than drop) the guard before the early return.
            tx_guard.release();
            return Ok((effects, None));
        }

        // Load transaction, authenticator, and account input objects while
        // holding the tx guard.
        let (tx_input_objects, authenticator_input_objects, account_object) =
            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

        // If no expected_effects_digest was provided, try to get it from storage.
        // We could be re-executing a previously executed but uncommitted transaction,
        // perhaps after restarting with a new binary. In this situation, if
        // we have published an effects signature, we must be sure not to
        // equivocate. TODO: read from cache instead of DB
        let expected_effects_digest =
            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);

        // process_certificate takes ownership of the tx guard and is
        // responsible for releasing or committing it.
        self.process_certificate(
            tx_guard,
            certificate,
            tx_input_objects,
            account_object,
            authenticator_input_objects,
            expected_effects_digest,
            epoch_store,
        )
        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
        .tap_ok(
            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
        )
    }
1235
1236    pub fn read_objects_for_execution(
1237        &self,
1238        tx_lock: &CertLockGuard,
1239        certificate: &VerifiedExecutableTransaction,
1240        epoch_store: &Arc<AuthorityPerEpochStore>,
1241    ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1242        let _scope = monitored_scope("Execution::load_input_objects");
1243        let _metrics_guard = self
1244            .metrics
1245            .execution_load_input_objects_latency
1246            .start_timer();
1247
1248        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1249
1250        let input_objects = self.input_loader.read_objects_for_execution(
1251            epoch_store,
1252            &certificate.key(),
1253            tx_lock,
1254            &input_objects,
1255            epoch_store.epoch(),
1256        )?;
1257
1258        certificate.split_input_objects_into_groups_for_reading(input_objects)
1259    }
1260
1261    /// Test only wrapper for `try_execute_immediately()` above, useful for
1262    /// checking errors if the pre-conditions are not satisfied, and
1263    /// executing change epoch transactions.
1264    pub fn try_execute_for_test(
1265        &self,
1266        certificate: &VerifiedCertificate,
1267    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1268        let epoch_store = self.epoch_store_for_testing();
1269        let (effects, execution_error_opt) = self.try_execute_immediately(
1270            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1271            None,
1272            &epoch_store,
1273        )?;
1274        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1275        Ok((signed_effects, execution_error_opt))
1276    }
1277
1278    /// Non-fallible version of `try_execute_for_test()`.
1279    pub fn execute_for_test(
1280        &self,
1281        certificate: &VerifiedCertificate,
1282    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1283        self.try_execute_for_test(certificate)
1284            .expect("try_execute_for_test should not fail")
1285    }
1286
1287    pub async fn notify_read_effects(
1288        &self,
1289        certificate: &VerifiedCertificate,
1290    ) -> IotaResult<TransactionEffects> {
1291        self.get_transaction_cache_reader()
1292            .try_notify_read_executed_effects(&[*certificate.digest()])
1293            .await
1294            .map(|mut r| r.pop().expect("must return correct number of effects"))
1295    }
1296
    /// Checks via the object cache that all given owned object references are
    /// still live, returning an error otherwise.
    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        self.get_object_cache_reader()
            .try_check_owned_objects_are_live(owned_object_refs)
    }
1301
1302    /// This function captures the required state to debug a forked transaction.
1303    /// The dump is written to a file in dir `path`, with name prefixed by the
1304    /// transaction digest. NOTE: Since this info escapes the validator
1305    /// context, make sure not to leak any private info here
1306    pub(crate) fn debug_dump_transaction_state(
1307        &self,
1308        tx_digest: &TransactionDigest,
1309        effects: &TransactionEffects,
1310        expected_effects_digest: TransactionEffectsDigest,
1311        inner_temporary_store: &InnerTemporaryStore,
1312        certificate: &VerifiedExecutableTransaction,
1313        debug_dump_config: &StateDebugDumpConfig,
1314    ) -> IotaResult<PathBuf> {
1315        let dump_dir = debug_dump_config
1316            .dump_file_directory
1317            .as_ref()
1318            .cloned()
1319            .unwrap_or(std::env::temp_dir());
1320        let epoch_store = self.load_epoch_store_one_call_per_task();
1321
1322        NodeStateDump::new(
1323            tx_digest,
1324            effects,
1325            expected_effects_digest,
1326            self.get_object_store().as_ref(),
1327            &epoch_store,
1328            inner_temporary_store,
1329            certificate,
1330        )?
1331        .write_to_file(&dump_dir)
1332        .map_err(|e| IotaError::FileIO(e.to_string()))
1333    }
1334
1335    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1336    pub(crate) fn process_certificate(
1337        &self,
1338        tx_guard: CertTxGuard,
1339        certificate: &VerifiedExecutableTransaction,
1340        tx_input_objects: InputObjects,
1341        account_object: Option<ObjectReadResult>,
1342        authenticator_input_objects: Option<InputObjects>,
1343        expected_effects_digest: Option<TransactionEffectsDigest>,
1344        epoch_store: &Arc<AuthorityPerEpochStore>,
1345    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1346        let process_certificate_start_time = tokio::time::Instant::now();
1347        let digest = *certificate.digest();
1348
1349        let _scope = monitored_scope("Execution::process_certificate");
1350
1351        fail_point_if!("correlated-crash-process-certificate", || {
1352            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1353                iota_simulator::task::kill_current_node(None);
1354            }
1355        });
1356
1357        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1358        // Any caller that verifies the signatures on the certificate will have already
1359        // checked the epoch. But paths that don't verify sigs (e.g. execution
1360        // from checkpoint, reading from db) present the possibility of an epoch
1361        // mismatch. If this cert is not finalzied in previous epoch, then it's
1362        // invalid.
1363        let execution_guard = match execution_guard {
1364            Ok(execution_guard) => execution_guard,
1365            Err(err) => {
1366                tx_guard.release();
1367                return Err(err);
1368            }
1369        };
1370        // Since we obtain a reference to the epoch store before taking the execution
1371        // lock, it's possible that reconfiguration has happened and they no
1372        // longer match.
1373        if *execution_guard != epoch_store.epoch() {
1374            tx_guard.release();
1375            info!("The epoch of the execution_guard doesn't match the epoch store");
1376            return Err(IotaError::WrongEpoch {
1377                expected_epoch: epoch_store.epoch(),
1378                actual_epoch: *execution_guard,
1379            });
1380        }
1381
1382        // Errors originating from prepare_certificate may be transient (failure to read
1383        // locks) or non-transient (transaction input is invalid, move vm
1384        // errors). However, all errors from this function occur before we have
1385        // written anything to the db, so we commit the tx guard and rely on the
1386        // client to retry the tx (if it was transient).
1387        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1388            &execution_guard,
1389            certificate,
1390            tx_input_objects,
1391            account_object,
1392            authenticator_input_objects,
1393            epoch_store,
1394        ) {
1395            Err(e) => {
1396                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1397                tx_guard.release();
1398                return Err(e);
1399            }
1400            Ok(res) => res,
1401        };
1402
1403        if let Some(expected_effects_digest) = expected_effects_digest {
1404            if effects.digest() != expected_effects_digest {
1405                // We dont want to mask the original error, so we log it and continue.
1406                match self.debug_dump_transaction_state(
1407                    &digest,
1408                    &effects,
1409                    expected_effects_digest,
1410                    &inner_temporary_store,
1411                    certificate,
1412                    &self.config.state_debug_dump_config,
1413                ) {
1414                    Ok(out_path) => {
1415                        info!(
1416                            "Dumped node state for transaction {} to {}",
1417                            digest,
1418                            out_path.as_path().display().to_string()
1419                        );
1420                    }
1421                    Err(e) => {
1422                        error!("Error dumping state for transaction {}: {e}", digest);
1423                    }
1424                }
1425                error!(
1426                    tx_digest = ?digest,
1427                    ?expected_effects_digest,
1428                    actual_effects = ?effects,
1429                    "fork detected!"
1430                );
1431                panic!(
1432                    "Transaction {} is expected to have effects digest {}, but got {}!",
1433                    digest,
1434                    expected_effects_digest,
1435                    effects.digest(),
1436                );
1437            }
1438        }
1439
1440        fail_point!("crash");
1441
1442        self.commit_certificate(
1443            certificate,
1444            inner_temporary_store,
1445            &effects,
1446            tx_guard,
1447            execution_guard,
1448            epoch_store,
1449        )?;
1450
1451        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1452            certificate.data().transaction_data().kind()
1453        {
1454            if let Some(err) = &execution_error_opt {
1455                debug_fatal!("Authenticator state update failed: {:?}", err);
1456            }
1457            epoch_store.update_authenticator_state(auth_state);
1458
1459            // double check that the signature verifier always matches the authenticator
1460            // state
1461            if cfg!(debug_assertions) {
1462                let authenticator_state = get_authenticator_state(self.get_object_store())
1463                    .expect("Read cannot fail")
1464                    .expect("Authenticator state must exist");
1465
1466                let mut sys_jwks: Vec<_> = authenticator_state
1467                    .active_jwks
1468                    .into_iter()
1469                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1470                    .collect();
1471                let mut active_jwks: Vec<_> = epoch_store
1472                    .signature_verifier
1473                    .get_jwks()
1474                    .into_iter()
1475                    .collect();
1476                sys_jwks.sort();
1477                active_jwks.sort();
1478
1479                assert_eq!(sys_jwks, active_jwks);
1480            }
1481        }
1482
1483        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1484        if elapsed > 0.0 {
1485            self.metrics
1486                .execution_gas_latency_ratio
1487                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1488        };
1489        Ok((effects, execution_error_opt))
1490    }
1491
    /// Durably commits the results of an already-executed certificate.
    ///
    /// Ordering here is deliberate and must not be changed:
    /// 1. best-effort post-processing / indexing (failures are logged, not
    ///    propagated),
    /// 2. record the tx key -> digest mapping in the epoch store,
    /// 3. write the transaction outputs through the cache writer,
    /// 4. mark the tx guard as committed,
    /// 5. notify the transaction manager so dependent transactions can run.
    ///
    /// The `_execution_guard` is held (but not used) to ensure reconfiguration
    /// cannot race with the commit.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        // Captured before `inner_temporary_store` is moved into the outputs below.
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        let output_keys = inner_temporary_store.get_output_keys(effects);

        // index certificate
        // Post-processing failures are intentionally non-fatal: the error is
        // logged and counted, but the commit proceeds.
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // commit_certificate finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1557
1558    fn update_metrics(
1559        &self,
1560        certificate: &VerifiedExecutableTransaction,
1561        input_object_count: usize,
1562        shared_object_count: usize,
1563    ) {
1564        // count signature by scheme, for zklogin and multisig
1565        if certificate.has_zklogin_sig() {
1566            self.metrics.zklogin_sig_count.inc();
1567        } else if certificate.has_upgraded_multisig() {
1568            self.metrics.multisig_sig_count.inc();
1569        }
1570
1571        self.metrics.total_effects.inc();
1572        self.metrics.total_certs.inc();
1573
1574        if shared_object_count > 0 {
1575            self.metrics.shared_obj_tx.inc();
1576        }
1577
1578        if certificate.is_sponsored_tx() {
1579            self.metrics.sponsored_tx.inc();
1580        }
1581
1582        self.metrics
1583            .num_input_objs
1584            .observe(input_object_count as f64);
1585        self.metrics
1586            .num_shared_objects
1587            .observe(shared_object_count as f64);
1588        self.metrics.batch_size.observe(
1589            certificate
1590                .data()
1591                .intent_message()
1592                .value
1593                .kind()
1594                .num_commands() as f64,
1595        );
1596    }
1597
    /// prepare_certificate validates the transaction input, and executes the
    /// certificate, returning effects, output objects, events, etc.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no
    /// side effects.
    ///
    /// It can be generally understood that a failure of prepare_certificate
    /// indicates a non-transient error, e.g. the transaction input is
    /// somehow invalid, the correct locks are not held, etc. However, this
    /// is not entirely true, as a transient db read error may also cause
    /// this function to fail.
    ///
    /// Two execution paths exist: if the sender carries a `MoveAuthenticator`
    /// signature, a Move-based authentication step runs before the
    /// transaction itself; otherwise the certificate is checked and executed
    /// directly. `account_object` and `move_authenticator_input_objects` are
    /// only consulted (and expected to be `Some`) on the authenticator path.
    #[instrument(level = "trace", skip_all)]
    fn prepare_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        account_object: Option<ObjectReadResult>,
        move_authenticator_input_objects: Option<InputObjects>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        // Epoch-scoped execution parameters.
        let protocol_config = epoch_store.protocol_config();

        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_start_timestamp = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();

        let backing_store = self.get_backing_store().as_ref();

        let tx_digest = *certificate.digest();

        // TODO: We need to move this to a more appropriate place to avoid redundant
        // checks.
        let tx_data = certificate.data().transaction_data();
        tx_data.validity_check(protocol_config)?;

        let (kind, signer, gas_data) = tx_data.execution_parts();

        // `mut effects` is only mutated under msim/fail_points (see the
        // `cp_execution_nondeterminism` fail point below).
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
            move_authenticator,
        ) =
            certificate.sender_move_authenticator()
        {
            // Check if the sender needs to be authenticated in Move and, if so, execute the
            // authentication.
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            // Check basic `object_to_authenticate` preconditions and get its components.
            let (
                auth_account_object_id,
                auth_account_object_seq_number,
                auth_account_object_digest,
            ) = move_authenticator.object_to_authenticate_components()?;

            // Since the `object_to_authenticate` components are provided, it is supposed
            // that the account object is loaded.
            let account_object = account_object.expect("Account object must be provided");

            // Resolve the Move function that will perform the authentication,
            // verifying the account object matches the authenticator's claims.
            let authenticator_function_ref_for_execution = self.check_move_account(
                auth_account_object_id,
                auth_account_object_seq_number,
                auth_account_object_digest,
                account_object,
                &signer,
            )?;

            let move_authenticator_input_objects = move_authenticator_input_objects.expect(
                    "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
                );

            // Check the `MoveAuthenticator` input objects.
            // The `MoveAuthenticator` receiving objects are checked on the signing step.
            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
            // not a part of the transaction data.
            let authenticator_gas_budget = protocol_config.max_auth_gas();
            let (
                gas_status,
                authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
                certificate,
                tx_input_objects,
                move_authenticator_input_objects,
                authenticator_gas_budget,
                protocol_config,
                reference_gas_price,
            )?;

            // Verify we still hold the locks on all owned inputs (authenticator
            // and transaction inputs combined) before executing.
            let owned_object_refs = authenticator_and_tx_checked_input_objects
                .inner()
                .filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;

            epoch_store
                .executor()
                .authenticate_then_execute_transaction_to_effects(
                    backing_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    self.config
                        .expensive_safety_check_config
                        .enable_deep_per_tx_iota_conservation_check(),
                    self.config.certificate_deny_config.certificate_deny_set(),
                    &epoch_id,
                    epoch_start_timestamp,
                    gas_data,
                    gas_status,
                    move_authenticator.to_owned(),
                    authenticator_function_ref_for_execution,
                    authenticator_checked_input_objects,
                    authenticator_and_tx_checked_input_objects,
                    kind,
                    signer,
                    tx_digest,
                    &mut None,
                )
        } else {
            // No Move authentication required, proceed to execute the transaction directly.

            // The cost of partially re-auditing a transaction before execution is
            // tolerated.
            let (tx_gas_status, tx_checked_input_objects) =
                iota_transaction_checks::check_certificate_input(
                    certificate,
                    tx_input_objects,
                    protocol_config,
                    reference_gas_price,
                )?;

            // Same owned-lock re-check as on the authenticator path.
            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;
            epoch_store.executor().execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ iota-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                tx_checked_input_objects,
                gas_data,
                tx_gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
        };

        // Simulation-only fault injection to exercise fork detection.
        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(certificate, epoch_store, &mut effects);
        });

        // Gas/latency ratio metric; the elapsed>0 check avoids division by zero.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1779
1780    pub fn prepare_certificate_for_benchmark(
1781        &self,
1782        certificate: &VerifiedExecutableTransaction,
1783        input_objects: InputObjects,
1784        epoch_store: &Arc<AuthorityPerEpochStore>,
1785    ) -> IotaResult<(
1786        InnerTemporaryStore,
1787        TransactionEffects,
1788        Option<ExecutionError>,
1789    )> {
1790        let lock = RwLock::new(epoch_store.epoch());
1791        let execution_guard = lock.try_read().unwrap();
1792
1793        self.prepare_certificate(
1794            &execution_guard,
1795            certificate,
1796            input_objects,
1797            None,
1798            None,
1799            epoch_store,
1800        )
1801    }
1802
1803    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1804    /// `VmChecks::Enabled` instead.
1805    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1806    #[allow(clippy::type_complexity)]
1807    pub fn dry_exec_transaction(
1808        &self,
1809        transaction: TransactionData,
1810        transaction_digest: TransactionDigest,
1811    ) -> IotaResult<(
1812        DryRunTransactionBlockResponse,
1813        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1814        TransactionEffects,
1815        Option<ObjectID>,
1816    )> {
1817        let epoch_store = self.load_epoch_store_one_call_per_task();
1818        if !self.is_fullnode(&epoch_store) {
1819            return Err(IotaError::UnsupportedFeature {
1820                error: "dry-exec is only supported on fullnodes".to_string(),
1821            });
1822        }
1823
1824        if transaction.kind().is_system_tx() {
1825            return Err(IotaError::UnsupportedFeature {
1826                error: "dry-exec does not support system transactions".to_string(),
1827            });
1828        }
1829
1830        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1831    }
1832
1833    #[allow(clippy::type_complexity)]
1834    pub fn dry_exec_transaction_for_benchmark(
1835        &self,
1836        transaction: TransactionData,
1837        transaction_digest: TransactionDigest,
1838    ) -> IotaResult<(
1839        DryRunTransactionBlockResponse,
1840        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1841        TransactionEffects,
1842        Option<ObjectID>,
1843    )> {
1844        let epoch_store = self.load_epoch_store_one_call_per_task();
1845        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1846    }
1847
    /// Core dry-run execution: checks, executes (without committing), and
    /// converts the result into a `DryRunTransactionBlockResponse`.
    ///
    /// If the transaction carries no gas payment, a mock gas coin is
    /// fabricated and its `ObjectID` is returned as the fourth tuple element
    /// so callers can filter it out of reported changes. Also returns the
    /// written objects keyed by ID and the raw effects.
    #[instrument(level = "trace", skip_all)]
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> IotaResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply the same deny-list checks a validator would apply at signing.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut transaction = transaction;
        let reference_gas_price = epoch_store.reference_gas_price();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            // Fabricate a fresh gas coin owned by the gas owner so the dry run
            // can proceed without a real payment object.
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            // Add gas object to transaction gas payment
            transaction.gas_data_mut().payment = vec![gas_object_ref];
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // `MoveAuthenticator`s are not supported in dry runs, so we set the
            // `authenticator_gas_budget` to 0.
            let authenticator_gas_budget = 0;

            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, gas_data) = transaction.execution_parts();

        // A fresh, silent executor: execution output is not persisted.
        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        let (inner_temp_store, _, effects, execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_data,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        let tx_digest = *effects.transaction_digest();

        // Module/type resolution over the uncommitted outputs, falling back to
        // the persistent stores.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Map every created/unwrapped/mutated object ref from the effects to
        // the actual written object plus its write kind.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                // to avoid cloning `transaction`, fields are populated in this order
                suggested_gas_price: self
                    .congestion_tracker
                    .get_prediction_suggested_gas_price(&transaction),
                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
                    .map_err(|e| IotaError::TransactionSerialization {
                        error: format!(
                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
                        ),
                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
                effects: effects.clone().try_into()?,
                events: IotaTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2037
    /// Simulates a transaction without committing any state.
    ///
    /// With `checks` enabled this behaves like a dry run (full transaction
    /// input checks); with checks disabled it behaves like dev-inspect
    /// (relaxed Move VM checks). If the transaction has no gas payment, a
    /// mock gas coin (with `ObjectID::MAX`) is injected and its id is
    /// reported via `SimulateTransactionResult::mock_gas_id`.
    ///
    /// Only supported on fullnodes; system transactions are rejected.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: VmChecks,
    ) -> IotaResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate does not support system transactions".to_string(),
            });
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate is only supported on fullnodes".to_string(),
            });
        }

        // Cheap validity checks for a transaction, including input size limits.
        // This does not check if gas objects are missing since we may create a
        // mock gas object. It checks for other transaction input validity.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Since we need to simulate a validator signing the transaction, the first step
        // is to check if some transaction elements are denied.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load input and receiving objects
        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create a mock gas object if one was not provided
        let mock_gas_id = if transaction.gas().is_empty() {
            let mock_gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
            // The mock coin becomes both the gas payment and an input object.
            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
            Some(mock_gas_object.id())
        } else {
            None
        };

        let protocol_config = epoch_store.protocol_config();

        // `MoveAuthenticator`s are not supported in simulation, so we set the
        // `authenticator_gas_budget` to 0.
        let authenticator_gas_budget = 0;

        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
        let (gas_status, checked_input_objects) = if checks.enabled() {
            iota_transaction_checks::check_transaction_input(
                protocol_config,
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?
        } else {
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                transaction.kind(),
                input_objects,
                receiving_objects,
            )?;
            let gas_status = IotaGasStatus::new(
                transaction.gas_budget(),
                transaction.gas_price(),
                epoch_store.reference_gas_price(),
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        };

        // Create a new executor for the simulation
        let executor = iota_execution::executor(
            protocol_config,
            true, // silent
            None,
        )
        .expect("Creating an executor should not fail here");

        // Execute the simulation
        let (kind, signer, gas_data) = transaction.execution_parts();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false, // expensive_checks
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            transaction.digest(),
            checks.disabled(),
        );

        // In the case of a dev inspect, the execution_result could be filled with some
        // values. Else, execution_result is empty in the case of a dry run.
        Ok(SimulateTransactionResult {
            input_objects: inner_temp_store.input_objects,
            output_objects: inner_temp_store.written,
            // Events are only reported when the effects actually reference them.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            mock_gas_id,
        })
    }
2183
    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
    /// `VmChecks::DISABLED` instead.
    /// The object ID for gas can be any
    /// object ID, even for an uncreated object
    ///
    /// Simulates `transaction_kind` on behalf of `sender` without committing
    /// any state. Only supported on fullnodes, and system transactions are
    /// rejected. Missing gas parameters are defaulted: price to the reference
    /// gas price, budget to the protocol's `max_tx_gas`, sponsor to `sender`,
    /// and payment to an empty list (in which case a dummy gas coin is
    /// fabricated below). When `skip_checks` is true (the default), only
    /// lightweight dev-inspect input checks run; otherwise the same full
    /// input checks as a real execution are applied.
    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        // Fill in caller-omitted gas parameters with permissive defaults.
        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        let mut transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // Serialize the raw transaction bytes only when the caller asked for them.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Gas is validated separately below (possibly against a dummy gas coin).
        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // NOTE(review): the gas status is built from `max_tx_gas` rather than the
            // requested `budget` — presumably intentional so dev-inspect is never
            // budget-constrained when checks are skipped; confirm.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
                // `authenticator_gas_budget` to 0.
                let authenticator_gas_budget = 0;

                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        let gas_data = transaction.gas_data().clone();
        // Hash the intent-wrapped transaction to obtain a digest for the run.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: IntentAppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Resolve Move type layouts against packages written by this run first,
        // falling back to the node's backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2398
2399    // Only used for testing because of how epoch store is loaded.
2400    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2401        let epoch_store = self.epoch_store_for_testing();
2402        Ok(epoch_store.reference_gas_price())
2403    }
2404
2405    #[instrument(level = "trace", skip_all)]
2406    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2407        self.get_transaction_cache_reader()
2408            .try_is_tx_already_executed(digest)
2409    }
2410
2411    /// Non-fallible version of `try_is_tx_already_executed`.
2412    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2413        self.try_is_tx_already_executed(digest)
2414            .expect("storage access failed")
2415    }
2416
2417    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2418    #[instrument(level = "debug", skip_all, err)]
2419    fn index_tx(
2420        &self,
2421        indexes: &IndexStore,
2422        digest: &TransactionDigest,
2423        // TODO: index_tx really just need the transaction data here.
2424        cert: &VerifiedExecutableTransaction,
2425        effects: &TransactionEffects,
2426        events: &TransactionEvents,
2427        timestamp_ms: u64,
2428        tx_coins: Option<TxCoins>,
2429        written: &WrittenObjects,
2430        inner_temporary_store: &InnerTemporaryStore,
2431    ) -> IotaResult<u64> {
2432        let changes = self
2433            .process_object_index(effects, written, inner_temporary_store)
2434            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2435
2436        indexes.index_tx(
2437            cert.data().intent_message().value.sender(),
2438            cert.data()
2439                .intent_message()
2440                .value
2441                .input_objects()?
2442                .iter()
2443                .map(|o| o.object_id()),
2444            effects
2445                .all_changed_objects()
2446                .into_iter()
2447                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2448            cert.data()
2449                .intent_message()
2450                .value
2451                .move_calls()
2452                .into_iter()
2453                .map(|(package, module, function)| {
2454                    (*package, module.to_owned(), function.to_owned())
2455                }),
2456            events,
2457            changes,
2458            digest,
2459            timestamp_ms,
2460            tx_coins,
2461        )
2462    }
2463
2464    #[cfg(msim)]
2465    fn create_fail_state(
2466        &self,
2467        certificate: &VerifiedExecutableTransaction,
2468        epoch_store: &Arc<AuthorityPerEpochStore>,
2469        effects: &mut TransactionEffects,
2470    ) {
2471        use std::cell::RefCell;
2472        thread_local! {
2473            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2474        }
2475        if !certificate.data().intent_message().value.is_system_tx() {
2476            let committee = epoch_store.committee();
2477            let cur_stake = (**committee).weight(&self.name);
2478            if cur_stake > 0 {
2479                FAIL_STATE.with_borrow_mut(|fail_state| {
2480                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2481                    if fail_state.0 < committee.validity_threshold() {
2482                        fail_state.0 += cur_stake;
2483                        fail_state.1.insert(self.name);
2484                    }
2485
2486                    if fail_state.1.contains(&self.name) {
2487                        info!("cp_exec failing tx");
2488                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2489                    }
2490                });
2491            }
2492        }
2493    }
2494
    /// Derives the owner-index and dynamic-field-index changes implied by a
    /// transaction's effects, for consumption by `IndexStore::index_tx`.
    ///
    /// Reads the *pre-transaction* versions of deleted/wrapped/mutated objects
    /// from storage (the new versions are not yet written when this runs), and
    /// the *post-transaction* versions from `written`.
    ///
    /// # Panics
    /// Panics if an expected old object version or a written object cannot be
    /// found — both indicate a broken invariant at this point in execution.
    fn process_object_index(
        &self,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
    ) -> IotaResult<ObjectIndexChanges> {
        let epoch_store = self.load_epoch_store_one_call_per_task();
        // Layout resolver prefers packages written by this transaction,
        // falling back to the backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    inner_temporary_store,
                    self.get_backing_package_store(),
                )));

        // Map of object id -> version it had before this transaction touched it.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects both disappear from the indexes; route
        // each removal by the old owner's kind.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
            ) {
                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                // Shared/immutable objects are not tracked by these indexes.
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a
            // owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = self
                    .get_object_store()
                    .try_get_object_by_key(id, *old_version)?
                else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr) => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            // Record the new index entry according to the post-transaction owner.
            match owner {
                Owner::AddressOwner(addr) => {
                    // TODO: We can remove the object fetching after we added ObjectType to
                    // TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    // A failure to build dynamic-field info is logged and treated
                    // as "not a dynamic field" rather than aborting indexing.
                    let Some(df_info) = self
                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
                        .unwrap_or_else(|e| {
                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                            None
                        }
                    )
                        else {
                            // Skip indexing for non dynamic field objects.
                            continue;
                        };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
2641
    /// Builds a `DynamicFieldInfo` for object `o` if it is a dynamic-field
    /// wrapper, returning `Ok(None)` for non-Move objects and Move objects
    /// whose type is not a dynamic field.
    ///
    /// For dynamic *object* fields, the wrapped child's version/digest/type
    /// are looked up in `written` first and in the object store otherwise.
    fn try_create_dynamic_field_info(
        &self,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> IotaResult<Option<DynamicFieldInfo>> {
        // Skip if not a move object
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // We only index dynamic field objects
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        // Resolve the full annotated layout of the Field<Name, Value> struct.
        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        // Deserialize only the parts of the field needed for indexing.
        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        // Decode the field name into a JSON-friendly value for the index.
        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: IotaMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            IotaError::ObjectDeserialization {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            // Plain dynamic field: the value lives inside the wrapper itself.
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(/* with_prefix */ true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                // Find the actual object from storage using the object id obtained from the
                // wrapper.

                // Try to find the object in the written objects first.
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    (
                        object.version(),
                        object.digest(),
                        object.data.type_().unwrap().clone(),
                    )
                } else {
                    // If not found, try to find it in the database.
                    // NOTE(review): the lookup uses the wrapper's version `o.version()` as
                    // the child's version — confirm the child object is always stored at
                    // the same version as its wrapper here.
                    let object = self
                        .get_object_store()
                        .try_get_object_by_key(&object_id, o.version())?
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
2742
    /// Post-execution bookkeeping for a single transaction: updates the
    /// `IndexStore` indexes and notifies event subscribers.
    ///
    /// No-op on nodes without indexes (i.e. validators). Indexing failures
    /// cause a panic (broken invariant); event-processing failures propagate
    /// as errors.
    #[instrument(level = "trace", skip_all, err)]
    fn post_process_one_tx(
        &self,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        // Nothing to do when this node keeps no indexes.
        if self.indexes.is_none() {
            return Ok(());
        }

        let _scope = monitored_scope("Execution::post_process_one_tx");

        let tx_digest = certificate.digest();
        // One timestamp shared by the index entry and the emitted events.
        let timestamp_ms = Self::unixtime_now_ms();
        let events = &inner_temporary_store.events;
        let written = &inner_temporary_store.written;
        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
            effects,
            inner_temporary_store,
            epoch_store,
        );

        // Index tx
        if let Some(indexes) = &self.indexes {
            let _ = self
                .index_tx(
                    indexes.as_ref(),
                    tx_digest,
                    certificate,
                    effects,
                    events,
                    timestamp_ms,
                    tx_coins,
                    written,
                    inner_temporary_store,
                )
                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
                .expect("Indexing tx should not fail");

            // Shadow raw effects/events with their RPC-facing representations.
            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
            let events = self.make_transaction_block_events(
                events.clone(),
                *tx_digest,
                timestamp_ms,
                epoch_store,
                inner_temporary_store,
            )?;
            // Emit events
            self.subscription_handler
                .process_tx(certificate.data().transaction_data(), &effects, &events)
                .tap_ok(|_| {
                    self.metrics
                        .post_processing_total_tx_had_event_processed
                        .inc()
                })
                .tap_err(|e| {
                    warn!(
                        ?tx_digest,
                        "Post processing - Couldn't process events for tx: {}", e
                    )
                })?;

            self.metrics
                .post_processing_total_events_emitted
                .inc_by(events.data.len() as u64);
        };
        Ok(())
    }
2814
2815    fn make_transaction_block_events(
2816        &self,
2817        transaction_events: TransactionEvents,
2818        digest: TransactionDigest,
2819        timestamp_ms: u64,
2820        epoch_store: &Arc<AuthorityPerEpochStore>,
2821        inner_temporary_store: &InnerTemporaryStore,
2822    ) -> IotaResult<IotaTransactionBlockEvents> {
2823        let mut layout_resolver =
2824            epoch_store
2825                .executor()
2826                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2827                    inner_temporary_store,
2828                    self.get_backing_package_store(),
2829                )));
2830        IotaTransactionBlockEvents::try_from(
2831            transaction_events,
2832            digest,
2833            Some(timestamp_ms),
2834            layout_resolver.as_mut(),
2835        )
2836    }
2837
2838    pub fn unixtime_now_ms() -> u64 {
2839        let now = SystemTime::now()
2840            .duration_since(UNIX_EPOCH)
2841            .expect("Time went backwards")
2842            .as_millis();
2843        u64::try_from(now).expect("Travelling in time machine")
2844    }
2845
2846    #[instrument(level = "trace", skip_all)]
2847    pub async fn handle_transaction_info_request(
2848        &self,
2849        request: TransactionInfoRequest,
2850    ) -> IotaResult<TransactionInfoResponse> {
2851        let epoch_store = self.load_epoch_store_one_call_per_task();
2852        let (transaction, status) = self
2853            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2854            .ok_or(IotaError::TransactionNotFound {
2855                digest: request.transaction_digest,
2856            })?;
2857        Ok(TransactionInfoResponse {
2858            transaction,
2859            status,
2860        })
2861    }
2862
    /// Serves an object-info request: fetches the object at either its latest
    /// or an explicitly requested version, optionally generating its Move
    /// struct layout, and — for address-owned objects — attaching the current
    /// transaction lock for debugging.
    #[instrument(level = "trace", skip_all)]
    pub async fn handle_object_info_request(
        &self,
        request: ObjectInfoRequest,
    ) -> IotaResult<ObjectInfoResponse> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        // Resolve which version of the object is being asked for.
        let requested_object_seq = match request.request_kind {
            ObjectInfoRequestKind::LatestObjectInfo => {
                let (_, seq, _) = self
                    .try_get_object_or_tombstone(request.object_id)
                    .await?
                    .ok_or_else(|| {
                        IotaError::from(UserInputError::ObjectNotFound {
                            object_id: request.object_id,
                            version: None,
                        })
                    })?;
                seq
            }
            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
        };

        let object = self
            .get_object_store()
            .try_get_object_by_key(&request.object_id, requested_object_seq)?
            .ok_or_else(|| {
                IotaError::from(UserInputError::ObjectNotFound {
                    object_id: request.object_id,
                    version: Some(requested_object_seq),
                })
            })?;

        // Layouts only exist for Move objects, and only when the caller asked
        // for one to be generated.
        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
            (request.generate_layout, object.data.try_as_move())
        {
            Some(into_struct_layout(
                epoch_store
                    .executor()
                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                    .get_annotated_layout(&move_obj.type_().clone().into())?,
            )?)
        } else {
            None
        };

        let lock = if !object.is_address_owned() {
            // Only address owned objects have locks.
            None
        } else {
            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
                .await?
                .map(|s| s.into_inner())
        };

        Ok(ObjectInfoResponse {
            object,
            layout,
            lock_for_debugging: lock,
        })
    }
2924
2925    #[instrument(level = "trace", skip_all)]
2926    pub fn handle_checkpoint_request(
2927        &self,
2928        request: &CheckpointRequest,
2929    ) -> IotaResult<CheckpointResponse> {
2930        let summary = if request.certified {
2931            let summary = match request.sequence_number {
2932                Some(seq) => self
2933                    .checkpoint_store
2934                    .get_checkpoint_by_sequence_number(seq)?,
2935                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2936            }
2937            .map(|v| v.into_inner());
2938            summary.map(CheckpointSummaryResponse::Certified)
2939        } else {
2940            let summary = match request.sequence_number {
2941                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2942                None => self
2943                    .checkpoint_store
2944                    .get_latest_locally_computed_checkpoint()?,
2945            };
2946            summary.map(CheckpointSummaryResponse::Pending)
2947        };
2948        let contents = match &summary {
2949            Some(s) => self
2950                .checkpoint_store
2951                .get_checkpoint_contents(&s.content_digest())?,
2952            None => None,
2953        };
2954        Ok(CheckpointResponse {
2955            checkpoint: summary,
2956            contents,
2957        })
2958    }
2959
2960    fn check_protocol_version(
2961        supported_protocol_versions: SupportedProtocolVersions,
2962        current_version: ProtocolVersion,
2963    ) {
2964        info!("current protocol version is now {:?}", current_version);
2965        info!("supported versions are: {:?}", supported_protocol_versions);
2966        if !supported_protocol_versions.is_version_supported(current_version) {
2967            let msg = format!(
2968                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2969            );
2970
2971            error!("{}", msg);
2972            eprintln!("{msg}");
2973
2974            #[cfg(not(msim))]
2975            std::process::exit(1);
2976
2977            #[cfg(msim)]
2978            iota_simulator::task::shutdown_current_node();
2979        }
2980    }
2981
    /// Creates the `AuthorityState`, wiring together stores, caches, pruners
    /// and the transaction manager, and spawns its long-running tasks (the
    /// certificate execution loop and the events migration).
    ///
    /// Shuts the node down (via `check_protocol_version`) when the epoch's
    /// protocol version is not supported by this binary.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rest_index: Option<Arc<RestIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Certificates that become ready flow over this channel to the
        // execution task spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // The pruner handles are stored on the state to keep the pruners
        // alive; they are otherwise unused (hence the leading underscore).
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rest_index.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            rest_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // A weak reference is used so the task does not keep the state alive.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        spawn_monitored_task!(authority_store_migrations::migrate_events(store));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
3079
    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Returns the `ObjectCacheRead` handle from the execution cache trait
    /// pointers.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }
3084
    /// Returns the `TransactionCacheRead` handle.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }
3088
    /// Returns the `ExecutionCacheWrite` handle.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }
3092
    /// Returns the `BackingStore` handle.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }
3096
    /// Returns the `BackingPackageStore` handle used for package resolution.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }
3100
    /// Returns the generic `ObjectStore` handle.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
3104
    /// Returns the `ExecutionCacheReconfigAPI` handle used during
    /// reconfiguration.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }
3108
    /// Returns the `AccumulatorStore` handle.
    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
        &self.execution_cache_trait_pointers.accumulator_store
    }
3112
    /// Returns the `CheckpointCache` handle.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }
3116
    /// Returns the `StateSyncAPI` handle.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }
3120
    /// Returns the `ExecutionCacheCommit` handle.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }
3124
    /// Test-only access to the underlying `AuthorityStore` via the testing
    /// API of the execution cache.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
3130
    /// Test-only wrapper around
    /// `AuthorityStorePruner::prune_checkpoints_for_eligible_epochs`, built
    /// from the given node `config` and a fresh archive reader balancer.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.rest_index.as_deref(),
            None,
            config.authority_store_pruning_config,
            metrics,
            archive_readers,
            EPOCH_DURATION_MS_FOR_TESTING,
        )
        .await
    }
3150
    /// Returns the shared `TransactionManager`.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
3154
    /// Adds transactions / certificates to transaction manager for ordered
    /// execution.
    pub fn enqueue_transactions_for_execution(
        &self,
        txns: Vec<VerifiedExecutableTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        // Delegates directly to the transaction manager.
        self.transaction_manager.enqueue(txns, epoch_store)
    }
3164
    /// Adds certificates to transaction manager for ordered execution.
    pub fn enqueue_certificates_for_execution(
        &self,
        certs: Vec<VerifiedCertificate>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        // Delegates directly to the transaction manager.
        self.transaction_manager
            .enqueue_certificates(certs, epoch_store)
    }
3174
    /// Enqueues executable transactions together with the effects digest
    /// each one is expected to produce.
    pub fn enqueue_with_expected_effects_digest(
        &self,
        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        self.transaction_manager
            .enqueue_with_expected_effects_digest(certs, epoch_store)
    }
3183
    /// Populates the owner and dynamic-field indexes from the genesis
    /// objects, but only when an index store exists and is still empty
    /// (i.e. on first startup).
    fn create_owner_index_if_empty(
        &self,
        genesis_objects: &[Object],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let Some(index_store) = &self.indexes else {
            return Ok(());
        };
        if !index_store.is_empty() {
            return Ok(());
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
        for o in genesis_objects.iter() {
            match o.owner {
                // Address-owned objects go into the owner index.
                Owner::AddressOwner(addr) => new_owners.push((
                    (addr, o.id()),
                    ObjectInfo::new(&o.compute_object_reference(), o),
                )),
                // Object-owned objects are treated as dynamic fields of
                // their parent object.
                Owner::ObjectOwner(object_id) => {
                    let id = o.id();
                    let info = match self.try_create_dynamic_field_info(
                        o,
                        &BTreeMap::new(),
                        layout_resolver.as_mut(),
                    ) {
                        Ok(Some(info)) => info,
                        Ok(None) => continue,
                        // A field whose referenced object is missing from
                        // genesis is skipped with a warning, not an error.
                        Err(IotaError::UserInput {
                            error:
                                UserInputError::ObjectNotFound {
                                    object_id: not_found_id,
                                    version,
                                },
                        }) => {
                            warn!(
                                ?not_found_id,
                                ?version,
                                object_owner=?object_id,
                                field=?id,
                                "Skipping dynamic field: referenced genesis object not found"
                            );
                            continue;
                        }
                        Err(e) => return Err(e),
                    };
                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
                }
                _ => {}
            }
        }

        index_store.insert_genesis_objects(ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners,
            new_dynamic_fields,
        })
    }
3247
3248    /// Attempts to acquire execution lock for an executable transaction.
3249    /// Returns the lock if the transaction is matching current executed epoch
3250    /// Returns None otherwise
3251    pub fn execution_lock_for_executable_transaction(
3252        &self,
3253        transaction: &VerifiedExecutableTransaction,
3254    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3255        let lock = self
3256            .execution_lock
3257            .try_read()
3258            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3259        if *lock == transaction.auth_sig().epoch() {
3260            Ok(lock)
3261        } else {
3262            Err(IotaError::WrongEpoch {
3263                expected_epoch: *lock,
3264                actual_epoch: transaction.auth_sig().epoch(),
3265            })
3266        }
3267    }
3268
    /// Acquires the execution lock for the duration of a transaction signing
    /// request. This prevents reconfiguration from starting until we are
    /// finished handling the signing request. Otherwise, in-memory lock
    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
    /// while we are attempting to acquire locks for the transaction.
    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
        // try_read fails only when reconfiguration holds the write lock.
        self.execution_lock
            .try_read()
            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
    }
3279
    /// Acquires the execution write lock; awaiting here blocks until no
    /// readers (executing transactions / signing requests) remain.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3283
    /// Transitions the authority to `new_committee`'s epoch.
    ///
    /// Waits for in-flight execution to stop (via the execution write lock),
    /// terminates epoch-scoped tasks, reverts uncommitted transactions, runs
    /// consistency checks, optionally snapshots the databases, and reopens
    /// the per-epoch store for the new epoch, which is returned on success.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optionally snapshot all databases at the epoch boundary.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3395
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    pub async fn reconfigure_for_testing(&self) {
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        // Swap in the new epoch store before terminating the old epoch's
        // tasks and releasing the execution lock.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        *execution_lock = new_epoch;
    }
3429
    /// Runs end-of-epoch consistency checks: IOTA conservation, an optional
    /// root-state-hash vs. live-object-set comparison, and optional
    /// secondary index verification.
    #[instrument(level = "error", skip_all)]
    fn check_system_consistency(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
    ) -> IotaResult<()> {
        info!(
            "Performing iota conservation consistency check for epoch {}",
            cur_epoch_store.epoch()
        );

        // Debug builds additionally verify every executed transaction landed
        // in some checkpoint.
        if cfg!(debug_assertions) {
            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
        }

        self.get_reconfig_api()
            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;

        // check for root state hash consistency with live object set
        if expensive_safety_check_config.enable_state_consistency_check() {
            info!(
                "Performing state consistency check for epoch {}",
                cur_epoch_store.epoch()
            );
            self.expensive_check_is_consistent_state(
                accumulator,
                cur_epoch_store,
                cfg!(debug_assertions), // panic in debug mode only
            );
        }

        if expensive_safety_check_config.enable_secondary_index_checks() {
            if let Some(indexes) = self.indexes.clone() {
                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
                    .expect("secondary indexes are inconsistent");
            }
        }

        Ok(())
    }
3472
3473    fn expensive_check_is_consistent_state(
3474        &self,
3475        accumulator: Arc<StateAccumulator>,
3476        cur_epoch_store: &AuthorityPerEpochStore,
3477        panic: bool,
3478    ) {
3479        let live_object_set_hash = accumulator.digest_live_object_set();
3480
3481        let root_state_hash: ECMHLiveObjectSetDigest = self
3482            .get_accumulator_store()
3483            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3484            .expect("Retrieving root state hash cannot fail")
3485            .expect("Root state hash for epoch must exist")
3486            .1
3487            .digest()
3488            .into();
3489
3490        let is_inconsistent = root_state_hash != live_object_set_hash;
3491        if is_inconsistent {
3492            if panic {
3493                panic!(
3494                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3495                );
3496            } else {
3497                error!(
3498                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3499                    root_state_hash, live_object_set_hash
3500                );
3501            }
3502        } else {
3503            info!("State consistency check passed");
3504        }
3505
3506        if !panic {
3507            accumulator.set_inconsistent_state(is_inconsistent);
3508        }
3509    }
3510
    /// Test-only: returns the epoch id of the current epoch store.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.epoch_store_for_testing().epoch()
    }
3514
    /// Takes an on-disk checkpoint of all databases under `checkpoint_path`.
    /// Writes into a sibling `.tmp` directory first and renames it into
    /// place on success, so a partially written checkpoint is never left
    /// behind under the final path.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> IotaResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // Clean up any leftover temp directory from a previous failed run.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| IotaError::FileIO(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;

        // NOTE: Do not change the order of invoking these checkpoint calls
        // We want to snapshot checkpoint db first to not race with state sync
        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        if checkpoint_indexes {
            if let Some(indexes) = self.indexes.as_ref() {
                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
            }
            if let Some(rest_index) = self.rest_index.as_ref() {
                rest_index.checkpoint_db(&checkpoint_path_tmp.join("grpc_indexes"))?;
            }
        }

        // Publish the finished checkpoint by renaming the temp directory.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| IotaError::FileIO(e.to_string()))?;
        Ok(())
    }
3565
    /// Load the current epoch store. This can change during reconfiguration. To
    /// ensure that we never end up accessing different epoch stores in a
    /// single task, we need to make sure that this is called once per task.
    /// Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites.
    /// Only call it when there is no way to obtain it from somewhere else.
    ///
    /// Returns a guard that pins the currently published epoch store.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
3575
    /// Load the epoch store; should be used in tests only.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
3580
    /// Test-only: returns a clone of the current epoch's committee.
    pub fn clone_committee_for_testing(&self) -> Committee {
        Committee::clone(self.epoch_store_for_testing().committee())
    }
3584
3585    #[instrument(level = "trace", skip_all)]
3586    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3587        self.get_object_store()
3588            .try_get_object(object_id)
3589            .map_err(Into::into)
3590    }
3591
    /// Non-fallible version of `try_get_object`.
    ///
    /// Panics if the storage read itself fails; a missing object still
    /// yields `None`.
    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.try_get_object(object_id)
            .await
            .expect("storage access failed")
    }
3598
3599    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3600        Ok(self
3601            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3602            .await?
3603            .expect("framework object should always exist")
3604            .compute_object_reference())
3605    }
3606
    // This function is only used for testing.
    /// Reads the IOTA system state object through the object cache.
    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
        self.get_object_cache_reader()
            .try_get_iota_system_state_object_unsafe()
    }
3612
3613    #[instrument(level = "trace", skip_all)]
3614    pub fn get_checkpoint_by_sequence_number(
3615        &self,
3616        sequence_number: CheckpointSequenceNumber,
3617    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3618        Ok(self
3619            .checkpoint_store
3620            .get_checkpoint_by_sequence_number(sequence_number)?)
3621    }
3622
3623    #[instrument(level = "trace", skip_all)]
3624    pub fn get_transaction_checkpoint_for_tests(
3625        &self,
3626        digest: &TransactionDigest,
3627        epoch_store: &AuthorityPerEpochStore,
3628    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3629        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3630        let Some(checkpoint) = checkpoint else {
3631            return Ok(None);
3632        };
3633        let checkpoint = self
3634            .checkpoint_store
3635            .get_checkpoint_by_sequence_number(checkpoint)?;
3636        Ok(checkpoint)
3637    }
3638
3639    #[instrument(level = "trace", skip_all)]
3640    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3641        Ok(
3642            match self
3643                .get_object_cache_reader()
3644                .try_get_latest_object_or_tombstone(*object_id)?
3645            {
3646                Some((_, ObjectOrTombstone::Object(object))) => {
3647                    let layout = self.get_object_layout(&object)?;
3648                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3649                }
3650                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3651                None => ObjectRead::NotExists(*object_id),
3652            },
3653        )
3654    }
3655
    /// Chain Identifier is the digest of the genesis checkpoint.
    ///
    /// Returns a copy of the `chain_identifier` field cached on this struct.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3660
3661    #[instrument(level = "trace", skip_all)]
3662    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3663    where
3664        T: DeserializeOwned,
3665    {
3666        let o = self.get_object_read(object_id)?.into_object()?;
3667        if let Some(move_object) = o.data.try_as_move() {
3668            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3669                IotaError::ObjectDeserialization {
3670                    error: format!("{e}"),
3671                }
3672            })?)
3673        } else {
3674            Err(IotaError::ObjectDeserialization {
3675                error: format!("Provided object : [{object_id}] is not a Move object."),
3676            })
3677        }
3678    }
3679
3680    /// This function aims to serve rpc reads on past objects and
3681    /// we don't expect it to be called for other purposes.
3682    /// Depending on the object pruning policies that will be enforced in the
3683    /// future there is no software-level guarantee/SLA to retrieve an object
3684    /// with an old version even if it exists/existed.
3685    #[instrument(level = "trace", skip_all)]
3686    pub fn get_past_object_read(
3687        &self,
3688        object_id: &ObjectID,
3689        version: SequenceNumber,
3690    ) -> IotaResult<PastObjectRead> {
3691        // Firstly we see if the object ever existed by getting its latest data
3692        let Some(obj_ref) = self
3693            .get_object_cache_reader()
3694            .try_get_latest_object_ref_or_tombstone(*object_id)?
3695        else {
3696            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3697        };
3698
3699        if version > obj_ref.1 {
3700            return Ok(PastObjectRead::VersionTooHigh {
3701                object_id: *object_id,
3702                asked_version: version,
3703                latest_version: obj_ref.1,
3704            });
3705        }
3706
3707        if version < obj_ref.1 {
3708            // Read past objects
3709            return Ok(match self.read_object_at_version(object_id, version)? {
3710                Some((object, layout)) => {
3711                    let obj_ref = object.compute_object_reference();
3712                    PastObjectRead::VersionFound(obj_ref, object, layout)
3713                }
3714
3715                None => PastObjectRead::VersionNotFound(*object_id, version),
3716            });
3717        }
3718
3719        if !obj_ref.2.is_alive() {
3720            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3721        }
3722
3723        match self.read_object_at_version(object_id, obj_ref.1)? {
3724            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3725            None => {
3726                error!(
3727                    "Object with in parent_entry is missing from object store, datastore is \
3728                     inconsistent",
3729                );
3730                Err(UserInputError::ObjectNotFound {
3731                    object_id: *object_id,
3732                    version: Some(obj_ref.1),
3733                }
3734                .into())
3735            }
3736        }
3737    }
3738
3739    #[instrument(level = "trace", skip_all)]
3740    fn read_object_at_version(
3741        &self,
3742        object_id: &ObjectID,
3743        version: SequenceNumber,
3744    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3745        let Some(object) = self
3746            .get_object_cache_reader()
3747            .try_get_object_by_key(object_id, version)?
3748        else {
3749            return Ok(None);
3750        };
3751
3752        let layout = self.get_object_layout(&object)?;
3753        Ok(Some((object, layout)))
3754    }
3755
    /// Computes the annotated Move struct layout for `object`, or `None` when
    /// the object is a package (not a Move object).
    ///
    /// The layout is resolved through the current epoch's executor with the
    /// backing package store; errors from layout resolution propagate via the
    /// closure's `?` and the final `transpose()?`.
    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            .map(|object| {
                into_struct_layout(
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            .transpose()?;
        Ok(layout)
    }
3772
3773    fn get_owner_at_version(
3774        &self,
3775        object_id: &ObjectID,
3776        version: SequenceNumber,
3777    ) -> IotaResult<Owner> {
3778        self.get_object_store()
3779            .try_get_object_by_key(object_id, version)?
3780            .ok_or_else(|| {
3781                IotaError::from(UserInputError::ObjectNotFound {
3782                    object_id: *object_id,
3783                    version: Some(version),
3784                })
3785            })
3786            .map(|o| o.owner)
3787    }
3788
3789    #[instrument(level = "trace", skip_all)]
3790    pub fn get_owner_objects(
3791        &self,
3792        owner: IotaAddress,
3793        // If `Some`, the query will start from the next item after the specified cursor
3794        cursor: Option<ObjectID>,
3795        limit: usize,
3796        filter: Option<IotaObjectDataFilter>,
3797    ) -> IotaResult<Vec<ObjectInfo>> {
3798        if let Some(indexes) = &self.indexes {
3799            indexes.get_owner_objects(owner, cursor, limit, filter)
3800        } else {
3801            Err(IotaError::IndexStoreNotAvailable)
3802        }
3803    }
3804
3805    #[instrument(level = "trace", skip_all)]
3806    pub fn get_owned_coins_iterator_with_cursor(
3807        &self,
3808        owner: IotaAddress,
3809        // If `Some`, the query will start from the next item after the specified cursor
3810        cursor: (String, ObjectID),
3811        limit: usize,
3812        one_coin_type_only: bool,
3813    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3814        if let Some(indexes) = &self.indexes {
3815            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3816        } else {
3817            Err(IotaError::IndexStoreNotAvailable)
3818        }
3819    }
3820
3821    #[instrument(level = "trace", skip_all)]
3822    pub fn get_owner_objects_iterator(
3823        &self,
3824        owner: IotaAddress,
3825        // If `Some`, the query will start from the next item after the specified cursor
3826        cursor: Option<ObjectID>,
3827        filter: Option<IotaObjectDataFilter>,
3828    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3829        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3830        if let Some(indexes) = &self.indexes {
3831            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3832        } else {
3833            Err(IotaError::IndexStoreNotAvailable)
3834        }
3835    }
3836
3837    #[instrument(level = "trace", skip_all)]
3838    pub async fn get_move_objects<T>(
3839        &self,
3840        owner: IotaAddress,
3841        type_: MoveObjectType,
3842    ) -> IotaResult<Vec<T>>
3843    where
3844        T: DeserializeOwned,
3845    {
3846        let object_ids = self
3847            .get_owner_objects_iterator(owner, None, None)?
3848            .filter(|o| match &o.type_ {
3849                ObjectType::Struct(s) => &type_ == s,
3850                ObjectType::Package => false,
3851            })
3852            .map(|info| ObjectKey(info.object_id, info.version))
3853            .collect::<Vec<_>>();
3854        let mut move_objects = vec![];
3855
3856        let objects = self
3857            .get_object_store()
3858            .try_multi_get_objects_by_key(&object_ids)?;
3859
3860        for (o, id) in objects.into_iter().zip(object_ids) {
3861            let object = o.ok_or_else(|| {
3862                IotaError::from(UserInputError::ObjectNotFound {
3863                    object_id: id.0,
3864                    version: Some(id.1),
3865                })
3866            })?;
3867            let move_object = object.data.try_as_move().ok_or_else(|| {
3868                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3869            })?;
3870            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3871                IotaError::ObjectDeserialization {
3872                    error: format!("{e}"),
3873                }
3874            })?);
3875        }
3876        Ok(move_objects)
3877    }
3878
3879    #[instrument(level = "trace", skip_all)]
3880    pub fn get_dynamic_fields(
3881        &self,
3882        owner: ObjectID,
3883        // If `Some`, the query will start from the next item after the specified cursor
3884        cursor: Option<ObjectID>,
3885        limit: usize,
3886    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3887        Ok(self
3888            .get_dynamic_fields_iterator(owner, cursor)?
3889            .take(limit)
3890            .collect::<Result<Vec<_>, _>>()?)
3891    }
3892
3893    fn get_dynamic_fields_iterator(
3894        &self,
3895        owner: ObjectID,
3896        // If `Some`, the query will start from the next item after the specified cursor
3897        cursor: Option<ObjectID>,
3898    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3899    {
3900        if let Some(indexes) = &self.indexes {
3901            indexes.get_dynamic_fields_iterator(owner, cursor)
3902        } else {
3903            Err(IotaError::IndexStoreNotAvailable)
3904        }
3905    }
3906
3907    #[instrument(level = "trace", skip_all)]
3908    pub fn get_dynamic_field_object_id(
3909        &self,
3910        owner: ObjectID,
3911        name_type: TypeTag,
3912        name_bcs_bytes: &[u8],
3913    ) -> IotaResult<Option<ObjectID>> {
3914        if let Some(indexes) = &self.indexes {
3915            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3916        } else {
3917            Err(IotaError::IndexStoreNotAvailable)
3918        }
3919    }
3920
3921    #[instrument(level = "trace", skip_all)]
3922    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3923        Ok(self.get_indexes()?.next_sequence_number())
3924    }
3925
3926    #[instrument(level = "trace", skip_all)]
3927    pub async fn get_executed_transaction_and_effects(
3928        &self,
3929        digest: TransactionDigest,
3930        kv_store: Arc<TransactionKeyValueStore>,
3931    ) -> IotaResult<(Transaction, TransactionEffects)> {
3932        let transaction = kv_store.get_tx(digest).await?;
3933        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3934        Ok((transaction, effects))
3935    }
3936
3937    #[instrument(level = "trace", skip_all)]
3938    pub fn multi_get_checkpoint_by_sequence_number(
3939        &self,
3940        sequence_numbers: &[CheckpointSequenceNumber],
3941    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3942        Ok(self
3943            .checkpoint_store
3944            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3945    }
3946
3947    #[instrument(level = "trace", skip_all)]
3948    pub fn get_transaction_events(
3949        &self,
3950        digest: &TransactionDigest,
3951    ) -> IotaResult<TransactionEvents> {
3952        self.get_transaction_cache_reader()
3953            .try_get_events(digest)?
3954            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3955    }
3956
3957    pub fn get_transaction_input_objects(
3958        &self,
3959        effects: &TransactionEffects,
3960    ) -> anyhow::Result<Vec<Object>> {
3961        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3962            .map_err(Into::into)
3963    }
3964
3965    pub fn get_transaction_output_objects(
3966        &self,
3967        effects: &TransactionEffects,
3968    ) -> anyhow::Result<Vec<Object>> {
3969        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3970            .map_err(Into::into)
3971    }
3972
3973    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3974        match &self.indexes {
3975            Some(i) => Ok(i.clone()),
3976            None => Err(IotaError::UnsupportedFeature {
3977                error: "extended object indexing is not enabled on this server".into(),
3978            }),
3979        }
3980    }
3981
3982    pub async fn get_transactions_for_tests(
3983        self: &Arc<Self>,
3984        filter: Option<TransactionFilter>,
3985        cursor: Option<TransactionDigest>,
3986        limit: Option<usize>,
3987        reverse: bool,
3988    ) -> IotaResult<Vec<TransactionDigest>> {
3989        let metrics = KeyValueStoreMetrics::new_for_tests();
3990        let kv_store = Arc::new(TransactionKeyValueStore::new(
3991            "rocksdb",
3992            metrics,
3993            self.clone(),
3994        ));
3995        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3996            .await
3997    }
3998
3999    #[instrument(level = "trace", skip_all)]
4000    pub async fn get_transactions(
4001        &self,
4002        kv_store: &Arc<TransactionKeyValueStore>,
4003        filter: Option<TransactionFilter>,
4004        // If `Some`, the query will start from the next item after the specified cursor
4005        cursor: Option<TransactionDigest>,
4006        limit: Option<usize>,
4007        reverse: bool,
4008    ) -> IotaResult<Vec<TransactionDigest>> {
4009        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4010            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4011            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4012            if reverse {
4013                let iter = iter
4014                    .rev()
4015                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4016                    .skip(usize::from(cursor.is_some()));
4017                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4018            } else {
4019                let iter = iter
4020                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4021                    .skip(usize::from(cursor.is_some()));
4022                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4023            }
4024        }
4025        self.get_indexes()?
4026            .get_transactions(filter, cursor, limit, reverse)
4027    }
4028
    /// Returns a reference to this authority's checkpoint store.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4032
4033    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4034        self.get_checkpoint_store()
4035            .get_highest_executed_checkpoint_seq_number()?
4036            .ok_or(IotaError::UserInput {
4037                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4038            })
4039    }
4040
4041    #[cfg(msim)]
4042    pub fn get_highest_pruned_checkpoint_for_testing(
4043        &self,
4044    ) -> IotaResult<CheckpointSequenceNumber> {
4045        self.database_for_testing()
4046            .perpetual_tables
4047            .get_highest_pruned_checkpoint()
4048            .map(|c| c.unwrap_or(0))
4049            .map_err(Into::into)
4050    }
4051
4052    #[instrument(level = "trace", skip_all)]
4053    pub fn get_checkpoint_summary_by_sequence_number(
4054        &self,
4055        sequence_number: CheckpointSequenceNumber,
4056    ) -> IotaResult<CheckpointSummary> {
4057        let verified_checkpoint = self
4058            .get_checkpoint_store()
4059            .get_checkpoint_by_sequence_number(sequence_number)?;
4060        match verified_checkpoint {
4061            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4062            None => Err(IotaError::UserInput {
4063                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4064            }),
4065        }
4066    }
4067
4068    #[instrument(level = "trace", skip_all)]
4069    pub fn get_checkpoint_summary_by_digest(
4070        &self,
4071        digest: CheckpointDigest,
4072    ) -> IotaResult<CheckpointSummary> {
4073        let verified_checkpoint = self
4074            .get_checkpoint_store()
4075            .get_checkpoint_by_digest(&digest)?;
4076        match verified_checkpoint {
4077            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4078            None => Err(IotaError::UserInput {
4079                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4080            }),
4081        }
4082    }
4083
4084    #[instrument(level = "trace", skip_all)]
4085    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4086        if is_system_package(package_id) {
4087            return self.find_genesis_txn_digest();
4088        }
4089        Ok(self
4090            .get_object_read(&package_id)?
4091            .into_object()?
4092            .previous_transaction)
4093    }
4094
4095    #[instrument(level = "trace", skip_all)]
4096    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4097        let summary = self
4098            .get_verified_checkpoint_by_sequence_number(0)?
4099            .into_message();
4100        let content = self.get_checkpoint_contents(summary.content_digest)?;
4101        let genesis_transaction = content.enumerate_transactions(&summary).next();
4102        Ok(genesis_transaction
4103            .ok_or(IotaError::UserInput {
4104                error: UserInputError::GenesisTransactionNotFound,
4105            })?
4106            .1
4107            .transaction)
4108    }
4109
4110    #[instrument(level = "trace", skip_all)]
4111    pub fn get_verified_checkpoint_by_sequence_number(
4112        &self,
4113        sequence_number: CheckpointSequenceNumber,
4114    ) -> IotaResult<VerifiedCheckpoint> {
4115        let verified_checkpoint = self
4116            .get_checkpoint_store()
4117            .get_checkpoint_by_sequence_number(sequence_number)?;
4118        match verified_checkpoint {
4119            Some(verified_checkpoint) => Ok(verified_checkpoint),
4120            None => Err(IotaError::UserInput {
4121                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4122            }),
4123        }
4124    }
4125
4126    #[instrument(level = "trace", skip_all)]
4127    pub fn get_verified_checkpoint_summary_by_digest(
4128        &self,
4129        digest: CheckpointDigest,
4130    ) -> IotaResult<VerifiedCheckpoint> {
4131        let verified_checkpoint = self
4132            .get_checkpoint_store()
4133            .get_checkpoint_by_digest(&digest)?;
4134        match verified_checkpoint {
4135            Some(verified_checkpoint) => Ok(verified_checkpoint),
4136            None => Err(IotaError::UserInput {
4137                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4138            }),
4139        }
4140    }
4141
4142    #[instrument(level = "trace", skip_all)]
4143    pub fn get_checkpoint_contents(
4144        &self,
4145        digest: CheckpointContentsDigest,
4146    ) -> IotaResult<CheckpointContents> {
4147        self.get_checkpoint_store()
4148            .get_checkpoint_contents(&digest)?
4149            .ok_or(IotaError::UserInput {
4150                error: UserInputError::CheckpointContentsNotFound(digest),
4151            })
4152    }
4153
4154    #[instrument(level = "trace", skip_all)]
4155    pub fn get_checkpoint_contents_by_sequence_number(
4156        &self,
4157        sequence_number: CheckpointSequenceNumber,
4158    ) -> IotaResult<CheckpointContents> {
4159        let verified_checkpoint = self
4160            .get_checkpoint_store()
4161            .get_checkpoint_by_sequence_number(sequence_number)?;
4162        match verified_checkpoint {
4163            Some(verified_checkpoint) => {
4164                let content_digest = verified_checkpoint.into_inner().content_digest;
4165                self.get_checkpoint_contents(content_digest)
4166            }
4167            None => Err(IotaError::UserInput {
4168                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4169            }),
4170        }
4171    }
4172
    /// Queries events matching `query` from the index store, then joins the
    /// resulting event keys with their payloads fetched from the key-value
    /// store, returning fully resolved `IotaEvent`s.
    ///
    /// `cursor` is exclusive: when `Some`, results start after the cursor
    /// position. `limit` bounds the result count; `descending` reverses the
    /// iteration direction.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> IotaResult<Vec<IotaEvent>> {
        let index_store = self.get_indexes()?;

        // Get the tx_num from tx_digest
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                IotaError::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            // No cursor, descending: start from the very end of the index.
            (u64::MAX, usize::MAX)
        } else {
            // No cursor, ascending: start from the very beginning.
            (0, 0)
        };

        // Fetch one extra key: when a cursor is given, the first key returned
        // is the cursor position itself and is dropped below.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All(filters) => {
                if filters.is_empty() {
                    index_store.all_events(tx_num, event_num, limit, descending)?
                } else {
                    return Err(IotaError::UserInput {
                        error: UserInputError::Unsupported(
                            "This query type does not currently support filter combinations"
                                .to_string(),
                        ),
                    });
                }
            }
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Package(_)
            | EventFilter::MoveEventField { .. }
            | EventFilter::Any(_)
            | EventFilter::And(_, _)
            | EventFilter::Or(_, _) => {
                return Err(IotaError::UserInput {
                    error: UserInputError::Unsupported(
                        "This query type is not supported by the full node.".to_string(),
                    ),
                });
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            // `limit` was incremented above, so `limit - 1` is the caller's limit.
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        // (each key is a (event_digest, tx_digest, event_seq, timestamp) tuple)
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        let events_map: HashMap<_, _> =
            transaction_digests.iter().zip(events.into_iter()).collect();

        // Re-associate each key with its event payload: look up by tx digest
        // (k.1), then select the event at position k.2 within the transaction.
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(
                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                    event
                        .map(|e| (e, tx_digest, event_seq, timestamp))
                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        // Resolve each event's Move type layout so the raw BCS payload can be
        // rendered as a structured `IotaEvent`.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(IotaEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
4321
4322    pub async fn insert_genesis_object(&self, object: Object) {
4323        self.get_reconfig_api()
4324            .try_insert_genesis_object(object)
4325            .expect("Cannot insert genesis object")
4326    }
4327
4328    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4329        futures::future::join_all(
4330            objects
4331                .iter()
4332                .map(|o| self.insert_genesis_object(o.clone())),
4333        )
4334        .await;
4335    }
4336
    /// Make a status response for a transaction.
    ///
    /// Returns `None` when the transaction is unknown to this authority;
    /// otherwise returns the sender-signed data paired with
    /// `TransactionStatus::Executed` (effects available, possibly re-signed
    /// for the current epoch) or `TransactionStatus::Signed` (signed but not
    /// yet executed).
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .try_get_transaction_block(transaction_digest)?
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Only load events when the effects declare an events digest.
                let events = if effects.events_digest().is_some() {
                    self.get_transaction_events(effects.transaction_digest())?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible
                // that we reverted the transaction (during epoch change) in
                // between the above two reads, and we end up having effects but
                // not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        // Fall back to the per-epoch store: the transaction may be signed but
        // not yet executed.
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
4378
4379    /// Get the signed effects of the given transaction. If the effects was
4380    /// signed in a previous epoch, re-sign it so that the caller is able to
4381    /// form a cert of the effects in the current epoch.
4382    #[instrument(level = "trace", skip_all)]
4383    pub fn get_signed_effects_and_maybe_resign(
4384        &self,
4385        transaction_digest: &TransactionDigest,
4386        epoch_store: &Arc<AuthorityPerEpochStore>,
4387    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4388        let effects = self
4389            .get_transaction_cache_reader()
4390            .try_get_executed_effects(transaction_digest)?;
4391        match effects {
4392            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4393            None => Ok(None),
4394        }
4395    }
4396
4397    #[instrument(level = "trace", skip_all)]
4398    pub(crate) fn sign_effects(
4399        &self,
4400        effects: TransactionEffects,
4401        epoch_store: &Arc<AuthorityPerEpochStore>,
4402    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4403        let tx_digest = *effects.transaction_digest();
4404        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4405            Some(sig) if sig.epoch == epoch_store.epoch() => {
4406                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4407            }
4408            _ => {
4409                // If the transaction was executed in previous epochs, the validator will
4410                // re-sign the effects with new current epoch so that a client is always able to
4411                // obtain an effects certificate at the current epoch.
4412                //
4413                // Why is this necessary? Consider the following case:
4414                // - assume there are 4 validators
4415                // - Quorum driver gets 2 signed effects before reconfig halt
4416                // - The tx makes it into final checkpoint.
4417                // - 2 validators go away and are replaced in the new epoch.
4418                // - The new epoch begins.
4419                // - The quorum driver cannot complete the partial effects cert from the
4420                //   previous epoch, because it may not be able to reach either of the 2 former
4421                //   validators.
4422                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4423                //   the new epoch, the QD can make a new effects cert and return it to the
4424                //   client.
4425                //
4426                // This is a considered a short-term workaround. Eventually, Quorum Driver
4427                // should be able to return either an effects certificate, -or-
4428                // a proof of inclusion in a checkpoint. In the case above, the
4429                // Quorum Driver would return a proof of inclusion in the final
4430                // checkpoint, and this code would no longer be necessary.
4431                debug!(
4432                    ?tx_digest,
4433                    epoch=?epoch_store.epoch(),
4434                    "Re-signing the effects with the current epoch"
4435                );
4436
4437                let sig = AuthoritySignInfo::new(
4438                    epoch_store.epoch(),
4439                    &effects,
4440                    Intent::iota_app(IntentScope::TransactionEffects),
4441                    self.name,
4442                    &*self.secret,
4443                );
4444
4445                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4446
4447                epoch_store.insert_effects_digest_and_signature(
4448                    &tx_digest,
4449                    effects.digest(),
4450                    &sig,
4451                )?;
4452
4453                effects
4454            }
4455        };
4456
4457        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4458            signed_effects,
4459        ))
4460    }
4461
4462    // Returns coin objects for indexing for fullnode if indexing is enabled.
4463    #[instrument(level = "trace", skip_all)]
4464    fn fullnode_only_get_tx_coins_for_indexing(
4465        &self,
4466        effects: &TransactionEffects,
4467        inner_temporary_store: &InnerTemporaryStore,
4468        epoch_store: &Arc<AuthorityPerEpochStore>,
4469    ) -> Option<TxCoins> {
4470        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4471            return None;
4472        }
4473        let written_coin_objects = inner_temporary_store
4474            .written
4475            .iter()
4476            .filter_map(|(k, v)| {
4477                if v.is_coin() {
4478                    Some((*k, v.clone()))
4479                } else {
4480                    None
4481                }
4482            })
4483            .collect();
4484        let mut input_coin_objects = inner_temporary_store
4485            .input_objects
4486            .iter()
4487            .filter_map(|(k, v)| {
4488                if v.is_coin() {
4489                    Some((*k, v.clone()))
4490                } else {
4491                    None
4492                }
4493            })
4494            .collect::<ObjectMap>();
4495
4496        // Check for receiving objects that were actually used and modified during
4497        // execution. Their updated version will already showup in
4498        // "written_coins" but their input isn't included in the set of input
4499        // objects in a inner_temporary_store.
4500        for (object_id, version) in effects.modified_at_versions() {
4501            if inner_temporary_store
4502                .loaded_runtime_objects
4503                .contains_key(&object_id)
4504            {
4505                if let Some(object) = self
4506                    .get_object_store()
4507                    .get_object_by_key(&object_id, version)
4508                {
4509                    if object.is_coin() {
4510                        input_coin_objects.insert(object_id, object);
4511                    }
4512                }
4513            }
4514        }
4515
4516        Some((input_coin_objects, written_coin_objects))
4517    }
4518
4519    /// Get the TransactionEnvelope that currently locks the given object, if
4520    /// any. Since object locks are only valid for one epoch, we also need
4521    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4522    /// no lock records for the given object can be found.
4523    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4524    /// object record is at a different version.
4525    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4526    /// certain transaction. Returns None if the a lock record is
4527    /// initialized for the given ObjectRef but not yet locked by any
4528    /// transaction,     or cannot find the transaction in transaction
4529    /// table, because of data race etc.
4530    #[instrument(level = "trace", skip_all)]
4531    pub async fn get_transaction_lock(
4532        &self,
4533        object_ref: &ObjectRef,
4534        epoch_store: &AuthorityPerEpochStore,
4535    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4536        let lock_info = self
4537            .get_object_cache_reader()
4538            .try_get_lock(*object_ref, epoch_store)?;
4539        let lock_info = match lock_info {
4540            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4541                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4542                    provided_obj_ref: *object_ref,
4543                    current_version: locked_ref.1,
4544                }
4545                .into());
4546            }
4547            ObjectLockStatus::Initialized => {
4548                return Ok(None);
4549            }
4550            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4551        };
4552
4553        epoch_store.get_signed_transaction(&lock_info)
4554    }
4555
4556    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4557        self.get_object_cache_reader().try_get_objects(objects)
4558    }
4559
4560    /// Non-fallible version of `try_get_objects`.
4561    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4562        self.try_get_objects(objects)
4563            .await
4564            .expect("storage access failed")
4565    }
4566
4567    pub async fn try_get_object_or_tombstone(
4568        &self,
4569        object_id: ObjectID,
4570    ) -> IotaResult<Option<ObjectRef>> {
4571        self.get_object_cache_reader()
4572            .try_get_latest_object_ref_or_tombstone(object_id)
4573    }
4574
4575    /// Non-fallible version of `try_get_object_or_tombstone`.
4576    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4577        self.try_get_object_or_tombstone(object_id)
4578            .await
4579            .expect("storage access failed")
4580    }
4581
4582    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4583    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4584    /// upgrade.
4585    ///
4586    /// This method can be used to dynamic adjust the amount of buffer. If set
4587    /// to 0, the upgrade will go through with only 2f+1 votes.
4588    ///
4589    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4590    /// should have the same value), or you risk halting the chain.
4591    pub fn set_override_protocol_upgrade_buffer_stake(
4592        &self,
4593        expected_epoch: EpochId,
4594        buffer_stake_bps: u64,
4595    ) -> IotaResult {
4596        let epoch_store = self.load_epoch_store_one_call_per_task();
4597        let actual_epoch = epoch_store.epoch();
4598        if actual_epoch != expected_epoch {
4599            return Err(IotaError::WrongEpoch {
4600                expected_epoch,
4601                actual_epoch,
4602            });
4603        }
4604
4605        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4606    }
4607
4608    pub fn clear_override_protocol_upgrade_buffer_stake(
4609        &self,
4610        expected_epoch: EpochId,
4611    ) -> IotaResult {
4612        let epoch_store = self.load_epoch_store_one_call_per_task();
4613        let actual_epoch = epoch_store.epoch();
4614        if actual_epoch != expected_epoch {
4615            return Err(IotaError::WrongEpoch {
4616                expected_epoch,
4617                actual_epoch,
4618            });
4619        }
4620
4621        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4622    }
4623
    /// Get the set of system packages that are compiled in to this build, if
    /// those packages are compatible with the current versions of those
    /// packages on-chain.
    ///
    /// Returns an empty vector as soon as one built-in package is found to be
    /// incompatible with its on-chain counterpart, meaning this binary cannot
    /// support a package upgrade vote.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        // NOTE(review): the identity `.map(|p| p)` appears to exist to unify
        // the iterator item type so it can be chained with
        // `extra_packages.iter()` — confirm before removing.
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            #[cfg(msim)]
            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
                .unwrap_or(modules);

            // `compare_system_package` yields the object ref to vote for when
            // the local package is compatible with what is on-chain; `None`
            // means this binary cannot support any upgrade, so bail out with
            // an empty result.
            let Some(obj_ref) = iota_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
4664
    /// Return the new versions, module bytes, and dependencies for the packages
    /// that have been committed to for a framework upgrade, in
    /// `system_packages`.  Loads the module contents from the binary, and
    /// performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case
    ///   no upgrade is required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest
    ///   matches the input, meaning the framework will be upgraded, and this
    ///   authority can satisfy that upgrade, in which case the contents are
    ///   included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does
    /// not contain the bytes for that framework ID, or the resulting
    /// package fails the digest check, `None` is returned indicating that
    /// this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        // Fetch the current on-chain object for every package to be upgraded.
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                // The package exists on-chain at a different reference: the
                // new package object chains from its previous transaction.
                Some(cur_object) => cur_object.previous_transaction,
                // Package not found on-chain: use the genesis marker digest.
                None => TransactionDigest::genesis_marker(),
            };

            // In simtests, the built-in package bytes may be overridden.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // These bytes ship with this binary, so a deserialization failure
            // would indicate a build bug rather than a runtime condition.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Recompute the object reference the candidate package would have
            // and require it to match the reference the network voted on.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                error!(
                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                );
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
4744
    /// Returns the new protocol version and system packages that the network
    /// has voted to upgrade to. If the proposed protocol version is not
    /// supported, None is returned.
    ///
    /// On success the result also carries the protocol config digest that the
    /// winning set of validators agreed on. `buffer_stake_bps` raises the
    /// required vote threshold above the plain quorum (clamped to 10000 bps).
    fn is_protocol_version_supported_v1(
        proposed_protocol_version: ProtocolVersion,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV1>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // For each validator, gather the protocol version and system packages that it
        // would like to upgrade to in the next epoch.
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator that lists no packages is voting against any change at all.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so that identical package sets compare (and group) equal
                // regardless of the order each validator reported them in.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                // A validator that only supports the current protocol version is also voting
                // against any change, because framework upgrades always require a protocol
                // version bump.
                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // There can only be one set of votes that have a majority, find one if it
        // exists.
        // NOTE: the sort is required for the `chunk_by` below — it only groups
        // adjacent elements, so equal (digest, packages) votes must be
        // contiguous.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                // should have been filtered out earlier.
                assert!(!packages.is_empty());

                // Tally the stake of every validator voting for this exact
                // (digest, packages) combination.
                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, digest, packages))
            })
    }
4824
4825    /// Selects the highest supported protocol version and system packages that
4826    /// the network has voted to upgrade to. If no upgrade is supported,
4827    /// returns the current protocol version and system packages.
4828    fn choose_protocol_version_and_system_packages_v1(
4829        current_protocol_version: ProtocolVersion,
4830        current_protocol_digest: Digest,
4831        committee: &Committee,
4832        capabilities: Vec<AuthorityCapabilitiesV1>,
4833        buffer_stake_bps: u64,
4834    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4835        let mut next_protocol_version = current_protocol_version;
4836        let mut system_packages = vec![];
4837        let mut protocol_version_digest = current_protocol_digest;
4838
4839        // Finds the highest supported protocol version and system packages by
4840        // incrementing the proposed protocol version by one until no further
4841        // upgrades are supported.
4842        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4843            next_protocol_version + 1,
4844            committee,
4845            capabilities.clone(),
4846            buffer_stake_bps,
4847        ) {
4848            next_protocol_version = version;
4849            protocol_version_digest = digest;
4850            system_packages = packages;
4851        }
4852
4853        (
4854            next_protocol_version,
4855            protocol_version_digest,
4856            system_packages,
4857        )
4858    }
4859
4860    /// Returns the indices of validators that support the given protocol
4861    /// version and digest. This includes both committee and non-committee
4862    /// validators based on their capabilities. Uses active validators
4863    /// instead of committee indices.
4864    fn get_validators_supporting_protocol_version(
4865        target_protocol_version: ProtocolVersion,
4866        target_digest: Digest,
4867        active_validators: &[AuthorityPublicKey],
4868        capabilities: &[AuthorityCapabilitiesV1],
4869    ) -> Vec<u64> {
4870        let mut eligible_validators = Vec::new();
4871
4872        for capability in capabilities {
4873            // Check if this validator supports the target protocol version and digest
4874            if let Some(digest) = capability
4875                .supported_protocol_versions
4876                .get_version_digest(target_protocol_version)
4877            {
4878                if digest == target_digest {
4879                    // Find the validator's index in the active validators list
4880                    if let Some(index) = active_validators
4881                        .iter()
4882                        .position(|name| AuthorityName::from(name) == capability.authority)
4883                    {
4884                        eligible_validators.push(index as u64);
4885                    }
4886                }
4887            }
4888        }
4889
4890        // Sort indices for deterministic behavior
4891        eligible_validators.sort();
4892        eligible_validators
4893    }
4894
4895    /// Calculates the sum of weights for eligible validators that are part of
4896    /// the committee. Takes the indices from
4897    /// get_validators_supporting_protocol_version and maps them back
4898    /// to committee members to get their weights.
4899    fn calculate_eligible_validators_weight(
4900        eligible_validator_indices: &[u64],
4901        active_validators: &[AuthorityPublicKey],
4902        committee: &Committee,
4903    ) -> u64 {
4904        let mut total_weight = 0u64;
4905
4906        for &index in eligible_validator_indices {
4907            let authority_pubkey = &active_validators[index as usize];
4908            // Check if this validator is in the committee and get their weight
4909            if let Some((_, weight)) = committee
4910                .members()
4911                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4912            {
4913                total_weight += weight;
4914            }
4915        }
4916
4917        total_weight
4918    }
4919
4920    #[instrument(level = "debug", skip_all)]
4921    fn create_authenticator_state_tx(
4922        &self,
4923        epoch_store: &Arc<AuthorityPerEpochStore>,
4924    ) -> Option<EndOfEpochTransactionKind> {
4925        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4926            info!("authenticator state transactions not enabled");
4927            return None;
4928        }
4929
4930        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4931        let tx = if authenticator_state_exists {
4932            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4933            let min_epoch =
4934                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4935            let authenticator_obj_initial_shared_version = epoch_store
4936                .epoch_start_config()
4937                .authenticator_obj_initial_shared_version()
4938                .expect("initial version must exist");
4939
4940            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4941                min_epoch,
4942                authenticator_obj_initial_shared_version,
4943            );
4944
4945            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4946
4947            tx
4948        } else {
4949            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4950            info!("Creating AuthenticatorStateCreate tx");
4951            tx
4952        };
4953        Some(tx)
4954    }
4955
4956    /// Creates and execute the advance epoch transaction to effects without
4957    /// committing it to the database. The effects of the change epoch tx
4958    /// are only written to the database after a certified checkpoint has been
4959    /// formed and executed by CheckpointExecutor.
4960    ///
4961    /// When a framework upgraded has been decided on, but the validator does
4962    /// not have the new versions of the packages locally, the validator
4963    /// cannot form the ChangeEpochTx. In this case it returns Err,
4964    /// indicating that the checkpoint builder should give up trying to make the
4965    /// final checkpoint. As long as the network is able to create a certified
4966    /// checkpoint (which should be ensured by the capabilities vote), it
4967    /// will arrive via state sync and be executed by CheckpointExecutor.
4968    #[instrument(level = "error", skip_all)]
4969    pub async fn create_and_execute_advance_epoch_tx(
4970        &self,
4971        epoch_store: &Arc<AuthorityPerEpochStore>,
4972        gas_cost_summary: &GasCostSummary,
4973        checkpoint: CheckpointSequenceNumber,
4974        epoch_start_timestamp_ms: CheckpointTimestamp,
4975        scores: Vec<u64>,
4976    ) -> anyhow::Result<(
4977        IotaSystemState,
4978        Option<SystemEpochInfoEvent>,
4979        TransactionEffects,
4980    )> {
4981        let mut txns = Vec::new();
4982
4983        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4984            txns.push(tx);
4985        }
4986
4987        let next_epoch = epoch_store.epoch() + 1;
4988
4989        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4990        let authority_capabilities = epoch_store
4991            .get_capabilities_v1()
4992            .expect("read capabilities from db cannot fail");
4993        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4994            Self::choose_protocol_version_and_system_packages_v1(
4995                epoch_store.protocol_version(),
4996                SupportedProtocolVersionsWithHashes::protocol_config_digest(
4997                    epoch_store.protocol_config(),
4998                ),
4999                epoch_store.committee(),
5000                authority_capabilities.clone(),
5001                buffer_stake_bps,
5002            );
5003
5004        // since system packages are created during the current epoch, they should abide
5005        // by the rules of the current epoch, including the current epoch's max
5006        // Move binary format version
5007        let config = epoch_store.protocol_config();
5008        let binary_config = to_binary_config(config);
5009        let Some(next_epoch_system_package_bytes) = self
5010            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5011            .await
5012        else {
5013            error!(
5014                "upgraded system packages {:?} are not locally available, cannot create \
5015                ChangeEpochTx. validator binary must be upgraded to the correct version!",
5016                next_epoch_system_packages
5017            );
5018            // the checkpoint builder will keep retrying forever when it hits this error.
5019            // Eventually, one of two things will happen:
5020            // - The operator will upgrade this binary to one that has the new packages
5021            //   locally, and this function will succeed.
5022            // - The final checkpoint will be certified by other validators, we will receive
5023            //   it via state sync, and execute it. This will upgrade the framework
5024            //   packages, reconfigure, and most likely shut down in the new epoch (this
5025            //   validator likely doesn't support the new protocol version, or else it
5026            //   should have had the packages.)
5027            bail!("missing system packages: cannot form ChangeEpochTx");
5028        };
5029
5030        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
5031        // ChangeEpochV2 requirements are met
5032        if config.select_committee_from_eligible_validators() {
5033            // Get the list of eligible validators that support the target protocol version
5034            let active_validators = epoch_store.epoch_start_state().get_active_validators();
5035
5036            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5037
5038            // Use validators supporting the target protocol version as eligible validators
5039            // in the next version if select_committee_supporting_next_epoch_version feature
5040            // flag is set to true.
5041            if config.select_committee_supporting_next_epoch_version() {
5042                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5043                    next_epoch_protocol_version,
5044                    next_epoch_protocol_digest,
5045                    &active_validators,
5046                    &authority_capabilities,
5047                );
5048
5049                // Calculate the total weight of eligible validators in the committee
5050                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5051                    &eligible_active_validators,
5052                    &active_validators,
5053                    epoch_store.committee(),
5054                );
5055
5056                // Safety check: ensure eligible validators have enough stake
5057                // Use the same effective threshold calculation that was used to decide the
5058                // protocol version
5059                let committee = epoch_store.committee();
5060                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5061
5062                if eligible_validators_weight < effective_threshold {
5063                    error!(
5064                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5065                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5066                    );
5067                    // Pass all active validator indices as eligible validators
5068                    // to perform selection among all of them.
5069                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5070                }
5071            }
5072
5073            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5074            // flag is enabled.
5075            if config.pass_validator_scores_to_advance_epoch() {
5076                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5077                    next_epoch,
5078                    next_epoch_protocol_version,
5079                    gas_cost_summary.storage_cost,
5080                    gas_cost_summary.computation_cost,
5081                    gas_cost_summary.computation_cost_burned,
5082                    gas_cost_summary.storage_rebate,
5083                    gas_cost_summary.non_refundable_storage_fee,
5084                    epoch_start_timestamp_ms,
5085                    next_epoch_system_package_bytes,
5086                    eligible_active_validators,
5087                    scores,
5088                    config.adjust_rewards_by_score(),
5089                ));
5090            } else {
5091                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5092                    next_epoch,
5093                    next_epoch_protocol_version,
5094                    gas_cost_summary.storage_cost,
5095                    gas_cost_summary.computation_cost,
5096                    gas_cost_summary.computation_cost_burned,
5097                    gas_cost_summary.storage_rebate,
5098                    gas_cost_summary.non_refundable_storage_fee,
5099                    epoch_start_timestamp_ms,
5100                    next_epoch_system_package_bytes,
5101                    eligible_active_validators,
5102                ));
5103            }
5104        } else if config.protocol_defined_base_fee()
5105            && config.max_committee_members_count_as_option().is_some()
5106        {
5107            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5108                next_epoch,
5109                next_epoch_protocol_version,
5110                gas_cost_summary.storage_cost,
5111                gas_cost_summary.computation_cost,
5112                gas_cost_summary.computation_cost_burned,
5113                gas_cost_summary.storage_rebate,
5114                gas_cost_summary.non_refundable_storage_fee,
5115                epoch_start_timestamp_ms,
5116                next_epoch_system_package_bytes,
5117            ));
5118        } else {
5119            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5120                next_epoch,
5121                next_epoch_protocol_version,
5122                gas_cost_summary.storage_cost,
5123                gas_cost_summary.computation_cost,
5124                gas_cost_summary.storage_rebate,
5125                gas_cost_summary.non_refundable_storage_fee,
5126                epoch_start_timestamp_ms,
5127                next_epoch_system_package_bytes,
5128            ));
5129        }
5130
5131        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5132
5133        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5134            tx.clone(),
5135            epoch_store.epoch(),
5136            checkpoint,
5137        );
5138
5139        let tx_digest = executable_tx.digest();
5140
5141        info!(
5142            ?next_epoch,
5143            ?next_epoch_protocol_version,
5144            ?next_epoch_system_packages,
5145            computation_cost=?gas_cost_summary.computation_cost,
5146            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5147            storage_cost=?gas_cost_summary.storage_cost,
5148            storage_rebate=?gas_cost_summary.storage_rebate,
5149            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5150            ?tx_digest,
5151            "Creating advance epoch transaction"
5152        );
5153
5154        fail_point_async!("change_epoch_tx_delay");
5155        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5156
5157        // The tx could have been executed by state sync already - if so simply return
5158        // an error. The checkpoint builder will shortly be terminated by
5159        // reconfiguration anyway.
5160        if self
5161            .get_transaction_cache_reader()
5162            .try_is_tx_already_executed(tx_digest)?
5163        {
5164            warn!("change epoch tx has already been executed via state sync");
5165            bail!("change epoch tx has already been executed via state sync",);
5166        }
5167
5168        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5169
5170        // We must manually assign the shared object versions to the transaction before
5171        // executing it. This is because we do not sequence end-of-epoch
5172        // transactions through consensus.
5173        epoch_store.assign_shared_object_versions_idempotent(
5174            self.get_object_cache_reader().as_ref(),
5175            std::slice::from_ref(&executable_tx),
5176        )?;
5177
5178        let (input_objects, _, _) =
5179            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5180
5181        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5182            &execution_guard,
5183            &executable_tx,
5184            input_objects,
5185            None,
5186            None,
5187            epoch_store,
5188        )?;
5189        let system_obj = get_iota_system_state(&temporary_store.written)
5190            .expect("change epoch tx must write to system object");
5191        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5192        let system_epoch_info_event = temporary_store
5193            .events
5194            .data
5195            .into_iter()
5196            .find(|event| event.is_system_epoch_info_event())
5197            .map(SystemEpochInfoEvent::from);
5198        // The system epoch info event can be `None` in case if the `advance_epoch`
5199        // Move function call failed and was executed in the safe mode.
5200        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5201
5202        // We must write tx and effects to the state sync tables so that state sync is
5203        // able to deliver to the transaction to CheckpointExecutor after it is
5204        // included in a certified checkpoint.
5205        self.get_state_sync_store()
5206            .try_insert_transaction_and_effects(&tx, &effects)
5207            .map_err(|err| {
5208                let err: anyhow::Error = err.into();
5209                err
5210            })?;
5211
5212        info!(
5213            "Effects summary of the change epoch transaction: {:?}",
5214            effects.summary_for_debug()
5215        );
5216        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5217        // The change epoch transaction cannot fail to execute.
5218        assert!(effects.status().is_ok());
5219        Ok((system_obj, system_epoch_info_event, effects))
5220    }
5221
5222    /// This function is called at the very end of the epoch.
5223    /// This step is required before updating new epoch in the db and calling
5224    /// reopen_epoch_db.
5225    #[instrument(level = "error", skip_all)]
5226    async fn revert_uncommitted_epoch_transactions(
5227        &self,
5228        epoch_store: &AuthorityPerEpochStore,
5229    ) -> IotaResult {
5230        {
5231            let state = epoch_store.get_reconfig_state_write_lock_guard();
5232            if state.should_accept_user_certs() {
5233                // Need to change this so that consensus adapter do not accept certificates from
5234                // user. This can happen if our local validator did not initiate
5235                // epoch change locally, but 2f+1 nodes already concluded the
5236                // epoch.
5237                //
5238                // This lock is essentially a barrier for
5239                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5240                // after this block
5241                epoch_store.close_user_certs(state);
5242            }
5243            // lock is dropped here
5244        }
5245        let pending_certificates = epoch_store.pending_consensus_certificates();
5246        info!(
5247            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5248            pending_certificates.len(),
5249            pending_certificates,
5250        );
5251        for digest in pending_certificates {
5252            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5253                info!(
5254                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5255                    digest
5256                );
5257                continue;
5258            }
5259            info!("Reverting {:?} at the end of epoch", digest);
5260            epoch_store.revert_executed_transaction(&digest)?;
5261            self.get_reconfig_api().try_revert_state_update(&digest)?;
5262        }
5263        info!("All uncommitted local transactions reverted");
5264        Ok(())
5265    }
5266
5267    #[instrument(level = "error", skip_all)]
5268    async fn reopen_epoch_db(
5269        &self,
5270        cur_epoch_store: &AuthorityPerEpochStore,
5271        new_committee: Committee,
5272        epoch_start_configuration: EpochStartConfiguration,
5273        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5274        epoch_last_checkpoint: CheckpointSequenceNumber,
5275    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5276        let new_epoch = new_committee.epoch;
5277        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5278        assert_eq!(
5279            epoch_start_configuration.epoch_start_state().epoch(),
5280            new_committee.epoch
5281        );
5282        fail_point!("before-open-new-epoch-store");
5283        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5284            self.name,
5285            new_committee,
5286            epoch_start_configuration,
5287            self.get_backing_package_store().clone(),
5288            self.get_object_store().clone(),
5289            expensive_safety_check_config,
5290            epoch_last_checkpoint,
5291        )?;
5292        self.epoch_store.store(new_epoch_store.clone());
5293        Ok(new_epoch_store)
5294    }
5295
5296    /// Checks if `authenticator` unlocks a valid Move account and returns the
5297    /// account-related `AuthenticatorFunctionRef` object.
5298    fn check_move_account(
5299        &self,
5300        auth_account_object_id: ObjectID,
5301        auth_account_object_seq_number: Option<SequenceNumber>,
5302        auth_account_object_digest: Option<ObjectDigest>,
5303        account_object: ObjectReadResult,
5304        signer: &IotaAddress,
5305    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5306        let account_object = match account_object.object {
5307            ObjectReadResultKind::Object(object) => Ok(object),
5308            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5309                Err(UserInputError::AccountObjectDeleted {
5310                    account_id: account_object.id(),
5311                    account_version: version,
5312                    transaction_digest: digest,
5313                })
5314            }
5315            // It is impossible to check the account object because it is used in a canceled
5316            // transaction and is not loaded.
5317            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5318                Err(UserInputError::AccountObjectInCanceledTransaction {
5319                    account_id: account_object.id(),
5320                    account_version: version,
5321                })
5322            }
5323        }?;
5324
5325        let account_object_addr = IotaAddress::from(auth_account_object_id);
5326
5327        fp_ensure!(
5328            signer == &account_object_addr,
5329            UserInputError::IncorrectUserSignature {
5330                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5331            }
5332            .into()
5333        );
5334
5335        fp_ensure!(
5336            account_object.is_shared() || account_object.is_immutable(),
5337            UserInputError::AccountObjectNotSupported {
5338                object_id: auth_account_object_id
5339            }
5340            .into()
5341        );
5342
5343        let auth_account_object_seq_number =
5344            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5345                let account_object_version = account_object.version();
5346
5347                fp_ensure!(
5348                    account_object_version == auth_account_object_seq_number,
5349                    UserInputError::AccountObjectVersionMismatch {
5350                        object_id: auth_account_object_id,
5351                        expected_version: auth_account_object_seq_number,
5352                        actual_version: account_object_version,
5353                    }
5354                    .into()
5355                );
5356
5357                auth_account_object_seq_number
5358            } else {
5359                account_object.version()
5360            };
5361
5362        if let Some(auth_account_object_digest) = auth_account_object_digest {
5363            let expected_digest = account_object.digest();
5364            fp_ensure!(
5365                expected_digest == auth_account_object_digest,
5366                UserInputError::InvalidAccountObjectDigest {
5367                    object_id: auth_account_object_id,
5368                    expected_digest,
5369                    actual_digest: auth_account_object_digest,
5370                }
5371                .into()
5372            );
5373        }
5374
5375        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5376            auth_account_object_id,
5377            &AuthenticatorFunctionRefV1Key::tag().into(),
5378            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5379        )
5380        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5381            account_object_id: auth_account_object_id,
5382        })?;
5383
5384        let authenticator_function_ref_field = self
5385            .get_object_cache_reader()
5386            .try_find_object_lt_or_eq_version(
5387                authenticator_function_ref_field_id,
5388                auth_account_object_seq_number,
5389            )?;
5390
5391        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5392            let field_move_object = authenticator_function_ref_field_obj
5393                .data
5394                .try_as_move()
5395                .expect("dynamic field should never be a package object");
5396
5397            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5398                field_move_object.to_rust().ok_or(
5399                    UserInputError::InvalidAuthenticatorFunctionRefField {
5400                        account_object_id: auth_account_object_id,
5401                    },
5402                )?;
5403
5404            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5405                field.value,
5406                authenticator_function_ref_field_obj.compute_object_reference(),
5407                authenticator_function_ref_field_obj.owner,
5408                authenticator_function_ref_field_obj.storage_rebate,
5409                authenticator_function_ref_field_obj.previous_transaction,
5410            ))
5411        } else {
5412            Err(UserInputError::MoveAuthenticatorNotFound {
5413                authenticator_function_ref_id: authenticator_function_ref_field_id,
5414                account_object_id: auth_account_object_id,
5415                account_object_version: auth_account_object_seq_number,
5416            }
5417            .into())
5418        }
5419    }
5420
5421    fn read_objects_for_signing(
5422        &self,
5423        transaction: &VerifiedTransaction,
5424        epoch: u64,
5425    ) -> IotaResult<(
5426        InputObjects,
5427        ReceivingObjects,
5428        Option<InputObjects>,
5429        Option<ObjectReadResult>,
5430    )> {
5431        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5432            Some(transaction.digest()),
5433            &transaction.collect_all_input_object_kind_for_reading()?,
5434            &transaction.data().transaction_data().receiving_objects(),
5435            epoch,
5436        )?;
5437
5438        transaction
5439            .split_input_objects_into_groups_for_reading(input_objects)
5440            .map(|(tx_input_objects, auth_input_objects, account_object)| {
5441                (
5442                    tx_input_objects,
5443                    tx_receiving_objects,
5444                    auth_input_objects,
5445                    account_object,
5446                )
5447            })
5448    }
5449
5450    fn check_transaction_inputs_for_signing(
5451        &self,
5452        protocol_config: &ProtocolConfig,
5453        reference_gas_price: u64,
5454        tx_data: &TransactionData,
5455        tx_input_objects: InputObjects,
5456        tx_receiving_objects: &ReceivingObjects,
5457        move_authenticator: Option<&MoveAuthenticator>,
5458        auth_input_objects: Option<InputObjects>,
5459        account_object: Option<ObjectReadResult>,
5460    ) -> IotaResult<(
5461        IotaGasStatus,
5462        CheckedInputObjects,
5463        Option<CheckedInputObjects>,
5464        Option<AuthenticatorFunctionRef>,
5465    )> {
5466        let (
5467            auth_checked_input_objects_union,
5468            authenticator_function_ref,
5469            authenticator_gas_budget,
5470        ) = if let Some(move_authenticator) = move_authenticator {
5471            let auth_input_objects =
5472                auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5473            let account_object = account_object.expect("Move account object must be provided");
5474
5475            // Check basic `object_to_authenticate` preconditions and get its components.
5476            let (
5477                auth_account_object_id,
5478                auth_account_object_seq_number,
5479                auth_account_object_digest,
5480            ) = move_authenticator.object_to_authenticate_components()?;
5481
5482            // Make sure the sender is a Move account.
5483            let AuthenticatorFunctionRefForExecution {
5484                authenticator_function_ref,
5485                ..
5486            } = self.check_move_account(
5487                auth_account_object_id,
5488                auth_account_object_seq_number,
5489                auth_account_object_digest,
5490                account_object,
5491                &tx_data.sender(),
5492            )?;
5493
5494            // Check the MoveAuthenticator input objects.
5495            let auth_checked_input_objects =
5496                iota_transaction_checks::check_move_authenticator_input_for_signing(
5497                    auth_input_objects,
5498                )?;
5499
5500            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5501            // not a part of the transaction data.
5502            let authenticator_gas_budget = protocol_config.max_auth_gas();
5503
5504            (
5505                Some(auth_checked_input_objects),
5506                Some(authenticator_function_ref),
5507                authenticator_gas_budget,
5508            )
5509        } else {
5510            (None, None, 0)
5511        };
5512
5513        // Check the transaction inputs.
5514        let (gas_status, tx_checked_input_objects) =
5515            iota_transaction_checks::check_transaction_input(
5516                protocol_config,
5517                reference_gas_price,
5518                tx_data,
5519                tx_input_objects,
5520                tx_receiving_objects,
5521                &self.metrics.bytecode_verifier_metrics,
5522                &self.config.verifier_signing_config,
5523                authenticator_gas_budget,
5524            )?;
5525
5526        Ok((
5527            gas_status,
5528            tx_checked_input_objects,
5529            auth_checked_input_objects_union,
5530            authenticator_function_ref,
5531        ))
5532    }
5533
    /// Test-only helper: iterates the cached live object set held by the
    /// accumulator store.
    #[cfg(test)]
    pub(crate) fn iter_live_object_set_for_testing(
        &self,
    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
        self.get_accumulator_store()
            .iter_cached_live_object_set_for_testing()
    }
5541
5542    #[cfg(test)]
5543    pub(crate) fn shutdown_execution_for_test(&self) {
5544        self.tx_execution_shutdown
5545            .lock()
5546            .take()
5547            .unwrap()
5548            .send(())
5549            .unwrap();
5550    }
5551
5552    /// NOTE: this function is only to be used for fuzzing and testing. Never
5553    /// use in prod
5554    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5555        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5556        self.get_object_cache_reader()
5557            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5558        self.get_reconfig_api()
5559            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5560    }
5561}
5562
/// Receives randomness rounds over a channel and turns each one into a
/// `RandomnessStateUpdate` transaction that is persisted and enqueued for
/// execution.
pub struct RandomnessRoundReceiver {
    // Authority used to persist and enqueue the randomness update transactions.
    authority_state: Arc<AuthorityState>,
    // Incoming (epoch, round, randomness bytes) messages.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5567
impl RandomnessRoundReceiver {
    /// Creates the receiver and starts its event loop on a monitored task,
    /// returning the task's join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: handles each received randomness message until the channel
    /// is closed (all senders dropped), then exits.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes);
                    } else {
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds a `RandomnessStateUpdate` transaction for the given round,
    /// persists it, enqueues it for execution, and spawns a background task
    /// that waits for its effects. Execution failure is fatal; randomness for
    /// an epoch other than the current one is dropped with a warning.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version(),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must
        // immediately persist this transaction. If we crash before its outputs
        // are committed, this ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Send transaction to TransactionManager for execution.
        self.authority_state
            .transaction_manager()
            .enqueue(vec![transaction], &epoch_store);

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case
            // of out-of-order randomness generation. (Each
            // RandomnessStateUpdate depends on the output of the
            // RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason,
            // the validator will eventually crash rather than continuing in a
            // zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .try_notify_read_executed_effects(&[digest]),
            )
            .await;
            let result = match result {
                Ok(result) => result,
                Err(_) => {
                    if cfg!(debug_assertions) {
                        // Crash on randomness update execution timeout in debug builds.
                        panic!(
                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                        );
                    }
                    warn!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .try_notify_read_executed_effects(&[digest])
                        .await
                }
            };

            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
5687
5688#[async_trait]
5689impl TransactionKeyValueStoreTrait for AuthorityState {
5690    async fn multi_get(
5691        &self,
5692        transaction_keys: &[TransactionDigest],
5693        effects_keys: &[TransactionDigest],
5694    ) -> IotaResult<KVStoreTransactionData> {
5695        let txns = if !transaction_keys.is_empty() {
5696            self.get_transaction_cache_reader()
5697                .try_multi_get_transaction_blocks(transaction_keys)?
5698                .into_iter()
5699                .map(|t| t.map(|t| (*t).clone().into_inner()))
5700                .collect()
5701        } else {
5702            vec![]
5703        };
5704
5705        let fx = if !effects_keys.is_empty() {
5706            self.get_transaction_cache_reader()
5707                .try_multi_get_executed_effects(effects_keys)?
5708        } else {
5709            vec![]
5710        };
5711
5712        Ok((txns, fx))
5713    }
5714
5715    async fn multi_get_checkpoints(
5716        &self,
5717        checkpoint_summaries: &[CheckpointSequenceNumber],
5718        checkpoint_contents: &[CheckpointSequenceNumber],
5719        checkpoint_summaries_by_digest: &[CheckpointDigest],
5720    ) -> IotaResult<(
5721        Vec<Option<CertifiedCheckpointSummary>>,
5722        Vec<Option<CheckpointContents>>,
5723        Vec<Option<CertifiedCheckpointSummary>>,
5724    )> {
5725        // TODO: use multi-get methods if it ever becomes important (unlikely)
5726        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5727        let store = self.get_checkpoint_store();
5728        for seq in checkpoint_summaries {
5729            let checkpoint = store
5730                .get_checkpoint_by_sequence_number(*seq)?
5731                .map(|c| c.into_inner());
5732
5733            summaries.push(checkpoint);
5734        }
5735
5736        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5737        for seq in checkpoint_contents {
5738            let checkpoint = store
5739                .get_checkpoint_by_sequence_number(*seq)?
5740                .and_then(|summary| {
5741                    store
5742                        .get_checkpoint_contents(&summary.content_digest)
5743                        .expect("db read cannot fail")
5744                });
5745            contents.push(checkpoint);
5746        }
5747
5748        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5749        for digest in checkpoint_summaries_by_digest {
5750            let checkpoint = store
5751                .get_checkpoint_by_digest(digest)?
5752                .map(|c| c.into_inner());
5753            summaries_by_digest.push(checkpoint);
5754        }
5755
5756        Ok((summaries, contents, summaries_by_digest))
5757    }
5758
5759    async fn get_transaction_perpetual_checkpoint(
5760        &self,
5761        digest: TransactionDigest,
5762    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5763        self.get_checkpoint_cache()
5764            .try_get_transaction_perpetual_checkpoint(&digest)
5765            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5766    }
5767
5768    async fn get_object(
5769        &self,
5770        object_id: ObjectID,
5771        version: VersionNumber,
5772    ) -> IotaResult<Option<Object>> {
5773        self.get_object_cache_reader()
5774            .try_get_object_by_key(&object_id, version)
5775    }
5776
5777    async fn multi_get_transactions_perpetual_checkpoints(
5778        &self,
5779        digests: &[TransactionDigest],
5780    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5781        let res = self
5782            .get_checkpoint_cache()
5783            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5784
5785        Ok(res
5786            .into_iter()
5787            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5788            .collect())
5789    }
5790
5791    #[instrument(skip(self))]
5792    async fn multi_get_events_by_tx_digests(
5793        &self,
5794        digests: &[TransactionDigest],
5795    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5796        if digests.is_empty() {
5797            return Ok(vec![]);
5798        }
5799
5800        Ok(self
5801            .get_transaction_cache_reader()
5802            .multi_get_events(digests))
5803    }
5804}
5805
5806#[cfg(msim)]
5807pub mod framework_injection {
5808    use std::{
5809        cell::RefCell,
5810        collections::{BTreeMap, BTreeSet},
5811    };
5812
5813    use iota_framework::{BuiltInFramework, SystemPackage};
5814    use iota_types::{
5815        base_types::{AuthorityName, ObjectID},
5816        is_system_package,
5817    };
5818    use move_binary_format::CompiledModule;
5819
    // Per-package override configuration, keyed by package object id.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    // A framework package represented as its compiled Move modules.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which framework override (if any) to
    /// serve.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    // How a package override applies: the same framework for every validator,
    // or decided per validator by a callback.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }
5836
5837    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5838        modules
5839            .iter()
5840            .map(|m| {
5841                let mut buf = Vec::new();
5842                m.serialize_with_version(m.version, &mut buf).unwrap();
5843                buf
5844            })
5845            .collect()
5846    }
5847
5848    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5849        OVERRIDE.with(|bs| {
5850            bs.borrow_mut()
5851                .insert(package_id, PackageOverrideConfig::Global(modules))
5852        });
5853    }
5854
5855    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5856        OVERRIDE.with(|bs| {
5857            bs.borrow_mut()
5858                .insert(package_id, PackageOverrideConfig::PerValidator(func))
5859        });
5860    }
5861
5862    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5863        OVERRIDE.with(|cfg| {
5864            cfg.borrow().get(package_id).and_then(|entry| match entry {
5865                PackageOverrideConfig::Global(framework) => {
5866                    Some(compiled_modules_to_bytes(framework))
5867                }
5868                PackageOverrideConfig::PerValidator(func) => {
5869                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
5870                }
5871            })
5872        })
5873    }
5874
5875    pub fn get_override_modules(
5876        package_id: &ObjectID,
5877        name: AuthorityName,
5878    ) -> Option<Vec<CompiledModule>> {
5879        OVERRIDE.with(|cfg| {
5880            cfg.borrow().get(package_id).and_then(|entry| match entry {
5881                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5882                PackageOverrideConfig::PerValidator(func) => func(name),
5883            })
5884        })
5885    }
5886
5887    pub fn get_override_system_package(
5888        package_id: &ObjectID,
5889        name: AuthorityName,
5890    ) -> Option<SystemPackage> {
5891        let bytes = get_override_bytes(package_id, name)?;
5892        let dependencies = if is_system_package(*package_id) {
5893            BuiltInFramework::get_package_by_id(package_id)
5894                .dependencies
5895                .to_vec()
5896        } else {
5897            // Assume that entirely new injected packages depend on all existing system
5898            // packages.
5899            BuiltInFramework::all_package_ids()
5900        };
5901        Some(SystemPackage {
5902            id: *package_id,
5903            bytes,
5904            dependencies,
5905        })
5906    }
5907
5908    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5909        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5910        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5911            cfg.borrow()
5912                .keys()
5913                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5914                .collect()
5915        });
5916
5917        extra
5918            .into_iter()
5919            .map(|package| SystemPackage {
5920                id: package,
5921                bytes: get_override_bytes(&package, name).unwrap(),
5922                dependencies: BuiltInFramework::all_package_ids(),
5923            })
5924            .collect()
5925    }
5926}
5927
/// Serializable snapshot of an [`Object`] together with its full object
/// reference, used in node state debug dumps.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object ID component of the object reference.
    pub id: ObjectID,
    /// Version of the object at capture time.
    pub version: VersionNumber,
    /// Digest of the object at this version.
    pub digest: ObjectDigest,
    /// The full object contents.
    pub object: Object,
}
5935
5936impl ObjDumpFormat {
5937    fn new(object: Object) -> Self {
5938        let oref = object.compute_object_reference();
5939        Self {
5940            id: oref.0,
5941            version: oref.1,
5942            digest: oref.2,
5943            object,
5944        }
5945    }
5946}
5947
/// Debug dump of everything needed to re-examine the execution of a single
/// transaction on this node (epoch parameters, effects, and every object the
/// transaction read or touched).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// The signed transaction data as submitted by the sender.
    pub sender_signed_data: SenderSignedData,
    /// Epoch in which the transaction was executed.
    pub executed_epoch: u64,
    /// Reference gas price of the execution epoch.
    pub reference_gas_price: u64,
    /// Protocol version active during execution.
    pub protocol_version: u64,
    /// Start timestamp (ms) of the execution epoch.
    pub epoch_start_timestamp_ms: u64,
    /// Effects computed locally by this node.
    pub computed_effects: TransactionEffects,
    /// Digest of the effects this node was expected to produce.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// System packages at the versions visible to this execution.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Shared input objects (mutated and read-only) of the transaction.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Child objects loaded at runtime; reads of these are not tracked
    /// anywhere else.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Modified objects at the versions they had before this transaction.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages fetched from the DB at runtime rather than via input objects.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// The transaction's input objects from the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
5965
5966impl NodeStateDump {
5967    pub fn new(
5968        tx_digest: &TransactionDigest,
5969        effects: &TransactionEffects,
5970        expected_effects_digest: TransactionEffectsDigest,
5971        object_store: &dyn ObjectStore,
5972        epoch_store: &Arc<AuthorityPerEpochStore>,
5973        inner_temporary_store: &InnerTemporaryStore,
5974        certificate: &VerifiedExecutableTransaction,
5975    ) -> IotaResult<Self> {
5976        // Epoch info
5977        let executed_epoch = epoch_store.epoch();
5978        let reference_gas_price = epoch_store.reference_gas_price();
5979        let epoch_start_config = epoch_store.epoch_start_config();
5980        let protocol_version = epoch_store.protocol_version().as_u64();
5981        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5982
5983        // Record all system packages at this version
5984        let mut relevant_system_packages = Vec::new();
5985        for sys_package_id in BuiltInFramework::all_package_ids() {
5986            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5987                relevant_system_packages.push(ObjDumpFormat::new(w))
5988            }
5989        }
5990
5991        // Record all the shared objects
5992        let mut shared_objects = Vec::new();
5993        for kind in effects.input_shared_objects() {
5994            match kind {
5995                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5996                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5997                        shared_objects.push(ObjDumpFormat::new(w))
5998                    }
5999                }
6000                InputSharedObject::ReadDeleted(..)
6001                | InputSharedObject::MutateDeleted(..)
6002                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6003                                                           * objects. */
6004            }
6005        }
6006
6007        // Record all loaded child objects
6008        // Child objects which are read but not mutated are not tracked anywhere else
6009        let mut loaded_child_objects = Vec::new();
6010        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6011            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6012                loaded_child_objects.push(ObjDumpFormat::new(w))
6013            }
6014        }
6015
6016        // Record all modified objects
6017        let mut modified_at_versions = Vec::new();
6018        for (id, ver) in effects.modified_at_versions() {
6019            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6020                modified_at_versions.push(ObjDumpFormat::new(w))
6021            }
6022        }
6023
6024        // Packages read at runtime, which were not previously loaded into the temoorary
6025        // store Some packages may be fetched at runtime and wont show up in
6026        // input objects
6027        let mut runtime_reads = Vec::new();
6028        for obj in inner_temporary_store
6029            .runtime_packages_loaded_from_db
6030            .values()
6031        {
6032            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6033        }
6034
6035        // All other input objects should already be in `inner_temporary_store.objects`
6036
6037        Ok(Self {
6038            tx_digest: *tx_digest,
6039            executed_epoch,
6040            reference_gas_price,
6041            epoch_start_timestamp_ms,
6042            protocol_version,
6043            relevant_system_packages,
6044            shared_objects,
6045            loaded_child_objects,
6046            modified_at_versions,
6047            runtime_reads,
6048            sender_signed_data: certificate.clone().into_message(),
6049            input_objects: inner_temporary_store
6050                .input_objects
6051                .values()
6052                .map(|o| ObjDumpFormat::new(o.clone()))
6053                .collect(),
6054            computed_effects: effects.clone(),
6055            expected_effects_digest,
6056        })
6057    }
6058
6059    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6060        let mut objects = Vec::new();
6061        objects.extend(self.relevant_system_packages.clone());
6062        objects.extend(self.shared_objects.clone());
6063        objects.extend(self.loaded_child_objects.clone());
6064        objects.extend(self.modified_at_versions.clone());
6065        objects.extend(self.runtime_reads.clone());
6066        objects.extend(self.input_objects.clone());
6067        objects
6068    }
6069
6070    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6071        let file_name = format!(
6072            "{}_{}_NODE_DUMP.json",
6073            self.tx_digest,
6074            AuthorityState::unixtime_now_ms()
6075        );
6076        let mut path = path.to_path_buf();
6077        path.push(&file_name);
6078        let mut file = File::create(path.clone())?;
6079        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6080        Ok(path)
6081    }
6082
6083    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6084        let file = File::open(path)?;
6085        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6086    }
6087}