Skip to main content

iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48    ExecutionStatus, ObjectId, Owner, StructTag, TypeTag,
49    crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
50};
51use iota_storage::{
52    key_value_store::{
53        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
54    },
55    key_value_store_metrics::KeyValueStoreMetrics,
56};
57#[cfg(msim)]
58use iota_types::committee::CommitteeTrait;
59use iota_types::{
60    account_abstraction::{
61        account::AuthenticatorFunctionRefV1Key,
62        authenticator_function::{
63            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
64            AuthenticatorFunctionRefV1, extract_auth_fun_refs,
65        },
66    },
67    auth_context::AuthContextData,
68    base_types::{
69        AuthorityName, ConciseableName, IotaAddress, ObjectInfo, ObjectRef, ObjectType,
70        SequenceNumber, VersionNumber,
71    },
72    committee::{Committee, EpochId, ProtocolVersion},
73    crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer},
74    deny_list_v1::check_coin_deny_list_v1_during_signing,
75    digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
76    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
77    effects::{
78        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
79        TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
80    },
81    error::{ExecutionError, IotaError, IotaResult, UserInputError},
82    event::{Event, EventID, SystemEpochInfoEvent},
83    executable_transaction::VerifiedExecutableTransaction,
84    execution_config_utils::to_binary_config,
85    fp_ensure,
86    gas::{GasCostSummary, IotaGasStatus},
87    gas_coin::NANOS_PER_IOTA,
88    inner_temporary_store::{
89        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
90        WrittenObjects,
91    },
92    iota_sdk_types_conversions::type_tag_core_to_sdk,
93    iota_system_state::{
94        IotaSystemState, IotaSystemStateTrait,
95        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
96    },
97    layout_resolver::{LayoutResolver, into_struct_layout},
98    message_envelope::Message,
99    messages_checkpoint::{
100        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
101        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
102        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
103        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
104    },
105    messages_consensus::AuthorityCapabilitiesV1,
106    messages_grpc::{
107        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
108        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
109        TransactionStatus,
110    },
111    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
112    move_authenticator::MoveAuthenticator,
113    object::{
114        MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
115        bounded_visitor::BoundedVisitor,
116    },
117    storage::{
118        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
119    },
120    supported_protocol_versions::{
121        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
122    },
123    traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
124    transaction::*,
125    transaction_executor::{SimulateTransactionResult, VmChecks},
126};
127use itertools::Itertools;
128use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
129use move_core_types::{
130    account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
131};
132use parking_lot::Mutex;
133use prometheus::{
134    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
135    register_histogram_vec_with_registry, register_histogram_with_registry,
136    register_int_counter_vec_with_registry, register_int_counter_with_registry,
137    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
138};
139use serde::{Deserialize, Serialize, de::DeserializeOwned};
140use tap::TapFallible;
141use tokio::{
142    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
143    task::JoinHandle,
144};
145use tracing::{debug, error, info, instrument, trace, warn};
146use typed_store::TypedStoreError;
147
148use self::{
149    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
150};
151#[cfg(msim)]
152pub use crate::checkpoints::checkpoint_executor::utils::{
153    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
154};
155use crate::{
156    authority::{
157        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
158        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
159        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
160        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
161        authority_store_tables::AuthorityPrunerTables,
162        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
163    },
164    authority_client::NetworkAuthorityClient,
165    checkpoint_progress_tracker::CheckpointProgressTracker,
166    checkpoints::CheckpointStore,
167    congestion_tracker::CongestionTracker,
168    consensus_adapter::ConsensusAdapter,
169    epoch::committee_store::CommitteeStore,
170    execution_cache::{
171        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
172        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
173        TransactionCacheRead,
174    },
175    execution_driver::execution_process,
176    global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
177    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
178    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
179    metrics::{LatencyObserver, RateTracker},
180    module_cache_metrics::ResolverMetrics,
181    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
182    stake_aggregator::StakeAggregator,
183    subscription_handler::SubscriptionHandler,
184    traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
185    transaction_input_loader::TransactionInputLoader,
186    transaction_manager::TransactionManager,
187    transaction_outputs::TransactionOutputs,
188    validator_tx_finalizer::ValidatorTxFinalizer,
189    verify_indexes::verify_indexes,
190};
191
192#[cfg(test)]
193#[path = "unit_tests/authority_tests.rs"]
194pub mod authority_tests;
195
196#[cfg(test)]
197#[path = "unit_tests/transaction_tests.rs"]
198pub mod transaction_tests;
199
200#[cfg(test)]
201#[path = "unit_tests/batch_transaction_tests.rs"]
202mod batch_transaction_tests;
203
204#[cfg(test)]
205#[path = "unit_tests/move_integration_tests.rs"]
206pub mod move_integration_tests;
207
208#[cfg(test)]
209#[path = "unit_tests/gas_tests.rs"]
210mod gas_tests;
211
212#[cfg(test)]
213#[path = "unit_tests/batch_verification_tests.rs"]
214mod batch_verification_tests;
215
216#[cfg(test)]
217#[path = "unit_tests/coin_deny_list_tests.rs"]
218mod coin_deny_list_tests;
219
220#[cfg(test)]
221#[path = "unit_tests/auth_unit_test_utils.rs"]
222pub mod auth_unit_test_utils;
223
224pub mod authority_test_utils;
225
226pub mod authority_per_epoch_store;
227pub mod authority_per_epoch_store_pruner;
228
229mod authority_store_migrations;
230pub mod authority_store_pruner;
231pub mod authority_store_tables;
232pub mod authority_store_types;
233pub mod epoch_start_configuration;
234pub mod shared_object_congestion_tracker;
235pub mod shared_object_version_manager;
236pub mod suggested_gas_price_calculator;
237pub mod test_authority_builder;
238pub mod transaction_deferral;
239
240pub(crate) mod authority_store;
241pub mod backpressure;
242
243/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
244pub struct AuthorityMetrics {
245    tx_orders: IntCounter,
246    total_certs: IntCounter,
247    total_cert_attempts: IntCounter,
248    total_effects: IntCounter,
249    pub shared_obj_tx: IntCounter,
250    sponsored_tx: IntCounter,
251    tx_already_processed: IntCounter,
252    num_input_objs: Histogram,
253    num_shared_objects: Histogram,
254    batch_size: Histogram,
255
256    authority_state_handle_transaction_latency: Histogram,
257
258    execute_certificate_latency_single_writer: Histogram,
259    execute_certificate_latency_shared_object: Histogram,
260
261    internal_execution_latency: Histogram,
262    execution_load_input_objects_latency: Histogram,
263    prepare_certificate_latency: Histogram,
264    commit_certificate_latency: Histogram,
265    db_checkpoint_latency: Histogram,
266
267    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
268    pub(crate) transaction_manager_num_missing_objects: IntGauge,
269    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
270    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
271    pub(crate) transaction_manager_num_ready: IntGauge,
272    pub(crate) transaction_manager_object_cache_size: IntGauge,
273    pub(crate) transaction_manager_object_cache_hits: IntCounter,
274    pub(crate) transaction_manager_object_cache_misses: IntCounter,
275    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
276    pub(crate) transaction_manager_package_cache_size: IntGauge,
277    pub(crate) transaction_manager_package_cache_hits: IntCounter,
278    pub(crate) transaction_manager_package_cache_misses: IntCounter,
279    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
280    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
281
282    pub(crate) execution_driver_executed_transactions: IntCounter,
283    pub(crate) execution_driver_dispatch_queue: IntGauge,
284    pub(crate) execution_queueing_delay_s: Histogram,
285    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
286    pub(crate) execution_gas_latency_ratio: Histogram,
287
288    pub(crate) skipped_consensus_txns: IntCounter,
289    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
290
291    pub(crate) authority_overload_status: IntGauge,
292    pub(crate) authority_load_shedding_percentage: IntGauge,
293
294    pub(crate) transaction_overload_sources: IntCounterVec,
295
296    /// Post processing metrics
297    post_processing_total_events_emitted: IntCounter,
298    post_processing_total_tx_indexed: IntCounter,
299    post_processing_total_tx_had_event_processed: IntCounter,
300    post_processing_total_failures: IntCounter,
301
302    /// Consensus handler metrics
303    pub consensus_handler_processed: IntCounterVec,
304    pub consensus_handler_transaction_sizes: HistogramVec,
305    pub consensus_handler_num_low_scoring_authorities: IntGauge,
306    pub consensus_handler_scores: IntGaugeVec,
307    pub consensus_handler_deferred_transactions: IntCounter,
308    pub consensus_handler_congested_transactions: IntCounter,
309    pub consensus_handler_cancelled_transactions: IntCounter,
310    pub consensus_handler_max_object_costs: IntGaugeVec,
311    pub consensus_committed_subdags: IntCounterVec,
312    pub consensus_committed_messages: IntGaugeVec,
313    pub consensus_committed_user_transactions: IntGaugeVec,
314    pub consensus_handler_leader_round: IntGauge,
315    pub consensus_calculated_throughput: IntGauge,
316    pub consensus_calculated_throughput_profile: IntGauge,
317
318    pub validator_scoreboard_scores: IntGaugeVec,
319    pub invalid_misbehavior_reports_by_authority: IntGaugeVec,
320
321    pub limits_metrics: Arc<LimitsMetrics>,
322
323    /// bytecode verifier metrics for tracking timeouts
324    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
325
326    /// Count of multisig signatures
327    pub multisig_sig_count: IntCounter,
328
329    // Tracks recent average txn queueing delay between when it is ready for execution
330    // until it starts executing.
331    pub execution_queueing_latency: LatencyObserver,
332
333    // Tracks the rate of transactions become ready for execution in transaction manager.
334    // The need for the Mutex is that the tracker is updated in transaction manager and read
335    // in the overload_monitor. There should be low mutex contention because
336    // transaction manager is single threaded and the read rate in overload_monitor is
337    // low. In the case where transaction manager becomes multi-threaded, we can
338    // create one rate tracker per thread.
339    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
340
341    // Tracks the rate of transactions starts execution in execution driver.
342    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
343    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
344}
345
346// Override default Prom buckets for positive numbers in 0-10M range
347const POSITIVE_INT_BUCKETS: &[f64] = &[
348    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
349    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
350    7000000., 10000000.,
351];
352
353const LATENCY_SEC_BUCKETS: &[f64] = &[
354    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
355    10., 20., 30., 60., 90.,
356];
357
358// Buckets for low latency samples. Starts from 10us.
359const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
360    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
361    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
362];
363
364const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
365    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
366    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
367];
368
369/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
370pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
371
372impl AuthorityMetrics {
373    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
374        let execute_certificate_latency = register_histogram_vec_with_registry!(
375            "authority_state_execute_certificate_latency",
376            "Latency of executing certificates, including waiting for inputs",
377            &["tx_type"],
378            LATENCY_SEC_BUCKETS.to_vec(),
379            registry,
380        )
381        .unwrap();
382
383        let execute_certificate_latency_single_writer =
384            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
385        let execute_certificate_latency_shared_object =
386            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
387
388        Self {
389            tx_orders: register_int_counter_with_registry!(
390                "total_transaction_orders",
391                "Total number of transaction orders",
392                registry,
393            )
394            .unwrap(),
395            total_certs: register_int_counter_with_registry!(
396                "total_transaction_certificates",
397                "Total number of transaction certificates handled",
398                registry,
399            )
400            .unwrap(),
401            total_cert_attempts: register_int_counter_with_registry!(
402                "total_handle_certificate_attempts",
403                "Number of calls to handle_certificate",
404                registry,
405            )
406            .unwrap(),
407            // total_effects == total transactions finished
408            total_effects: register_int_counter_with_registry!(
409                "total_transaction_effects",
410                "Total number of transaction effects produced",
411                registry,
412            )
413            .unwrap(),
414
415            shared_obj_tx: register_int_counter_with_registry!(
416                "num_shared_obj_tx",
417                "Number of transactions involving shared objects",
418                registry,
419            )
420            .unwrap(),
421
422            sponsored_tx: register_int_counter_with_registry!(
423                "num_sponsored_tx",
424                "Number of sponsored transactions",
425                registry,
426            )
427            .unwrap(),
428
429            tx_already_processed: register_int_counter_with_registry!(
430                "num_tx_already_processed",
431                "Number of transaction orders already processed previously",
432                registry,
433            )
434            .unwrap(),
435            num_input_objs: register_histogram_with_registry!(
436                "num_input_objects",
437                "Distribution of number of input TX objects per TX",
438                POSITIVE_INT_BUCKETS.to_vec(),
439                registry,
440            )
441            .unwrap(),
442            num_shared_objects: register_histogram_with_registry!(
443                "num_shared_objects",
444                "Number of shared input objects per TX",
445                POSITIVE_INT_BUCKETS.to_vec(),
446                registry,
447            )
448            .unwrap(),
449            batch_size: register_histogram_with_registry!(
450                "batch_size",
451                "Distribution of size of transaction batch",
452                POSITIVE_INT_BUCKETS.to_vec(),
453                registry,
454            )
455            .unwrap(),
456            authority_state_handle_transaction_latency: register_histogram_with_registry!(
457                "authority_state_handle_transaction_latency",
458                "Latency of handling transactions",
459                LATENCY_SEC_BUCKETS.to_vec(),
460                registry,
461            )
462            .unwrap(),
463            execute_certificate_latency_single_writer,
464            execute_certificate_latency_shared_object,
465            internal_execution_latency: register_histogram_with_registry!(
466                "authority_state_internal_execution_latency",
467                "Latency of actual certificate executions",
468                LATENCY_SEC_BUCKETS.to_vec(),
469                registry,
470            )
471            .unwrap(),
472            execution_load_input_objects_latency: register_histogram_with_registry!(
473                "authority_state_execution_load_input_objects_latency",
474                "Latency of loading input objects for execution",
475                LOW_LATENCY_SEC_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            prepare_certificate_latency: register_histogram_with_registry!(
480                "authority_state_prepare_certificate_latency",
481                "Latency of executing certificates, before committing the results",
482                LATENCY_SEC_BUCKETS.to_vec(),
483                registry,
484            )
485            .unwrap(),
486            commit_certificate_latency: register_histogram_with_registry!(
487                "authority_state_commit_certificate_latency",
488                "Latency of committing certificate execution results",
489                LATENCY_SEC_BUCKETS.to_vec(),
490                registry,
491            )
492            .unwrap(),
493            db_checkpoint_latency: register_histogram_with_registry!(
494                "db_checkpoint_latency",
495                "Latency of checkpointing dbs",
496                LATENCY_SEC_BUCKETS.to_vec(),
497                registry,
498            ).unwrap(),
499            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
500                "transaction_manager_num_enqueued_certificates",
501                "Current number of certificates enqueued to TransactionManager",
502                &["result"],
503                registry,
504            )
505            .unwrap(),
506            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
507                "transaction_manager_num_missing_objects",
508                "Current number of missing objects in TransactionManager",
509                registry,
510            )
511            .unwrap(),
512            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
513                "transaction_manager_num_pending_certificates",
514                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
515                registry,
516            )
517            .unwrap(),
518            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
519                "transaction_manager_num_executing_certificates",
520                "Number of executing certificates, including queued and actually running certificates",
521                registry,
522            )
523            .unwrap(),
524            transaction_manager_num_ready: register_int_gauge_with_registry!(
525                "transaction_manager_num_ready",
526                "Number of ready transactions in TransactionManager",
527                registry,
528            )
529            .unwrap(),
530            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
531                "transaction_manager_object_cache_size",
532                "Current size of object-availability cache in TransactionManager",
533                registry,
534            )
535            .unwrap(),
536            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
537                "transaction_manager_object_cache_hits",
538                "Number of object-availability cache hits in TransactionManager",
539                registry,
540            )
541            .unwrap(),
542            authority_overload_status: register_int_gauge_with_registry!(
543                "authority_overload_status",
544                "Whether authority is current experiencing overload and enters load shedding mode.",
545                registry)
546            .unwrap(),
547            authority_load_shedding_percentage: register_int_gauge_with_registry!(
548                "authority_load_shedding_percentage",
549                "The percentage of transactions is shed when the authority is in load shedding mode.",
550                registry)
551            .unwrap(),
552            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
553                "transaction_manager_object_cache_misses",
554                "Number of object-availability cache misses in TransactionManager",
555                registry,
556            )
557            .unwrap(),
558            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
559                "transaction_manager_object_cache_evictions",
560                "Number of object-availability cache evictions in TransactionManager",
561                registry,
562            )
563            .unwrap(),
564            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
565                "transaction_manager_package_cache_size",
566                "Current size of package-availability cache in TransactionManager",
567                registry,
568            )
569            .unwrap(),
570            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
571                "transaction_manager_package_cache_hits",
572                "Number of package-availability cache hits in TransactionManager",
573                registry,
574            )
575            .unwrap(),
576            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
577                "transaction_manager_package_cache_misses",
578                "Number of package-availability cache misses in TransactionManager",
579                registry,
580            )
581            .unwrap(),
582            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
583                "transaction_manager_package_cache_evictions",
584                "Number of package-availability cache evictions in TransactionManager",
585                registry,
586            )
587            .unwrap(),
588            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
589                "transaction_manager_transaction_queue_age_s",
590                "Time spent in waiting for transaction in the queue",
591                LATENCY_SEC_BUCKETS.to_vec(),
592                registry,
593            )
594            .unwrap(),
595            transaction_overload_sources: register_int_counter_vec_with_registry!(
596                "transaction_overload_sources",
597                "Number of times each source indicates transaction overload.",
598                &["source"],
599                registry)
600            .unwrap(),
601            execution_driver_executed_transactions: register_int_counter_with_registry!(
602                "execution_driver_executed_transactions",
603                "Cumulative number of transaction executed by execution driver",
604                registry,
605            )
606            .unwrap(),
607            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
608                "execution_driver_dispatch_queue",
609                "Number of transaction pending in execution driver dispatch queue",
610                registry,
611            )
612            .unwrap(),
613            execution_queueing_delay_s: register_histogram_with_registry!(
614                "execution_queueing_delay_s",
615                "Queueing delay between a transaction is ready for execution until it starts executing.",
616                LATENCY_SEC_BUCKETS.to_vec(),
617                registry
618            )
619            .unwrap(),
620            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
621                "prepare_cert_gas_latency_ratio",
622                "The ratio of computation gas divided by VM execution latency.",
623                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624                registry
625            )
626            .unwrap(),
627            execution_gas_latency_ratio: register_histogram_with_registry!(
628                "execution_gas_latency_ratio",
629                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
630                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
631                registry
632            )
633            .unwrap(),
634            skipped_consensus_txns: register_int_counter_with_registry!(
635                "skipped_consensus_txns",
636                "Total number of consensus transactions skipped",
637                registry,
638            )
639            .unwrap(),
640            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
641                "skipped_consensus_txns_cache_hit",
642                "Total number of consensus transactions skipped because of local cache hit",
643                registry,
644            )
645            .unwrap(),
646            post_processing_total_events_emitted: register_int_counter_with_registry!(
647                "post_processing_total_events_emitted",
648                "Total number of events emitted in post processing",
649                registry,
650            )
651            .unwrap(),
652            post_processing_total_tx_indexed: register_int_counter_with_registry!(
653                "post_processing_total_tx_indexed",
654                "Total number of txes indexed in post processing",
655                registry,
656            )
657            .unwrap(),
658            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
659                "post_processing_total_tx_had_event_processed",
660                "Total number of txes finished event processing in post processing",
661                registry,
662            )
663            .unwrap(),
664            post_processing_total_failures: register_int_counter_with_registry!(
665                "post_processing_total_failures",
666                "Total number of failure in post processing",
667                registry,
668            )
669            .unwrap(),
670            consensus_handler_processed: register_int_counter_vec_with_registry!(
671                "consensus_handler_processed",
672                "Number of transactions processed by consensus handler",
673                &["class"],
674                registry
675            ).unwrap(),
676            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
677                "consensus_handler_transaction_sizes",
678                "Sizes of each type of transactions processed by consensus handler",
679                &["class"],
680                POSITIVE_INT_BUCKETS.to_vec(),
681                registry
682            ).unwrap(),
683            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
684                "consensus_handler_num_low_scoring_authorities",
685                "Number of low scoring authorities based on reputation scores from consensus",
686                registry
687            ).unwrap(),
688            consensus_handler_scores: register_int_gauge_vec_with_registry!(
689                "consensus_handler_scores",
690                "scores from consensus for each authority",
691                &["authority"],
692                registry,
693            ).unwrap(),
694            validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
695                "validator_scoreboard_scores",
696                "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
697                &["authority"],
698                registry,
699            ).unwrap(),
700            invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
701                "invalid_misbehavior_reports_by_authority",
702                "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
703                &["authority"],
704                registry,
705            ).unwrap(),
706            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
707                "consensus_handler_deferred_transactions",
708                "Number of transactions deferred by consensus handler",
709                registry,
710            ).unwrap(),
711            consensus_handler_congested_transactions: register_int_counter_with_registry!(
712                "consensus_handler_congested_transactions",
713                "Number of transactions deferred by consensus handler due to congestion",
714                registry,
715            ).unwrap(),
716            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
717                "consensus_handler_cancelled_transactions",
718                "Number of transactions cancelled by consensus handler",
719                registry,
720            ).unwrap(),
721            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
722                "consensus_handler_max_congestion_control_object_costs",
723                "Max object costs for congestion control in the current consensus commit",
724                &["commit_type"],
725                registry,
726            ).unwrap(),
727            consensus_committed_subdags: register_int_counter_vec_with_registry!(
728                "consensus_committed_subdags",
729                "Number of committed subdags, sliced by leader",
730                &["authority"],
731                registry,
732            ).unwrap(),
733            consensus_committed_messages: register_int_gauge_vec_with_registry!(
734                "consensus_committed_messages",
735                "Total number of committed consensus messages, sliced by author",
736                &["authority"],
737                registry,
738            ).unwrap(),
739            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
740                "consensus_committed_user_transactions",
741                "Number of committed user transactions, sliced by submitter",
742                &["authority"],
743                registry,
744            ).unwrap(),
745            consensus_handler_leader_round: register_int_gauge_with_registry!(
746                "consensus_handler_leader_round",
747                "The leader round of the current consensus output being processed in the consensus handler",
748                registry,
749            ).unwrap(),
750            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
751            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
752            multisig_sig_count: register_int_counter_with_registry!(
753                "multisig_sig_count",
754                "Count of multisig signatures",
755                registry,
756            )
757            .unwrap(),
758            consensus_calculated_throughput: register_int_gauge_with_registry!(
759                "consensus_calculated_throughput",
760                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
761                registry,
762            ).unwrap(),
763            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
764                "consensus_calculated_throughput_profile",
765                "The current active calculated throughput profile",
766                registry
767            ).unwrap(),
768            execution_queueing_latency: LatencyObserver::new(),
769            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
770            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
771        }
772    }
773
774    /// Reset metrics that contain `hostname` as one of the labels. This is
775    /// needed to avoid retaining metrics for long-gone committee members and
776    /// only exposing metrics for the committee in the current epoch.
777    pub fn reset_on_reconfigure(&self) {
778        self.consensus_committed_messages.reset();
779        self.consensus_handler_scores.reset();
780        self.validator_scoreboard_scores.reset();
781        self.invalid_misbehavior_reports_by_authority.reset();
782        self.consensus_committed_user_transactions.reset();
783    }
784}
785
786/// a Trait object for `Signer` that is:
787/// - Pin, i.e. confined to one place in memory (we don't want to copy private
788///   keys).
789/// - Sync, i.e. can be safely shared between threads.
790///
791/// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair`
792pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
793
794pub struct AuthorityState {
795    // Fixed size, static, identity of the authority
796    /// The name of this authority.
797    pub name: AuthorityName,
798    /// The signature key of the authority.
799    pub secret: StableSyncAuthoritySigner,
800
801    /// The database
802    input_loader: TransactionInputLoader,
803    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
804
805    epoch_store: ArcSwap<AuthorityPerEpochStore>,
806
807    /// This lock denotes current 'execution epoch'.
808    /// Execution acquires read lock, checks certificate epoch and holds it
809    /// until all writes are complete. Reconfiguration acquires write lock,
810    /// changes the epoch and revert all transactions from previous epoch
811    /// that are executed but did not make into checkpoint.
812    execution_lock: RwLock<EpochId>,
813
814    pub indexes: Option<Arc<IndexStore>>,
815    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
816
817    pub subscription_handler: Arc<SubscriptionHandler>,
818    pub checkpoint_store: Arc<CheckpointStore>,
819
820    committee_store: Arc<CommitteeStore>,
821
822    /// Manages pending certificates and their missing input objects.
823    transaction_manager: Arc<TransactionManager>,
824
825    /// Shuts down the execution task. Used only in testing.
826    #[cfg_attr(not(test), expect(unused))]
827    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
828
829    pub metrics: Arc<AuthorityMetrics>,
830    _pruner: AuthorityStorePruner,
831    authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
832    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
833
834    /// Take db checkpoints of different dbs
835    db_checkpoint_config: DBCheckpointConfig,
836
837    pub config: NodeConfig,
838
839    /// Current overload status in this authority. Updated periodically.
840    pub overload_info: AuthorityOverloadInfo,
841
842    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
843
844    /// The chain identifier is derived from the digest of the genesis
845    /// checkpoint.
846    chain_identifier: ChainIdentifier,
847
848    pub(crate) congestion_tracker: Arc<CongestionTracker>,
849
850    /// Traffic controller for IOTA core servers (json-rpc, validator service)
851    pub traffic_controller: Option<Arc<TrafficController>>,
852}
853
854/// The authority state encapsulates all state, drives execution, and ensures
855/// safety.
856///
857/// Note the authority operations can be accessed through a read ref (&) and do
858/// not require &mut. Internally a database is synchronized through a mutex
859/// lock.
860///
861/// Repeating valid commands should produce no changes and return no error.
862impl AuthorityState {
863    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
864        epoch_store.committee().authority_exists(&self.name)
865    }
866
867    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
868        epoch_store
869            .active_validators()
870            .iter()
871            .any(|a| AuthorityName::from(a) == self.name)
872    }
873
874    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
875        !self.is_committee_validator(epoch_store)
876    }
877
878    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
879        &self.committee_store
880    }
881
882    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
883        self.committee_store.clone()
884    }
885
886    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
887        &self.config.authority_overload_config
888    }
889
890    pub fn get_epoch_state_commitments(
891        &self,
892        epoch: EpochId,
893    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
894        self.checkpoint_store.get_epoch_state_commitments(epoch)
895    }
896
897    /// This is a private method and should be kept that way. It doesn't check
898    /// whether the provided transaction is a system transaction, and hence
899    /// can only be called internally.
900    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
901    async fn handle_transaction_impl(
902        &self,
903        transaction: VerifiedTransaction,
904        epoch_store: &Arc<AuthorityPerEpochStore>,
905    ) -> IotaResult<VerifiedSignedTransaction> {
906        // Ensure that validator cannot reconfigure while we are signing the tx
907        let _execution_lock = self.execution_lock_for_signing()?;
908
909        let protocol_config = epoch_store.protocol_config();
910        let reference_gas_price = epoch_store.reference_gas_price();
911
912        let epoch = epoch_store.epoch();
913
914        let tx_data = transaction.data().transaction_data();
915
916        // Note: the deny checks may do redundant package loads but:
917        // - they only load packages when there is an active package deny map
918        // - the loads are cached anyway
919        iota_transaction_checks::deny::check_transaction_for_signing(
920            tx_data,
921            transaction.tx_signatures(),
922            &transaction.input_objects()?,
923            &tx_data.receiving_objects(),
924            &self.config.transaction_deny_config,
925            self.get_backing_package_store().as_ref(),
926        )?;
927
928        // Load all transaction-related input objects including ones for every
929        // `MoveAuthenticator`. Loading all objects eagerly means that any invalid
930        // reference — missing object, wrong version, inaccessible object — causes a
931        // pre-consensus rejection.
932        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
933            self.read_objects_for_signing(&transaction, epoch)?;
934
935        let move_authenticators = transaction.move_authenticators();
936
937        // Check the inputs for signing.
938        // If there are `MoveAuthenticator` signatures, their input objects and the
939        // account objects are also checked and must be provided.
940        // It is also checked if there is enough gas to execute the transaction and its
941        // authenticators.
942        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
943            .check_transaction_inputs_for_signing(
944                protocol_config,
945                reference_gas_price,
946                tx_data,
947                tx_input_objects,
948                &tx_receiving_objects,
949                &move_authenticators,
950                per_authenticator_inputs,
951            )?;
952
953        // Get the input objects for the authenticators, if there are
954        // `MoveAuthenticator`s.
955        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
956            .iter()
957            .map(|i| &i.0)
958            .collect();
959
960        // Check if any of the sender, the transaction input objects, the receiving
961        // objects and the authenticator input objects are in the coin deny
962        // list, which would prevent the transaction from being signed.
963        check_coin_deny_list_v1_during_signing(
964            tx_data.sender(),
965            &tx_checked_input_objects,
966            &tx_receiving_objects,
967            &per_authenticator_checked_input_objects,
968            &self.get_object_store(),
969        )?;
970
971        let (kind, signer, gas_data) = tx_data.execution_parts();
972
973        let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
974            extract_auth_fun_refs(signer, gas_data.owner, |address| {
975                move_authenticators
976                    .iter()
977                    .zip(per_authenticator_checked_inputs.iter())
978                    .find(|(move_authenticator, _)| {
979                        move_authenticator.address().ok().as_ref() == Some(&address)
980                    })
981                    .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
982            });
983
984        // Filter the authenticators and their checked inputs down to those that must
985        // be executed pre-consensus. This is done *after* the deny-list check so
986        // that all MoveAuthenticator input objects are covered by that check regardless
987        // of deferral.
988        let pre_consensus_move_authenticators =
989            pre_consensus_move_authenticators(&transaction, protocol_config);
990        let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
991            move_authenticators
992                .into_iter()
993                .zip(per_authenticator_checked_inputs)
994                .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
995                .unzip();
996        let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
997            .iter()
998            .map(|i| &i.0)
999            .collect();
1000
1001        // If there are `MoveAuthenticator` signatures, execute them and check if they
1002        // all succeed.
1003        if !move_authenticators.is_empty() {
1004            let aggregated_authenticator_input_objects =
1005                iota_transaction_checks::aggregate_authenticator_input_objects(
1006                    &per_authenticator_checked_input_objects,
1007                )?;
1008
1009            debug_assert_eq!(
1010                move_authenticators.len(),
1011                per_authenticator_checked_inputs.len(),
1012                "Move authenticators amount must match the number of checked authenticator inputs"
1013            );
1014
1015            let move_authenticators = move_authenticators
1016                .into_iter()
1017                .zip(per_authenticator_checked_inputs)
1018                .map(
1019                    |(
1020                        move_authenticator,
1021                        (authenticator_checked_input_objects, authenticator_function_ref),
1022                    )| {
1023                        (
1024                            move_authenticator.to_owned(),
1025                            authenticator_function_ref,
1026                            authenticator_checked_input_objects,
1027                        )
1028                    },
1029                )
1030                .collect();
1031
1032            // It is supposed that `MoveAuthenticator` availability is checked in
1033            // `SenderSignedData::validity_check`.
1034
1035            // Serialize the TransactionData for the auth context before decomposing.
1036            let tx_data_bytes =
1037                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1038
1039            let (sender_auth_digest, sponsor_auth_digest) =
1040                transaction.data().compute_auth_digests()?;
1041
1042            let auth_context_data = AuthContextData {
1043                transaction_data_bytes: tx_data_bytes,
1044                sender_auth_digest,
1045                sponsor_auth_digest,
1046                sender_authenticator_function_ref,
1047                sponsor_authenticator_function_ref,
1048            };
1049
1050            // Execute the Move authenticators.
1051            let validation_result = epoch_store.executor().authenticate_transaction(
1052                self.get_backing_store().as_ref(),
1053                protocol_config,
1054                self.metrics.limits_metrics.clone(),
1055                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1056                epoch_store
1057                    .epoch_start_config()
1058                    .epoch_data()
1059                    .epoch_start_timestamp(),
1060                gas_data,
1061                gas_status,
1062                move_authenticators,
1063                aggregated_authenticator_input_objects,
1064                kind,
1065                signer,
1066                transaction.digest().to_owned(),
1067                auth_context_data,
1068                &mut None,
1069            );
1070
1071            if let Err(validation_error) = validation_result {
1072                return Err(IotaError::MoveAuthenticatorExecutionFailure {
1073                    error: validation_error.to_string(),
1074                });
1075            }
1076        }
1077
1078        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
1079
1080        let signed_transaction =
1081            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1082
1083        // Check and write locks, to signed transaction, into the database
1084        // The call to self.set_transaction_lock checks the lock is not conflicting,
1085        // and returns ConflictingTransaction error in case there is a lock on a
1086        // different existing transaction.
1087        self.get_cache_writer().try_acquire_transaction_locks(
1088            epoch_store,
1089            &owned_objects,
1090            signed_transaction.clone(),
1091        )?;
1092
1093        Ok(signed_transaction)
1094    }
1095
1096    /// Initiate a new transaction.
1097    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1098    pub async fn handle_transaction(
1099        &self,
1100        epoch_store: &Arc<AuthorityPerEpochStore>,
1101        transaction: VerifiedTransaction,
1102    ) -> IotaResult<HandleTransactionResponse> {
1103        let tx_digest = *transaction.digest();
1104        debug!("handle_transaction");
1105
1106        // Ensure an idempotent answer.
1107        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1108            return Ok(HandleTransactionResponse { status });
1109        }
1110
1111        let _metrics_guard = self
1112            .metrics
1113            .authority_state_handle_transaction_latency
1114            .start_timer();
1115        self.metrics.tx_orders.inc();
1116
1117        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1118        match signed {
1119            Ok(s) => {
1120                if self.is_committee_validator(epoch_store) {
1121                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1122                        let tx = s.clone();
1123                        let validator_tx_finalizer = validator_tx_finalizer.clone();
1124                        let cache_reader = self.get_transaction_cache_reader().clone();
1125                        let epoch_store = epoch_store.clone();
1126                        spawn_monitored_task!(epoch_store.within_alive_epoch(
1127                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1128                        ));
1129                    }
1130                }
1131                Ok(HandleTransactionResponse {
1132                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1133                })
1134            }
1135            // It happens frequently that while we are checking the validity of the transaction, it
1136            // has just been executed.
1137            // In that case, we could still return Ok to avoid showing confusing errors.
1138            Err(err) => Ok(HandleTransactionResponse {
1139                status: self
1140                    .get_transaction_status(&tx_digest, epoch_store)?
1141                    .ok_or(err)?
1142                    .1,
1143            }),
1144        }
1145    }
1146
1147    pub fn check_system_overload_at_signing(&self) -> bool {
1148        self.config
1149            .authority_overload_config
1150            .check_system_overload_at_signing
1151    }
1152
1153    pub fn check_system_overload_at_execution(&self) -> bool {
1154        self.config
1155            .authority_overload_config
1156            .check_system_overload_at_execution
1157    }
1158
1159    pub(crate) fn check_system_overload(
1160        &self,
1161        consensus_adapter: &Arc<ConsensusAdapter>,
1162        tx_data: &SenderSignedData,
1163        do_authority_overload_check: bool,
1164    ) -> IotaResult {
1165        if do_authority_overload_check {
1166            self.check_authority_overload(tx_data).tap_err(|_| {
1167                self.update_overload_metrics("execution_queue");
1168            })?;
1169        }
1170        self.transaction_manager
1171            .check_execution_overload(self.overload_config(), tx_data)
1172            .tap_err(|_| {
1173                self.update_overload_metrics("execution_pending");
1174            })?;
1175        consensus_adapter.check_consensus_overload().tap_err(|_| {
1176            self.update_overload_metrics("consensus");
1177        })?;
1178
1179        let pending_tx_count = self
1180            .get_cache_commit()
1181            .approximate_pending_transaction_count();
1182        if pending_tx_count
1183            > self
1184                .config
1185                .execution_cache_config
1186                .writeback_cache
1187                .backpressure_threshold_for_rpc()
1188        {
1189            return Err(IotaError::ValidatorOverloadedRetryAfter {
1190                retry_after_secs: 10,
1191            });
1192        }
1193
1194        Ok(())
1195    }
1196
1197    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1198        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1199            return Ok(());
1200        }
1201
1202        let load_shedding_percentage = self
1203            .overload_info
1204            .load_shedding_percentage
1205            .load(Ordering::Relaxed);
1206        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1207    }
1208
1209    fn update_overload_metrics(&self, source: &str) {
1210        self.metrics
1211            .transaction_overload_sources
1212            .with_label_values(&[source])
1213            .inc();
1214    }
1215
1216    /// Executes a certificate for its effects.
1217    #[instrument(level = "trace", skip_all)]
1218    pub async fn execute_certificate(
1219        &self,
1220        certificate: &VerifiedCertificate,
1221        epoch_store: &Arc<AuthorityPerEpochStore>,
1222    ) -> IotaResult<TransactionEffects> {
1223        let _metrics_guard = if certificate.contains_shared_object() {
1224            self.metrics
1225                .execute_certificate_latency_shared_object
1226                .start_timer()
1227        } else {
1228            self.metrics
1229                .execute_certificate_latency_single_writer
1230                .start_timer()
1231        };
1232        trace!("execute_certificate");
1233
1234        self.metrics.total_cert_attempts.inc();
1235
1236        if !certificate.contains_shared_object() {
1237            // Shared object transactions need to be sequenced by the consensus before
1238            // enqueueing for execution, done in
1239            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1240            // object transactions, they can be enqueued for execution immediately.
1241            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1242        }
1243
1244        // tx could be reverted when epoch ends, so we must be careful not to return a
1245        // result here after the epoch ends.
1246        epoch_store
1247            .within_alive_epoch(self.notify_read_effects(certificate))
1248            .await
1249            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1250            .and_then(|r| r)
1251    }
1252
1253    /// Internal logic to execute a certificate.
1254    ///
1255    /// Guarantees that
1256    /// - If input objects are available, return no permanent failure.
1257    /// - Execution and output commit are atomic. i.e. outputs are only written
1258    ///   to storage,
1259    /// on successful execution; crashed execution has no observable effect and
1260    /// can be retried.
1261    ///
1262    /// It is caller's responsibility to ensure input objects are available and
1263    /// locks are set. If this cannot be satisfied by the caller,
1264    /// execute_certificate() should be called instead.
1265    ///
1266    /// Should only be called within iota-core.
1267    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1268    pub fn try_execute_immediately(
1269        &self,
1270        certificate: &VerifiedExecutableTransaction,
1271        expected_effects_digest: Option<TransactionEffectsDigest>,
1272        epoch_store: &Arc<AuthorityPerEpochStore>,
1273    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1274        let _scope = monitored_scope("Execution::try_execute_immediately");
1275        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1276
1277        let tx_digest = certificate.digest();
1278
1279        // Acquire a lock to prevent concurrent executions of the same transaction.
1280        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1281
1282        // The cert could have been processed by a concurrent attempt of the same cert,
1283        // so check if the effects have already been written.
1284        if let Some(effects) = self
1285            .get_transaction_cache_reader()
1286            .try_get_executed_effects(tx_digest)?
1287        {
1288            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1289                assert_eq!(
1290                    effects.digest(),
1291                    expected_effects_digest_inner,
1292                    "Unexpected effects digest for transaction {tx_digest}"
1293                );
1294            }
1295            tx_guard.release();
1296            return Ok((effects, None));
1297        }
1298
1299        let (tx_input_objects, per_authenticator_inputs) =
1300            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1301
1302        // If no expected_effects_digest was provided, try to get it from storage.
1303        // We could be re-executing a previously executed but uncommitted transaction,
1304        // perhaps after restarting with a new binary. In this situation, if
1305        // we have published an effects signature, we must be sure not to
1306        // equivocate.
1307        let expected_effects_digest =
1308            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1309
1310        self.process_certificate(
1311            tx_guard,
1312            certificate,
1313            tx_input_objects,
1314            per_authenticator_inputs,
1315            expected_effects_digest,
1316            epoch_store,
1317        )
1318        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1319        .tap_ok(
1320            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1321        )
1322    }
1323
1324    pub fn read_objects_for_execution(
1325        &self,
1326        tx_lock: &CertLockGuard,
1327        certificate: &VerifiedExecutableTransaction,
1328        epoch_store: &Arc<AuthorityPerEpochStore>,
1329    ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1330        let _scope = monitored_scope("Execution::load_input_objects");
1331        let _metrics_guard = self
1332            .metrics
1333            .execution_load_input_objects_latency
1334            .start_timer();
1335
1336        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1337
1338        let input_objects = self.input_loader.read_objects_for_execution(
1339            epoch_store,
1340            &certificate.key(),
1341            tx_lock,
1342            &input_objects,
1343            epoch_store.epoch(),
1344        )?;
1345
1346        certificate.split_input_objects_into_groups_for_reading(input_objects)
1347    }
1348
1349    /// Test only wrapper for `try_execute_immediately()` above, useful for
1350    /// checking errors if the pre-conditions are not satisfied, and
1351    /// executing change epoch transactions.
1352    pub fn try_execute_for_test(
1353        &self,
1354        certificate: &VerifiedCertificate,
1355    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1356        let epoch_store = self.epoch_store_for_testing();
1357        let (effects, execution_error_opt) = self.try_execute_immediately(
1358            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1359            None,
1360            &epoch_store,
1361        )?;
1362        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1363        Ok((signed_effects, execution_error_opt))
1364    }
1365
1366    /// Non-fallible version of `try_execute_for_test()`.
1367    pub fn execute_for_test(
1368        &self,
1369        certificate: &VerifiedCertificate,
1370    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1371        self.try_execute_for_test(certificate)
1372            .expect("try_execute_for_test should not fail")
1373    }
1374
1375    pub async fn notify_read_effects(
1376        &self,
1377        certificate: &VerifiedCertificate,
1378    ) -> IotaResult<TransactionEffects> {
1379        self.get_transaction_cache_reader()
1380            .try_notify_read_executed_effects(&[*certificate.digest()])
1381            .await
1382            .map(|mut r| r.pop().expect("must return correct number of effects"))
1383    }
1384
1385    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1386        self.get_object_cache_reader()
1387            .try_check_owned_objects_are_live(owned_object_refs)
1388    }
1389
1390    /// This function captures the required state to debug a forked transaction.
1391    /// The dump is written to a file in dir `path`, with name prefixed by the
1392    /// transaction digest. NOTE: Since this info escapes the validator
1393    /// context, make sure not to leak any private info here
1394    pub(crate) fn debug_dump_transaction_state(
1395        &self,
1396        tx_digest: &TransactionDigest,
1397        effects: &TransactionEffects,
1398        expected_effects_digest: TransactionEffectsDigest,
1399        inner_temporary_store: &InnerTemporaryStore,
1400        certificate: &VerifiedExecutableTransaction,
1401        debug_dump_config: &StateDebugDumpConfig,
1402    ) -> IotaResult<PathBuf> {
1403        // Fall back to the OS temp directory if no dump directory is configured.
1404        // This is safe: dump files are named by transaction digest, so no collisions.
1405        let dump_dir = debug_dump_config
1406            .dump_file_directory
1407            .as_ref()
1408            .cloned()
1409            .unwrap_or(std::env::temp_dir());
1410        let epoch_store = self.load_epoch_store_one_call_per_task();
1411
1412        NodeStateDump::new(
1413            tx_digest,
1414            effects,
1415            expected_effects_digest,
1416            self.get_object_store().as_ref(),
1417            &epoch_store,
1418            inner_temporary_store,
1419            certificate,
1420        )?
1421        .write_to_file(&dump_dir)
1422        .map_err(|e| IotaError::FileIO(e.to_string()))
1423    }
1424
1425    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1426    pub(crate) fn process_certificate(
1427        &self,
1428        tx_guard: CertTxGuard,
1429        certificate: &VerifiedExecutableTransaction,
1430        tx_input_objects: InputObjects,
1431        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1432        expected_effects_digest: Option<TransactionEffectsDigest>,
1433        epoch_store: &Arc<AuthorityPerEpochStore>,
1434    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1435        let process_certificate_start_time = tokio::time::Instant::now();
1436        let digest = *certificate.digest();
1437
1438        let _scope = monitored_scope("Execution::process_certificate");
1439
1440        fail_point_if!("correlated-crash-process-certificate", || {
1441            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1442                iota_simulator::task::kill_current_node(None);
1443            }
1444        });
1445
1446        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1447        // Any caller that verifies the signatures on the certificate will have already
1448        // checked the epoch. But paths that don't verify sigs (e.g. execution
1449        // from checkpoint, reading from db) present the possibility of an epoch
1450        // mismatch. If this cert is not finalzied in previous epoch, then it's
1451        // invalid.
1452        let execution_guard = match execution_guard {
1453            Ok(execution_guard) => execution_guard,
1454            Err(err) => {
1455                tx_guard.release();
1456                return Err(err);
1457            }
1458        };
1459        // Since we obtain a reference to the epoch store before taking the execution
1460        // lock, it's possible that reconfiguration has happened and they no
1461        // longer match.
1462        if *execution_guard != epoch_store.epoch() {
1463            tx_guard.release();
1464            info!("The epoch of the execution_guard doesn't match the epoch store");
1465            return Err(IotaError::WrongEpoch {
1466                expected_epoch: epoch_store.epoch(),
1467                actual_epoch: *execution_guard,
1468            });
1469        }
1470
1471        // Errors originating from prepare_certificate may be transient (failure to read
1472        // locks) or non-transient (transaction input is invalid, move vm
1473        // errors). However, all errors from this function occur before we have
1474        // written anything to the db, so we commit the tx guard and rely on the
1475        // client to retry the tx (if it was transient).
1476        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1477            &execution_guard,
1478            certificate,
1479            tx_input_objects,
1480            per_authenticator_inputs,
1481            epoch_store,
1482        ) {
1483            Err(e) => {
1484                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1485                tx_guard.release();
1486                return Err(e);
1487            }
1488            Ok(res) => res,
1489        };
1490
1491        if let Some(expected_effects_digest) = expected_effects_digest {
1492            if effects.digest() != expected_effects_digest {
1493                // We dont want to mask the original error, so we log it and continue.
1494                match self.debug_dump_transaction_state(
1495                    &digest,
1496                    &effects,
1497                    expected_effects_digest,
1498                    &inner_temporary_store,
1499                    certificate,
1500                    &self.config.state_debug_dump_config,
1501                ) {
1502                    Ok(out_path) => {
1503                        info!(
1504                            "Dumped node state for transaction {} to {}",
1505                            digest,
1506                            out_path.as_path().display().to_string()
1507                        );
1508                    }
1509                    Err(e) => {
1510                        error!("Error dumping state for transaction {}: {e}", digest);
1511                    }
1512                }
1513                error!(
1514                    tx_digest = ?digest,
1515                    ?expected_effects_digest,
1516                    actual_effects = ?effects,
1517                    "fork detected!"
1518                );
1519                panic!(
1520                    "Transaction {} is expected to have effects digest {}, but got {}!",
1521                    digest,
1522                    expected_effects_digest,
1523                    effects.digest(),
1524                );
1525            }
1526        }
1527
1528        fail_point!("crash");
1529
1530        self.commit_certificate(
1531            certificate,
1532            inner_temporary_store,
1533            &effects,
1534            tx_guard,
1535            execution_guard,
1536            epoch_store,
1537        )?;
1538
1539        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1540        if elapsed > 0.0 {
1541            self.metrics
1542                .execution_gas_latency_ratio
1543                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1544        };
1545        Ok((effects, execution_error_opt))
1546    }
1547
1548    pub async fn reconfigure_traffic_control(
1549        &self,
1550        params: TrafficControlReconfigParams,
1551    ) -> Result<TrafficControlReconfigParams, IotaError> {
1552        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1553            traffic_controller.admin_reconfigure(params).await
1554        } else {
1555            Err(IotaError::InvalidAdminRequest(
1556                "Traffic controller is not configured on this node".to_string(),
1557            ))
1558        }
1559    }
1560
1561    #[instrument(level = "trace", skip_all)]
1562    fn commit_certificate(
1563        &self,
1564        certificate: &VerifiedExecutableTransaction,
1565        inner_temporary_store: InnerTemporaryStore,
1566        effects: &TransactionEffects,
1567        tx_guard: CertTxGuard,
1568        _execution_guard: ExecutionLockReadGuard<'_>,
1569        epoch_store: &Arc<AuthorityPerEpochStore>,
1570    ) -> IotaResult {
1571        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1572            monitored_scope("Execution::commit_certificate");
1573        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1574
1575        let tx_key = certificate.key();
1576        let tx_digest = certificate.digest();
1577        let input_object_count = inner_temporary_store.input_objects.len();
1578        let shared_object_count = effects.input_shared_objects().len();
1579
1580        let output_keys = inner_temporary_store.get_output_keys(effects);
1581
1582        // index certificate
1583        let _ = self
1584            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1585            .tap_err(|e| {
1586                self.metrics.post_processing_total_failures.inc();
1587                error!(?tx_digest, "tx post processing failed: {e}");
1588            });
1589
1590        // The insertion to epoch_store is not atomic with the insertion to the
1591        // perpetual store. This is OK because we insert to the epoch store
1592        // first. And during lookups we always look up in the perpetual store first.
1593        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1594
1595        // Allow testing what happens if we crash here.
1596        fail_point!("crash");
1597
1598        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1599            certificate.clone().into_unsigned(),
1600            effects.clone(),
1601            inner_temporary_store,
1602        );
1603        self.get_cache_writer()
1604            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1605
1606        if certificate.transaction_data().is_end_of_epoch_tx() {
1607            // At the end of epoch, since system packages may have been upgraded, force
1608            // reload them in the cache.
1609            self.get_object_cache_reader()
1610                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1611        }
1612
1613        // commit_certificate finished, the tx is fully committed to the store.
1614        tx_guard.commit_tx();
1615
1616        // Notifies transaction manager about transaction and output objects committed.
1617        // This provides necessary information to transaction manager to start executing
1618        // additional ready transactions.
1619        self.transaction_manager
1620            .notify_commit(tx_digest, output_keys, epoch_store);
1621
1622        self.update_metrics(certificate, input_object_count, shared_object_count);
1623
1624        Ok(())
1625    }
1626
1627    fn update_metrics(
1628        &self,
1629        certificate: &VerifiedExecutableTransaction,
1630        input_object_count: usize,
1631        shared_object_count: usize,
1632    ) {
1633        // count signature by scheme, for multisig
1634        if certificate.has_upgraded_multisig() {
1635            self.metrics.multisig_sig_count.inc();
1636        }
1637
1638        self.metrics.total_effects.inc();
1639        self.metrics.total_certs.inc();
1640
1641        if shared_object_count > 0 {
1642            self.metrics.shared_obj_tx.inc();
1643        }
1644
1645        if certificate.is_sponsored_tx() {
1646            self.metrics.sponsored_tx.inc();
1647        }
1648
1649        self.metrics
1650            .num_input_objs
1651            .observe(input_object_count as f64);
1652        self.metrics
1653            .num_shared_objects
1654            .observe(shared_object_count as f64);
1655        self.metrics.batch_size.observe(
1656            certificate
1657                .data()
1658                .intent_message()
1659                .value
1660                .kind()
1661                .num_commands() as f64,
1662        );
1663    }
1664
1665    /// prepare_certificate validates the transaction input, and executes the
1666    /// certificate, returning effects, output objects, events, etc.
1667    ///
1668    /// It reads state from the db (both owned and shared locks), but it has no
1669    /// side effects.
1670    ///
1671    /// It can be generally understood that a failure of prepare_certificate
1672    /// indicates a non-transient error, e.g. the transaction input is
1673    /// somehow invalid, the correct locks are not held, etc. However, this
1674    /// is not entirely true, as a transient db read error may also cause
1675    /// this function to fail.
1676    #[instrument(level = "trace", skip_all)]
1677    fn prepare_certificate(
1678        &self,
1679        _execution_guard: &ExecutionLockReadGuard<'_>,
1680        certificate: &VerifiedExecutableTransaction,
1681        tx_input_objects: InputObjects,
1682        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1683        epoch_store: &Arc<AuthorityPerEpochStore>,
1684    ) -> IotaResult<(
1685        InnerTemporaryStore,
1686        TransactionEffects,
1687        Option<ExecutionError>,
1688    )> {
1689        let _scope = monitored_scope("Execution::prepare_certificate");
1690        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1691        let prepare_certificate_start_time = tokio::time::Instant::now();
1692
1693        let protocol_config = epoch_store.protocol_config();
1694
1695        let reference_gas_price = epoch_store.reference_gas_price();
1696
1697        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1698        let epoch_start_timestamp = epoch_store
1699            .epoch_start_config()
1700            .epoch_data()
1701            .epoch_start_timestamp();
1702
1703        let backing_store = self.get_backing_store().as_ref();
1704
1705        let tx_digest = *certificate.digest();
1706
1707        // TODO: We need to move this to a more appropriate place to avoid redundant
1708        // checks.
1709        let tx_data = certificate.data().transaction_data();
1710        tx_data.validity_check(protocol_config)?;
1711
1712        let (kind, signer, gas_data) = tx_data.execution_parts();
1713
1714        let move_authenticators = certificate.move_authenticators();
1715
1716        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1717        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
1718            .is_empty()
1719        {
1720            // No Move authentication required, proceed to execute the transaction directly.
1721
1722            // The cost of partially re-auditing a transaction before execution is
1723            // tolerated.
1724            let (tx_gas_status, tx_checked_input_objects) =
1725                iota_transaction_checks::check_certificate_input(
1726                    certificate,
1727                    tx_input_objects,
1728                    protocol_config,
1729                    reference_gas_price,
1730                )?;
1731
1732            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1733            self.check_owned_locks(&owned_object_refs)?;
1734            epoch_store.executor().execute_transaction_to_effects(
1735                backing_store,
1736                protocol_config,
1737                self.metrics.limits_metrics.clone(),
1738                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1739                // cyclic dependency w/ iota-adapter
1740                self.config
1741                    .expensive_safety_check_config
1742                    .enable_deep_per_tx_iota_conservation_check(),
1743                self.config.certificate_deny_config.certificate_deny_set(),
1744                &epoch_id,
1745                epoch_start_timestamp,
1746                tx_checked_input_objects,
1747                gas_data,
1748                tx_gas_status,
1749                kind,
1750                signer,
1751                tx_digest,
1752                &mut None,
1753            )
1754        } else {
1755            // One or more `MoveAuthenticator` signatures present — authenticate each and
1756            // then execute the transaction.
1757            // It is supposed that `MoveAuthenticator` availability is checked in
1758            // `SenderSignedData::validity_check`.
1759
1760            debug_assert_eq!(
1761                move_authenticators.len(),
1762                per_authenticator_inputs.len(),
1763                "Move authenticators amount must match the number of authenticator inputs"
1764            );
1765
1766            let per_authenticator_inputs = move_authenticators
1767                .iter()
1768                .zip(per_authenticator_inputs)
1769                .map(
1770                    |(move_authenticator, (authenticator_input_objects, account_object))| {
1771                        // Check basic `object_to_authenticate` preconditions and get its
1772                        // components.
1773                        let (
1774                            auth_account_object_id,
1775                            auth_account_object_seq_number,
1776                            auth_account_object_digest,
1777                        ) = move_authenticator.object_to_authenticate_components()?;
1778
1779                        let signer = move_authenticator.address()?;
1780
1781                        let authenticator_function_ref_for_execution = self.check_move_account(
1782                            auth_account_object_id,
1783                            auth_account_object_seq_number,
1784                            auth_account_object_digest,
1785                            account_object,
1786                            &signer,
1787                        )?;
1788
1789                        Ok((
1790                            authenticator_input_objects,
1791                            authenticator_function_ref_for_execution,
1792                        ))
1793                    },
1794                )
1795                .collect::<IotaResult<Vec<_>>>()?;
1796
1797            let per_authenticator_input_objects = per_authenticator_inputs
1798                .iter()
1799                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
1800                .collect::<Vec<_>>();
1801
1802            // Serialize the TransactionData for the auth context.
1803            let tx_data_bytes =
1804                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");
1805
1806            let (sender_auth_digest, sponsor_auth_digest) =
1807                certificate.data().compute_auth_digests()?;
1808
1809            // Check the `MoveAuthenticator` input objects.
1810            // The `MoveAuthenticator` receiving objects are checked on the signing step.
1811            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
1812            // not a part of the transaction data.
1813            let authenticator_gas_budget = protocol_config.max_auth_gas();
1814            let (
1815                gas_status,
1816                per_authenticator_checked_input_objects,
1817                authenticator_and_tx_checked_input_objects,
1818            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1819                certificate,
1820                tx_input_objects,
1821                per_authenticator_input_objects,
1822                authenticator_gas_budget,
1823                protocol_config,
1824                reference_gas_price,
1825            )?;
1826
1827            debug_assert_eq!(
1828                move_authenticators.len(),
1829                per_authenticator_checked_input_objects.len(),
1830                "Move authenticators amount must match the number of checked authenticator inputs"
1831            );
1832
1833            let move_authenticators = move_authenticators
1834                .into_iter()
1835                .zip(per_authenticator_inputs)
1836                .zip(per_authenticator_checked_input_objects)
1837                .map(
1838                    |(
1839                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
1840                        authenticator_checked_input_objects,
1841                    )| {
1842                        (
1843                            move_authenticator.to_owned(),
1844                            authenticator_function_ref_for_execution,
1845                            authenticator_checked_input_objects,
1846                        )
1847                    },
1848                )
1849                .collect::<Vec<_>>();
1850
1851            let owned_object_refs = authenticator_and_tx_checked_input_objects
1852                .inner()
1853                .filter_owned_objects();
1854            self.check_owned_locks(&owned_object_refs)?;
1855
1856            let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
1857                extract_auth_fun_refs(signer, gas_data.owner, |address| {
1858                    move_authenticators
1859                        .iter()
1860                        .find(|t| t.0.address().ok().as_ref() == Some(&address))
1861                        .map(|t| t.1.authenticator_function_ref.clone())
1862                });
1863
1864            let auth_context_data = AuthContextData {
1865                transaction_data_bytes: tx_data_bytes,
1866                sender_auth_digest,
1867                sponsor_auth_digest,
1868                sender_authenticator_function_ref,
1869                sponsor_authenticator_function_ref,
1870            };
1871
1872            epoch_store
1873                .executor()
1874                .authenticate_then_execute_transaction_to_effects(
1875                    backing_store,
1876                    protocol_config,
1877                    self.metrics.limits_metrics.clone(),
1878                    self.config
1879                        .expensive_safety_check_config
1880                        .enable_deep_per_tx_iota_conservation_check(),
1881                    self.config.certificate_deny_config.certificate_deny_set(),
1882                    &epoch_id,
1883                    epoch_start_timestamp,
1884                    gas_data,
1885                    gas_status,
1886                    move_authenticators,
1887                    authenticator_and_tx_checked_input_objects,
1888                    kind,
1889                    signer,
1890                    tx_digest,
1891                    auth_context_data,
1892                    &mut None,
1893                )
1894        };
1895
1896        fail_point_if!("cp_execution_nondeterminism", || {
1897            #[cfg(msim)]
1898            self.create_fail_state(certificate, epoch_store, &mut effects);
1899        });
1900
1901        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1902        if elapsed > 0.0 {
1903            self.metrics
1904                .prepare_cert_gas_latency_ratio
1905                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1906        }
1907
1908        Ok((inner_temp_store, effects, execution_error_opt.err()))
1909    }
1910
1911    pub fn prepare_certificate_for_benchmark(
1912        &self,
1913        certificate: &VerifiedExecutableTransaction,
1914        input_objects: InputObjects,
1915        epoch_store: &Arc<AuthorityPerEpochStore>,
1916    ) -> IotaResult<(
1917        InnerTemporaryStore,
1918        TransactionEffects,
1919        Option<ExecutionError>,
1920    )> {
1921        let lock = RwLock::new(epoch_store.epoch());
1922        let execution_guard = lock.try_read().unwrap();
1923
1924        self.prepare_certificate(
1925            &execution_guard,
1926            certificate,
1927            input_objects,
1928            vec![],
1929            epoch_store,
1930        )
1931    }
1932
1933    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1934    /// `VmChecks::Enabled` instead.
1935    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1936    #[allow(clippy::type_complexity)]
1937    pub fn dry_exec_transaction(
1938        &self,
1939        transaction: TransactionData,
1940        transaction_digest: TransactionDigest,
1941    ) -> IotaResult<(
1942        DryRunTransactionBlockResponse,
1943        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1944        TransactionEffects,
1945        Option<ObjectId>,
1946    )> {
1947        let epoch_store = self.load_epoch_store_one_call_per_task();
1948        if !self.is_fullnode(&epoch_store) {
1949            return Err(IotaError::UnsupportedFeature {
1950                error: "dry-exec is only supported on fullnodes".to_string(),
1951            });
1952        }
1953
1954        if transaction.kind().is_system() {
1955            return Err(IotaError::UnsupportedFeature {
1956                error: "dry-exec does not support system transactions".to_string(),
1957            });
1958        }
1959
1960        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1961    }
1962
1963    #[allow(clippy::type_complexity)]
1964    pub fn dry_exec_transaction_for_benchmark(
1965        &self,
1966        transaction: TransactionData,
1967        transaction_digest: TransactionDigest,
1968    ) -> IotaResult<(
1969        DryRunTransactionBlockResponse,
1970        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1971        TransactionEffects,
1972        Option<ObjectId>,
1973    )> {
1974        let epoch_store = self.load_epoch_store_one_call_per_task();
1975        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1976    }
1977
1978    #[instrument(level = "trace", skip_all)]
1979    #[allow(clippy::type_complexity)]
1980    fn dry_exec_transaction_impl(
1981        &self,
1982        epoch_store: &AuthorityPerEpochStore,
1983        transaction: TransactionData,
1984        transaction_digest: TransactionDigest,
1985    ) -> IotaResult<(
1986        DryRunTransactionBlockResponse,
1987        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1988        TransactionEffects,
1989        Option<ObjectId>,
1990    )> {
1991        // Cheap validity checks for a transaction, including input size limits.
1992        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1993
1994        let input_object_kinds = transaction.input_objects()?;
1995        let receiving_object_refs = transaction.receiving_objects();
1996
1997        iota_transaction_checks::deny::check_transaction_for_signing(
1998            &transaction,
1999            &[],
2000            &input_object_kinds,
2001            &receiving_object_refs,
2002            &self.config.transaction_deny_config,
2003            self.get_backing_package_store().as_ref(),
2004        )?;
2005
2006        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2007            // We don't want to cache this transaction since it's a dry run.
2008            None,
2009            &input_object_kinds,
2010            &receiving_object_refs,
2011            epoch_store.epoch(),
2012        )?;
2013
2014        // make a gas object if one was not provided
2015        let mut transaction = transaction;
2016        let reference_gas_price = epoch_store.reference_gas_price();
2017        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2018            let sender = transaction.gas_owner();
2019            let gas_object_id = ObjectId::random();
2020            let gas_object = Object::new_move(
2021                MoveObject::new_gas_coin(
2022                    OBJECT_START_VERSION,
2023                    gas_object_id,
2024                    SIMULATION_GAS_COIN_VALUE,
2025                ),
2026                Owner::Address(sender),
2027                TransactionDigest::GENESIS_MARKER,
2028            );
2029            let gas_object_ref = gas_object.compute_object_reference();
2030            // Add gas object to transaction gas payment
2031            transaction.gas_data_mut().objects = vec![gas_object_ref];
2032            (
2033                iota_transaction_checks::check_transaction_input_with_given_gas(
2034                    epoch_store.protocol_config(),
2035                    reference_gas_price,
2036                    &transaction,
2037                    input_objects,
2038                    receiving_objects,
2039                    gas_object,
2040                    &self.metrics.bytecode_verifier_metrics,
2041                    &self.config.verifier_signing_config,
2042                )?,
2043                Some(gas_object_id),
2044            )
2045        } else {
2046            // `MoveAuthenticator`s are not supported in dry runs, so we set the
2047            // `authenticator_gas_budget` to 0.
2048            let authenticator_gas_budget = 0;
2049
2050            (
2051                iota_transaction_checks::check_transaction_input(
2052                    epoch_store.protocol_config(),
2053                    reference_gas_price,
2054                    &transaction,
2055                    input_objects,
2056                    &receiving_objects,
2057                    &self.metrics.bytecode_verifier_metrics,
2058                    &self.config.verifier_signing_config,
2059                    authenticator_gas_budget,
2060                )?,
2061                None,
2062            )
2063        };
2064
2065        let protocol_config = epoch_store.protocol_config();
2066        let (kind, signer, gas_data) = transaction.execution_parts();
2067
2068        let silent = true;
2069        let executor = iota_execution::executor(protocol_config, silent, None)
2070            .expect("Creating an executor should not fail here");
2071
2072        let expensive_checks = false;
2073        let (inner_temp_store, _, effects, execution_error) = executor
2074            .execute_transaction_to_effects(
2075                self.get_backing_store().as_ref(),
2076                protocol_config,
2077                self.metrics.limits_metrics.clone(),
2078                expensive_checks,
2079                self.config.certificate_deny_config.certificate_deny_set(),
2080                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2081                epoch_store
2082                    .epoch_start_config()
2083                    .epoch_data()
2084                    .epoch_start_timestamp(),
2085                checked_input_objects,
2086                gas_data,
2087                gas_status,
2088                kind,
2089                signer,
2090                transaction_digest,
2091                &mut None,
2092            );
2093        let tx_digest = *effects.transaction_digest();
2094
2095        let module_cache =
2096            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2097
2098        let mut layout_resolver =
2099            epoch_store
2100                .executor()
2101                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2102                    &inner_temp_store,
2103                    self.get_backing_package_store(),
2104                )));
2105        // Returning empty vector here because we recalculate changes in the rpc layer.
2106        let object_changes = Vec::new();
2107
2108        // Returning empty vector here because we recalculate changes in the rpc layer.
2109        let balance_changes = Vec::new();
2110
2111        let written_with_kind = effects
2112            .created()
2113            .into_iter()
2114            .map(|(oref, _)| (oref, WriteKind::Create))
2115            .chain(
2116                effects
2117                    .unwrapped()
2118                    .into_iter()
2119                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2120            )
2121            .chain(
2122                effects
2123                    .mutated()
2124                    .into_iter()
2125                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2126            )
2127            .map(|(oref, kind)| {
2128                let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2129                // TODO: Avoid clones.
2130                (oref.object_id, (oref, obj.clone(), kind))
2131            })
2132            .collect();
2133
2134        let execution_error_source = execution_error
2135            .as_ref()
2136            .err()
2137            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2138
2139        Ok((
2140            DryRunTransactionBlockResponse {
2141                // to avoid cloning `transaction`, fields are populated in this order
2142                suggested_gas_price: self
2143                    .congestion_tracker
2144                    .get_prediction_suggested_gas_price(&transaction),
2145                input: IotaTransactionBlockData::try_from_with_module_cache(
2146                    transaction,
2147                    &module_cache,
2148                    tx_digest,
2149                )
2150                .map_err(|e| IotaError::TransactionSerialization {
2151                    error: format!(
2152                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
2153                    ),
2154                })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
2155                effects: effects.clone().try_into()?,
2156                events: IotaTransactionBlockEvents::try_from(
2157                    inner_temp_store.events.clone(),
2158                    tx_digest,
2159                    None,
2160                    layout_resolver.as_mut(),
2161                )?,
2162                object_changes,
2163                balance_changes,
2164                execution_error_source,
2165            },
2166            written_with_kind,
2167            effects,
2168            mock_gas,
2169        ))
2170    }
2171
2172    pub fn simulate_transaction(
2173        &self,
2174        mut transaction: TransactionData,
2175        checks: VmChecks,
2176    ) -> IotaResult<SimulateTransactionResult> {
2177        if transaction.kind().is_system() {
2178            return Err(IotaError::UnsupportedFeature {
2179                error: "simulate does not support system transactions".to_string(),
2180            });
2181        }
2182
2183        let epoch_store = self.load_epoch_store_one_call_per_task();
2184        if !self.is_fullnode(&epoch_store) {
2185            return Err(IotaError::UnsupportedFeature {
2186                error: "simulate is only supported on fullnodes".to_string(),
2187            });
2188        }
2189
2190        // Cheap validity checks for a transaction, including input size limits.
2191        // This does not check if gas objects are missing since we may create a
2192        // mock gas object. It checks for other transaction input validity.
2193        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2194
2195        let input_object_kinds = transaction.input_objects()?;
2196        let receiving_object_refs = transaction.receiving_objects();
2197
2198        // Since we need to simulate a validator signing the transaction, the first step
2199        // is to check if some transaction elements are denied.
2200        iota_transaction_checks::deny::check_transaction_for_signing(
2201            &transaction,
2202            &[],
2203            &input_object_kinds,
2204            &receiving_object_refs,
2205            &self.config.transaction_deny_config,
2206            self.get_backing_package_store().as_ref(),
2207        )?;
2208
2209        // Load input and receiving objects
2210        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2211            // We don't want to cache this transaction since it's a simulation.
2212            None,
2213            &input_object_kinds,
2214            &receiving_object_refs,
2215            epoch_store.epoch(),
2216        )?;
2217
2218        // Create a mock gas object if one was not provided
2219        let mock_gas_id = if transaction.gas().is_empty() {
2220            let mock_gas_object = Object::new_move(
2221                MoveObject::new_gas_coin(
2222                    OBJECT_START_VERSION,
2223                    ObjectId::MAX,
2224                    SIMULATION_GAS_COIN_VALUE,
2225                ),
2226                Owner::Address(transaction.gas_data().owner),
2227                TransactionDigest::GENESIS_MARKER,
2228            );
2229            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2230            transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2231            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2232            Some(mock_gas_object.id())
2233        } else {
2234            None
2235        };
2236
2237        let protocol_config = epoch_store.protocol_config();
2238
2239        // `MoveAuthenticator`s are not supported in simulation, so we set the
2240        // `authenticator_gas_budget` to 0.
2241        let authenticator_gas_budget = 0;
2242
2243        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
2244        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
2245        let (gas_status, checked_input_objects) = if checks.enabled() {
2246            iota_transaction_checks::check_transaction_input(
2247                protocol_config,
2248                epoch_store.reference_gas_price(),
2249                &transaction,
2250                input_objects,
2251                &receiving_objects,
2252                &self.metrics.bytecode_verifier_metrics,
2253                &self.config.verifier_signing_config,
2254                authenticator_gas_budget,
2255            )?
2256        } else {
2257            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2258                protocol_config,
2259                transaction.kind(),
2260                input_objects,
2261                receiving_objects,
2262            )?;
2263            let gas_status = IotaGasStatus::new(
2264                transaction.gas_budget(),
2265                transaction.gas_price(),
2266                epoch_store.reference_gas_price(),
2267                protocol_config,
2268            )?;
2269
2270            (gas_status, checked_input_objects)
2271        };
2272
2273        // Create a new executor for the simulation
2274        let executor = iota_execution::executor(
2275            protocol_config,
2276            true, // silent
2277            None,
2278        )
2279        .expect("Creating an executor should not fail here");
2280
2281        // Execute the simulation
2282        let (kind, signer, gas_data) = transaction.execution_parts();
2283        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2284            self.get_backing_store().as_ref(),
2285            protocol_config,
2286            self.metrics.limits_metrics.clone(),
2287            false, // expensive_checks
2288            self.config.certificate_deny_config.certificate_deny_set(),
2289            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2290            epoch_store
2291                .epoch_start_config()
2292                .epoch_data()
2293                .epoch_start_timestamp(),
2294            checked_input_objects,
2295            gas_data,
2296            gas_status,
2297            kind,
2298            signer,
2299            transaction.digest(),
2300            checks.disabled(),
2301        );
2302
2303        // In the case of a dev inspect, the execution_result could be filled with some
2304        // values. Else, execution_result is empty in the case of a dry run.
2305        Ok(SimulateTransactionResult {
2306            input_objects: inner_temp_store.input_objects,
2307            output_objects: inner_temp_store.written,
2308            events: effects.events_digest().map(|_| inner_temp_store.events),
2309            effects,
2310            execution_result,
2311            suggested_gas_price: self
2312                .congestion_tracker
2313                .get_prediction_suggested_gas_price(&transaction),
2314            mock_gas_id,
2315        })
2316    }
2317
2318    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
2319    /// `VmChecks::DISABLED` instead.
2320    /// The object ID for gas can be any
2321    /// object ID, even for an uncreated object
2322    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2323    pub async fn dev_inspect_transaction_block(
2324        &self,
2325        sender: IotaAddress,
2326        transaction_kind: TransactionKind,
2327        gas_price: Option<u64>,
2328        gas_budget: Option<u64>,
2329        gas_sponsor: Option<IotaAddress>,
2330        gas_objects: Option<Vec<ObjectRef>>,
2331        show_raw_txn_data_and_effects: Option<bool>,
2332        skip_checks: Option<bool>,
2333    ) -> IotaResult<DevInspectResults> {
2334        let epoch_store = self.load_epoch_store_one_call_per_task();
2335
2336        if !self.is_fullnode(&epoch_store) {
2337            return Err(IotaError::UnsupportedFeature {
2338                error: "dev-inspect is only supported on fullnodes".to_string(),
2339            });
2340        }
2341
2342        if transaction_kind.is_system() {
2343            return Err(IotaError::UnsupportedFeature {
2344                error: "system transactions are not supported".to_string(),
2345            });
2346        }
2347
2348        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2349        let skip_checks = skip_checks.unwrap_or(true);
2350        let reference_gas_price = epoch_store.reference_gas_price();
2351        let protocol_config = epoch_store.protocol_config();
2352        let max_tx_gas = protocol_config.max_tx_gas();
2353
2354        let price = gas_price.unwrap_or(reference_gas_price);
2355        let budget = gas_budget.unwrap_or(max_tx_gas);
2356        let owner = gas_sponsor.unwrap_or(sender);
2357        // Payment might be empty here, but it's fine we'll have to deal with it later
2358        // after reading all the input objects.
2359        let payment = gas_objects.unwrap_or_default();
2360        let mut transaction = TransactionData::V1(TransactionDataV1 {
2361            kind: transaction_kind.clone(),
2362            sender,
2363            gas_payment: GasData {
2364                objects: payment,
2365                owner,
2366                price,
2367                budget,
2368            },
2369            expiration: TransactionExpiration::None,
2370        });
2371
2372        let raw_txn_data = if show_raw_txn_data_and_effects {
2373            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2374                error: "Failed to serialize transaction during dev inspect".to_string(),
2375            })?
2376        } else {
2377            vec![]
2378        };
2379
2380        transaction.validity_check_no_gas_check(protocol_config)?;
2381
2382        let input_object_kinds = transaction.input_objects()?;
2383        let receiving_object_refs = transaction.receiving_objects();
2384
2385        iota_transaction_checks::deny::check_transaction_for_signing(
2386            &transaction,
2387            &[],
2388            &input_object_kinds,
2389            &receiving_object_refs,
2390            &self.config.transaction_deny_config,
2391            self.get_backing_package_store().as_ref(),
2392        )?;
2393
2394        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2395            // We don't want to cache this transaction since it's a dev inspect.
2396            None,
2397            &input_object_kinds,
2398            &receiving_object_refs,
2399            epoch_store.epoch(),
2400        )?;
2401
2402        let (gas_status, checked_input_objects) = if skip_checks {
2403            // If we are skipping checks, then we call the check_dev_inspect_input function
2404            // which will perform only lightweight checks on the transaction
2405            // input. And if the gas field is empty, that means we will
2406            // use the dummy gas object so we need to add it to the input objects vector.
2407            if transaction.gas().is_empty() {
2408                // Create and use a dummy gas object if there is no gas object provided.
2409                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2410                    SIMULATION_GAS_COIN_VALUE,
2411                    transaction.gas_owner(),
2412                );
2413                let gas_object_ref = dummy_gas_object.compute_object_reference();
2414                transaction.gas_data_mut().objects = vec![gas_object_ref];
2415                input_objects.push(ObjectReadResult::new(
2416                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2417                    dummy_gas_object.into(),
2418                ));
2419            }
2420            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2421                protocol_config,
2422                &transaction_kind,
2423                input_objects,
2424                receiving_objects,
2425            )?;
2426            let gas_status = IotaGasStatus::new(
2427                max_tx_gas,
2428                transaction.gas_price(),
2429                reference_gas_price,
2430                protocol_config,
2431            )?;
2432
2433            (gas_status, checked_input_objects)
2434        } else {
2435            // If we are not skipping checks, then we call the check_transaction_input
2436            // function and its dummy gas variant which will perform full
2437            // fledged checks just like a real transaction execution.
2438            if transaction.gas().is_empty() {
2439                // Create and use a dummy gas object if there is no gas object provided.
2440                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2441                    SIMULATION_GAS_COIN_VALUE,
2442                    transaction.gas_owner(),
2443                );
2444                let gas_object_ref = dummy_gas_object.compute_object_reference();
2445                transaction.gas_data_mut().objects = vec![gas_object_ref];
2446                iota_transaction_checks::check_transaction_input_with_given_gas(
2447                    epoch_store.protocol_config(),
2448                    reference_gas_price,
2449                    &transaction,
2450                    input_objects,
2451                    receiving_objects,
2452                    dummy_gas_object,
2453                    &self.metrics.bytecode_verifier_metrics,
2454                    &self.config.verifier_signing_config,
2455                )?
2456            } else {
2457                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
2458                // `authenticator_gas_budget` to 0.
2459                let authenticator_gas_budget = 0;
2460
2461                iota_transaction_checks::check_transaction_input(
2462                    epoch_store.protocol_config(),
2463                    reference_gas_price,
2464                    &transaction,
2465                    input_objects,
2466                    &receiving_objects,
2467                    &self.metrics.bytecode_verifier_metrics,
2468                    &self.config.verifier_signing_config,
2469                    authenticator_gas_budget,
2470                )?
2471            }
2472        };
2473
2474        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2475            .expect("Creating an executor should not fail here");
2476        let gas_data = transaction.gas_data().clone();
2477        let intent_msg = IntentMessage::new(
2478            Intent {
2479                version: IntentVersion::V0,
2480                scope: IntentScope::TransactionData,
2481                app_id: IntentAppId::Iota,
2482            },
2483            transaction,
2484        );
2485        let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2486        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2487            self.get_backing_store().as_ref(),
2488            protocol_config,
2489            self.metrics.limits_metrics.clone(),
2490            // expensive checks
2491            false,
2492            self.config.certificate_deny_config.certificate_deny_set(),
2493            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2494            epoch_store
2495                .epoch_start_config()
2496                .epoch_data()
2497                .epoch_start_timestamp(),
2498            checked_input_objects,
2499            gas_data,
2500            gas_status,
2501            transaction_kind,
2502            sender,
2503            transaction_digest,
2504            skip_checks,
2505        );
2506
2507        let raw_effects = if show_raw_txn_data_and_effects {
2508            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2509                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2510            })?
2511        } else {
2512            vec![]
2513        };
2514
2515        let mut layout_resolver =
2516            epoch_store
2517                .executor()
2518                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2519                    &inner_temp_store,
2520                    self.get_backing_package_store(),
2521                )));
2522
2523        DevInspectResults::new(
2524            effects,
2525            inner_temp_store.events.clone(),
2526            execution_result,
2527            raw_txn_data,
2528            raw_effects,
2529            layout_resolver.as_mut(),
2530        )
2531    }
2532
2533    // Only used for testing because of how epoch store is loaded.
2534    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2535        let epoch_store = self.epoch_store_for_testing();
2536        Ok(epoch_store.reference_gas_price())
2537    }
2538
2539    #[instrument(level = "trace", skip_all)]
2540    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2541        self.get_transaction_cache_reader()
2542            .try_is_tx_already_executed(digest)
2543    }
2544
2545    /// Non-fallible version of `try_is_tx_already_executed`.
2546    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2547        self.try_is_tx_already_executed(digest)
2548            .expect("storage access failed")
2549    }
2550
2551    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2552    #[instrument(level = "debug", skip_all, err)]
2553    fn index_tx(
2554        &self,
2555        indexes: &IndexStore,
2556        digest: &TransactionDigest,
2557        // TODO: index_tx really just need the transaction data here.
2558        cert: &VerifiedExecutableTransaction,
2559        effects: &TransactionEffects,
2560        events: &TransactionEvents,
2561        timestamp_ms: u64,
2562        tx_coins: Option<TxCoins>,
2563        written: &WrittenObjects,
2564        inner_temporary_store: &InnerTemporaryStore,
2565    ) -> IotaResult<u64> {
2566        let changes = self
2567            .process_object_index(effects, written, inner_temporary_store)
2568            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2569
2570        indexes.index_tx(
2571            cert.data().intent_message().value.sender(),
2572            cert.data()
2573                .intent_message()
2574                .value
2575                .input_objects()?
2576                .iter()
2577                .map(|o| o.object_id()),
2578            effects
2579                .all_changed_objects()
2580                .into_iter()
2581                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2582            cert.data()
2583                .intent_message()
2584                .value
2585                .move_calls()
2586                .into_iter()
2587                .map(|(package, module, function)| {
2588                    (*package, module.to_owned(), function.to_owned())
2589                }),
2590            events,
2591            changes,
2592            digest,
2593            timestamp_ms,
2594            tx_coins,
2595        )
2596    }
2597
2598    #[cfg(msim)]
2599    fn create_fail_state(
2600        &self,
2601        certificate: &VerifiedExecutableTransaction,
2602        epoch_store: &Arc<AuthorityPerEpochStore>,
2603        effects: &mut TransactionEffects,
2604    ) {
2605        use std::cell::RefCell;
2606
2607        use iota_types::effects::TransactionEffectsAPIForTesting;
2608        thread_local! {
2609            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2610        }
2611        if !certificate.data().intent_message().value.is_system_tx() {
2612            let committee = epoch_store.committee();
2613            let cur_stake = (**committee).weight(&self.name);
2614            if cur_stake > 0 {
2615                FAIL_STATE.with_borrow_mut(|fail_state| {
2616                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2617                    if fail_state.0 < committee.validity_threshold() {
2618                        fail_state.0 += cur_stake;
2619                        fail_state.1.insert(self.name);
2620                    }
2621
2622                    if fail_state.1.contains(&self.name) {
2623                        info!("cp_exec failing tx");
2624                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2625                    }
2626                });
2627            }
2628        }
2629    }
2630
2631    fn process_object_index(
2632        &self,
2633        effects: &TransactionEffects,
2634        written: &WrittenObjects,
2635        inner_temporary_store: &InnerTemporaryStore,
2636    ) -> IotaResult<ObjectIndexChanges> {
2637        let epoch_store = self.load_epoch_store_one_call_per_task();
2638        let mut layout_resolver =
2639            epoch_store
2640                .executor()
2641                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2642                    inner_temporary_store,
2643                    self.get_backing_package_store(),
2644                )));
2645
2646        let modified_at_version = effects
2647            .modified_at_versions()
2648            .into_iter()
2649            .collect::<HashMap<_, _>>();
2650
2651        let tx_digest = effects.transaction_digest();
2652        let mut deleted_owners = vec![];
2653        let mut deleted_dynamic_fields = vec![];
2654        for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2655            let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2656            // When we process the index, the latest object hasn't been written yet so
2657            // the old object must be present.
2658            match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2659                |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}",object_ref.object_id)
2660            ) {
2661                Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2662                Owner::Object(object_id) => {
2663                    deleted_dynamic_fields.push((object_id, object_ref.object_id))
2664                }
2665                _ => {}
2666            }
2667        }
2668
2669        let mut new_owners = vec![];
2670        let mut new_dynamic_fields = vec![];
2671
2672        for (oref, owner, kind) in effects.all_changed_objects() {
2673            let id = &oref.object_id;
2674            // For mutated objects, retrieve old owner and delete old index if there is a
2675            // owner change.
2676            if let WriteKind::Mutate = kind {
2677                let Some(old_version) = modified_at_version.get(id) else {
2678                    panic!(
2679                        "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2680                    );
2681                };
2682                // When we process the index, the latest object hasn't been written yet so
2683                // the old object must be present.
2684                let Some(old_object) = self
2685                    .get_object_store()
2686                    .try_get_object_by_key(id, *old_version)?
2687                else {
2688                    panic!(
2689                        "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2690                    );
2691                };
2692                if old_object.owner != owner {
2693                    match old_object.owner {
2694                        Owner::Address(addr) => {
2695                            deleted_owners.push((addr, *id));
2696                        }
2697                        Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2698                        _ => {}
2699                    }
2700                }
2701            }
2702
2703            match owner {
2704                Owner::Address(addr) => {
2705                    // TODO: We can remove the object fetching after we added ObjectType to
2706                    // TransactionEffects
2707                    let new_object = written.get(id).unwrap_or_else(
2708                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2709                    );
2710                    assert_eq!(
2711                        new_object.version(),
2712                        oref.version,
2713                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2714                        tx_digest,
2715                        id,
2716                        new_object.version(),
2717                        oref.version
2718                    );
2719
2720                    let type_ = new_object
2721                        .type_()
2722                        .map(|type_| ObjectType::Struct(type_.clone()))
2723                        .unwrap_or(ObjectType::Package);
2724
2725                    new_owners.push((
2726                        (addr, *id),
2727                        ObjectInfo {
2728                            object_id: *id,
2729                            version: oref.version,
2730                            digest: oref.digest,
2731                            type_,
2732                            owner,
2733                            previous_transaction: *effects.transaction_digest(),
2734                        },
2735                    ));
2736                }
2737                Owner::Object(owner) => {
2738                    let new_object = written.get(id).unwrap_or_else(
2739                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2740                    );
2741                    assert_eq!(
2742                        new_object.version(),
2743                        oref.version,
2744                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2745                        tx_digest,
2746                        id,
2747                        new_object.version(),
2748                        oref.version
2749                    );
2750
2751                    let Some(df_info) = self
2752                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2753                        .unwrap_or_else(|e| {
2754                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2755                            None
2756                        }
2757                    )
2758                        else {
2759                            // Skip indexing for non dynamic field objects.
2760                            continue;
2761                        };
2762                    new_dynamic_fields.push(((owner, *id), df_info))
2763                }
2764                _ => {}
2765            }
2766        }
2767
2768        Ok(ObjectIndexChanges {
2769            deleted_owners,
2770            deleted_dynamic_fields,
2771            new_owners,
2772            new_dynamic_fields,
2773        })
2774    }
2775
2776    fn try_create_dynamic_field_info(
2777        &self,
2778        o: &Object,
2779        written: &WrittenObjects,
2780        resolver: &mut dyn LayoutResolver,
2781    ) -> IotaResult<Option<DynamicFieldInfo>> {
2782        // Skip if not a move object
2783        let Some(move_object) = o.data.as_struct_opt().cloned() else {
2784            return Ok(None);
2785        };
2786
2787        // We only index dynamic field objects
2788        if !move_object.struct_tag().is_dynamic_field() {
2789            return Ok(None);
2790        }
2791
2792        let layout = resolver
2793            .get_annotated_layout(move_object.struct_tag())?
2794            .into_layout();
2795
2796        let field =
2797            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2798                IotaError::ObjectDeserialization {
2799                    error: e.to_string(),
2800                }
2801            })?;
2802
2803        let type_ = field.kind;
2804        let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2805        let bcs_name = field.name_bytes.to_owned();
2806
2807        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2808            .map_err(|e| {
2809                warn!("{e}");
2810                IotaError::ObjectDeserialization {
2811                    error: e.to_string(),
2812                }
2813            })?;
2814
2815        let name = DynamicFieldName {
2816            type_: name_type,
2817            value: IotaMoveValue::from(name_value).to_json_value(),
2818        };
2819
2820        let value_metadata = field.value_metadata().map_err(|e| {
2821            warn!("{e}");
2822            IotaError::ObjectDeserialization {
2823                error: e.to_string(),
2824            }
2825        })?;
2826
2827        Ok(Some(match value_metadata {
2828            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2829                name,
2830                bcs_name,
2831                type_,
2832                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2833                object_id: o.id(),
2834                version: o.version(),
2835                digest: o.digest(),
2836            },
2837
2838            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2839                // Find the actual object from storage using the object id obtained from the
2840                // wrapper.
2841
2842                // Try to find the object in the written objects first.
2843                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2844                    (
2845                        object.version(),
2846                        object.digest(),
2847                        object.data.object_type().unwrap().clone(),
2848                    )
2849                } else {
2850                    // If not found, try to find it in the database.
2851                    let object = self
2852                        .get_object_store()
2853                        .try_get_object_by_key(&object_id, o.version())?
2854                        .ok_or_else(|| UserInputError::ObjectNotFound {
2855                            object_id,
2856                            version: Some(o.version()),
2857                        })?;
2858                    let version = object.version();
2859                    let digest = object.digest();
2860                    let object_type = object.data.object_type().unwrap().clone();
2861                    (version, digest, object_type)
2862                };
2863
2864                DynamicFieldInfo {
2865                    name,
2866                    bcs_name,
2867                    type_,
2868                    object_type: object_type.to_string(),
2869                    object_id,
2870                    version,
2871                    digest,
2872                }
2873            }
2874        }))
2875    }
2876
2877    #[instrument(level = "trace", skip_all, err)]
2878    fn post_process_one_tx(
2879        &self,
2880        certificate: &VerifiedExecutableTransaction,
2881        effects: &TransactionEffects,
2882        inner_temporary_store: &InnerTemporaryStore,
2883        epoch_store: &Arc<AuthorityPerEpochStore>,
2884    ) -> IotaResult {
2885        if self.indexes.is_none() {
2886            return Ok(());
2887        }
2888
2889        let _scope = monitored_scope("Execution::post_process_one_tx");
2890
2891        let tx_digest = certificate.digest();
2892        let timestamp_ms = Self::unixtime_now_ms();
2893        let events = &inner_temporary_store.events;
2894        let written = &inner_temporary_store.written;
2895        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2896            effects,
2897            inner_temporary_store,
2898            epoch_store,
2899        );
2900
2901        // Index tx
2902        if let Some(indexes) = &self.indexes {
2903            let _ = self
2904                .index_tx(
2905                    indexes.as_ref(),
2906                    tx_digest,
2907                    certificate,
2908                    effects,
2909                    events,
2910                    timestamp_ms,
2911                    tx_coins,
2912                    written,
2913                    inner_temporary_store,
2914                )
2915                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2916                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2917                .expect("Indexing tx should not fail");
2918
2919            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2920            let events = self.make_transaction_block_events(
2921                events.clone(),
2922                *tx_digest,
2923                timestamp_ms,
2924                epoch_store,
2925                inner_temporary_store,
2926            )?;
2927            // Emit events
2928            self.subscription_handler
2929                .process_tx(certificate.data().transaction_data(), &effects, &events)
2930                .tap_ok(|_| {
2931                    self.metrics
2932                        .post_processing_total_tx_had_event_processed
2933                        .inc()
2934                })
2935                .tap_err(|e| {
2936                    warn!(
2937                        ?tx_digest,
2938                        "Post processing - Couldn't process events for tx: {}", e
2939                    )
2940                })?;
2941
2942            self.metrics
2943                .post_processing_total_events_emitted
2944                .inc_by(events.data.len() as u64);
2945        };
2946        Ok(())
2947    }
2948
2949    fn make_transaction_block_events(
2950        &self,
2951        transaction_events: TransactionEvents,
2952        digest: TransactionDigest,
2953        timestamp_ms: u64,
2954        epoch_store: &Arc<AuthorityPerEpochStore>,
2955        inner_temporary_store: &InnerTemporaryStore,
2956    ) -> IotaResult<IotaTransactionBlockEvents> {
2957        let mut layout_resolver =
2958            epoch_store
2959                .executor()
2960                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2961                    inner_temporary_store,
2962                    self.get_backing_package_store(),
2963                )));
2964        IotaTransactionBlockEvents::try_from(
2965            transaction_events,
2966            digest,
2967            Some(timestamp_ms),
2968            layout_resolver.as_mut(),
2969        )
2970    }
2971
2972    pub fn unixtime_now_ms() -> u64 {
2973        let now = SystemTime::now()
2974            .duration_since(UNIX_EPOCH)
2975            .expect("Time went backwards")
2976            .as_millis();
2977        u64::try_from(now).expect("Travelling in time machine")
2978    }
2979
2980    #[instrument(level = "trace", skip_all)]
2981    pub async fn handle_transaction_info_request(
2982        &self,
2983        request: TransactionInfoRequest,
2984    ) -> IotaResult<TransactionInfoResponse> {
2985        let epoch_store = self.load_epoch_store_one_call_per_task();
2986        let (transaction, status) = self
2987            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2988            .ok_or(IotaError::TransactionNotFound {
2989                digest: request.transaction_digest,
2990            })?;
2991        Ok(TransactionInfoResponse {
2992            transaction,
2993            status,
2994        })
2995    }
2996
2997    #[instrument(level = "trace", skip_all)]
2998    pub async fn handle_object_info_request(
2999        &self,
3000        request: ObjectInfoRequest,
3001    ) -> IotaResult<ObjectInfoResponse> {
3002        let epoch_store = self.load_epoch_store_one_call_per_task();
3003
3004        let requested_object_seq = match request.request_kind {
3005            ObjectInfoRequestKind::LatestObjectInfo => {
3006                self.try_get_object_or_tombstone(request.object_id)
3007                    .await?
3008                    .ok_or_else(|| {
3009                        IotaError::from(UserInputError::ObjectNotFound {
3010                            object_id: request.object_id,
3011                            version: None,
3012                        })
3013                    })?
3014                    .version
3015            }
3016            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3017        };
3018
3019        let object = self
3020            .get_object_store()
3021            .try_get_object_by_key(&request.object_id, requested_object_seq)?
3022            .ok_or_else(|| {
3023                IotaError::from(UserInputError::ObjectNotFound {
3024                    object_id: request.object_id,
3025                    version: Some(requested_object_seq),
3026                })
3027            })?;
3028
3029        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3030            (request.generate_layout, object.data.as_struct_opt())
3031        {
3032            Some(into_struct_layout(
3033                epoch_store
3034                    .executor()
3035                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3036                    .get_annotated_layout(move_obj.struct_tag())?,
3037            )?)
3038        } else {
3039            None
3040        };
3041
3042        let lock = if !object.is_address_owned() {
3043            // Only address owned objects have locks.
3044            None
3045        } else {
3046            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3047                .await?
3048                .map(|s| s.into_inner())
3049        };
3050
3051        Ok(ObjectInfoResponse {
3052            object,
3053            layout,
3054            lock_for_debugging: lock,
3055        })
3056    }
3057
3058    #[instrument(level = "trace", skip_all)]
3059    pub fn handle_checkpoint_request(
3060        &self,
3061        request: &CheckpointRequest,
3062    ) -> IotaResult<CheckpointResponse> {
3063        let summary = if request.certified {
3064            let summary = match request.sequence_number {
3065                Some(seq) => self
3066                    .checkpoint_store
3067                    .get_checkpoint_by_sequence_number(seq)?,
3068                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3069            }
3070            .map(|v| v.into_inner());
3071            summary.map(CheckpointSummaryResponse::Certified)
3072        } else {
3073            let summary = match request.sequence_number {
3074                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3075                None => self
3076                    .checkpoint_store
3077                    .get_latest_locally_computed_checkpoint()?,
3078            };
3079            summary.map(CheckpointSummaryResponse::Pending)
3080        };
3081        let contents = match &summary {
3082            Some(s) => self
3083                .checkpoint_store
3084                .get_checkpoint_contents(&s.content_digest())?,
3085            None => None,
3086        };
3087        Ok(CheckpointResponse {
3088            checkpoint: summary,
3089            contents,
3090        })
3091    }
3092
3093    fn check_protocol_version(
3094        supported_protocol_versions: SupportedProtocolVersions,
3095        current_version: ProtocolVersion,
3096    ) {
3097        info!("current protocol version is now {:?}", current_version);
3098        info!("supported versions are: {:?}", supported_protocol_versions);
3099        if !supported_protocol_versions.is_version_supported(current_version) {
3100            let msg = format!(
3101                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3102            );
3103
3104            error!("{}", msg);
3105            eprintln!("{msg}");
3106
3107            #[cfg(not(msim))]
3108            std::process::exit(1);
3109
3110            #[cfg(msim)]
3111            iota_simulator::task::shutdown_current_node();
3112        }
3113    }
3114
3115    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
3116    #[expect(clippy::too_many_arguments)]
3117    pub async fn new(
3118        name: AuthorityName,
3119        secret: StableSyncAuthoritySigner,
3120        supported_protocol_versions: SupportedProtocolVersions,
3121        store: Arc<AuthorityStore>,
3122        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3123        epoch_store: Arc<AuthorityPerEpochStore>,
3124        committee_store: Arc<CommitteeStore>,
3125        indexes: Option<Arc<IndexStore>>,
3126        grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3127        checkpoint_store: Arc<CheckpointStore>,
3128        prometheus_registry: &Registry,
3129        genesis_objects: &[Object],
3130        db_checkpoint_config: &DBCheckpointConfig,
3131        config: NodeConfig,
3132        archive_readers: ArchiveReaderBalancer,
3133        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3134        chain_identifier: ChainIdentifier,
3135        pruner_db: Option<Arc<AuthorityPrunerTables>>,
3136        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
3137        policy_config: Option<PolicyConfig>,
3138        firewall_config: Option<RemoteFirewallConfig>,
3139    ) -> Arc<Self> {
3140        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3141
3142        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3143        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3144        let transaction_manager = Arc::new(TransactionManager::new(
3145            execution_cache_trait_pointers.object_cache_reader.clone(),
3146            execution_cache_trait_pointers
3147                .transaction_cache_reader
3148                .clone(),
3149            &epoch_store,
3150            tx_ready_certificates,
3151            metrics.clone(),
3152        ));
3153        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3154
3155        let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3156            epoch_store.get_parent_path(),
3157            config
3158                .authority_store_pruning_config
3159                .num_latest_epoch_dbs_to_retain,
3160        )
3161        .await;
3162        let _pruner = AuthorityStorePruner::new(
3163            store.perpetual_tables.clone(),
3164            checkpoint_store.clone(),
3165            grpc_indexes_store.clone(),
3166            indexes.clone(),
3167            config.authority_store_pruning_config.clone(),
3168            epoch_store.committee().authority_exists(&name),
3169            epoch_store.epoch_start_state().epoch_duration_ms(),
3170            prometheus_registry,
3171            archive_readers,
3172            pruner_db,
3173            checkpoint_progress_tracker.clone(),
3174        );
3175        let input_loader =
3176            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3177        let epoch = epoch_store.epoch();
3178        let rgp = epoch_store.reference_gas_price();
3179        let traffic_controller_metrics =
3180            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3181        let traffic_controller = if let Some(policy_config) = policy_config {
3182            Some(Arc::new(
3183                TrafficController::init(
3184                    policy_config,
3185                    traffic_controller_metrics,
3186                    firewall_config.clone(),
3187                )
3188                .await,
3189            ))
3190        } else {
3191            None
3192        };
3193        let state = Arc::new(AuthorityState {
3194            name,
3195            secret,
3196            execution_lock: RwLock::new(epoch),
3197            epoch_store: ArcSwap::new(epoch_store.clone()),
3198            input_loader,
3199            execution_cache_trait_pointers,
3200            indexes,
3201            grpc_indexes_store,
3202            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3203            checkpoint_store,
3204            committee_store,
3205            transaction_manager,
3206            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3207            metrics,
3208            _pruner,
3209            authority_per_epoch_pruner,
3210            checkpoint_progress_tracker,
3211            db_checkpoint_config: db_checkpoint_config.clone(),
3212            config,
3213            overload_info: AuthorityOverloadInfo::default(),
3214            validator_tx_finalizer,
3215            chain_identifier,
3216            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3217            traffic_controller,
3218        });
3219
3220        // Start a task to execute ready certificates.
3221        let authority_state = Arc::downgrade(&state);
3222        spawn_monitored_task!(execution_process(
3223            authority_state,
3224            rx_ready_certificates,
3225            rx_execution_shutdown,
3226        ));
3227        spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3228
3229        // TODO: This doesn't belong to the constructor of AuthorityState.
3230        state
3231            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3232            .expect("Error indexing genesis objects.");
3233
3234        state
3235    }
3236
3237    pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3238        &self.authority_per_epoch_pruner
3239    }
3240
3241    // TODO: Consolidate our traits to reduce the number of methods here.
3242    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3243        &self.execution_cache_trait_pointers.object_cache_reader
3244    }
3245
3246    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3247        &self.execution_cache_trait_pointers.transaction_cache_reader
3248    }
3249
3250    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3251        &self.execution_cache_trait_pointers.cache_writer
3252    }
3253
3254    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3255        &self.execution_cache_trait_pointers.backing_store
3256    }
3257
3258    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3259        &self.execution_cache_trait_pointers.backing_package_store
3260    }
3261
3262    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3263        &self.execution_cache_trait_pointers.object_store
3264    }
3265
3266    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3267        &self.execution_cache_trait_pointers.reconfig_api
3268    }
3269
3270    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3271        &self.execution_cache_trait_pointers.global_state_hash_store
3272    }
3273
3274    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3275        &self.execution_cache_trait_pointers.checkpoint_cache
3276    }
3277
3278    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3279        &self.execution_cache_trait_pointers.state_sync_store
3280    }
3281
3282    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3283        &self.execution_cache_trait_pointers.cache_commit
3284    }
3285
3286    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3287        self.execution_cache_trait_pointers
3288            .testing_api
3289            .database_for_testing()
3290    }
3291
3292    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3293        &self,
3294        config: NodeConfig,
3295        metrics: Arc<AuthorityStorePruningMetrics>,
3296    ) -> anyhow::Result<()> {
3297        let archive_readers =
3298            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3299        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3300            &self.database_for_testing().perpetual_tables,
3301            &self.checkpoint_store,
3302            self.grpc_indexes_store.as_deref(),
3303            None,
3304            config.authority_store_pruning_config,
3305            metrics,
3306            archive_readers,
3307            EPOCH_DURATION_MS_FOR_TESTING,
3308            self.checkpoint_progress_tracker.as_ref(),
3309        )
3310        .await
3311    }
3312
3313    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3314        &self.transaction_manager
3315    }
3316
3317    /// Adds transactions / certificates to transaction manager for ordered
3318    /// execution.
3319    pub fn enqueue_transactions_for_execution(
3320        &self,
3321        txns: Vec<VerifiedExecutableTransaction>,
3322        epoch_store: &Arc<AuthorityPerEpochStore>,
3323    ) {
3324        self.transaction_manager.enqueue(txns, epoch_store)
3325    }
3326
3327    /// Adds certificates to transaction manager for ordered execution.
3328    pub fn enqueue_certificates_for_execution(
3329        &self,
3330        certs: Vec<VerifiedCertificate>,
3331        epoch_store: &Arc<AuthorityPerEpochStore>,
3332    ) {
3333        self.transaction_manager
3334            .enqueue_certificates(certs, epoch_store)
3335    }
3336
3337    pub fn enqueue_with_expected_effects_digest(
3338        &self,
3339        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3340        epoch_store: &AuthorityPerEpochStore,
3341    ) {
3342        self.transaction_manager
3343            .enqueue_with_expected_effects_digest(certs, epoch_store)
3344    }
3345
3346    fn create_owner_index_if_empty(
3347        &self,
3348        genesis_objects: &[Object],
3349        epoch_store: &Arc<AuthorityPerEpochStore>,
3350    ) -> IotaResult {
3351        let Some(index_store) = &self.indexes else {
3352            return Ok(());
3353        };
3354        if !index_store.is_empty() {
3355            return Ok(());
3356        }
3357
3358        let mut new_owners = vec![];
3359        let mut new_dynamic_fields = vec![];
3360        let mut layout_resolver = epoch_store
3361            .executor()
3362            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3363        for o in genesis_objects.iter() {
3364            match o.owner {
3365                Owner::Address(addr) => new_owners.push((
3366                    (addr, o.id()),
3367                    ObjectInfo::new(&o.compute_object_reference(), o),
3368                )),
3369                Owner::Object(object_id) => {
3370                    let id = o.id();
3371                    let info = match self.try_create_dynamic_field_info(
3372                        o,
3373                        &BTreeMap::new(),
3374                        layout_resolver.as_mut(),
3375                    ) {
3376                        Ok(Some(info)) => info,
3377                        Ok(None) => continue,
3378                        Err(IotaError::UserInput {
3379                            error:
3380                                UserInputError::ObjectNotFound {
3381                                    object_id: not_found_id,
3382                                    version,
3383                                },
3384                        }) => {
3385                            warn!(
3386                                ?not_found_id,
3387                                ?version,
3388                                object_owner=?object_id,
3389                                field=?id,
3390                                "Skipping dynamic field: referenced genesis object not found"
3391                            );
3392                            continue;
3393                        }
3394                        Err(e) => return Err(e),
3395                    };
3396                    new_dynamic_fields.push(((object_id, id), info));
3397                }
3398                _ => {}
3399            }
3400        }
3401
3402        index_store.insert_genesis_objects(ObjectIndexChanges {
3403            deleted_owners: vec![],
3404            deleted_dynamic_fields: vec![],
3405            new_owners,
3406            new_dynamic_fields,
3407        })
3408    }
3409
3410    /// Attempts to acquire execution lock for an executable transaction.
3411    /// Returns the lock if the transaction is matching current executed epoch
3412    /// Returns None otherwise
3413    pub fn execution_lock_for_executable_transaction(
3414        &self,
3415        transaction: &VerifiedExecutableTransaction,
3416    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3417        let lock = self
3418            .execution_lock
3419            .try_read()
3420            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3421        if *lock == transaction.auth_sig().epoch() {
3422            Ok(lock)
3423        } else {
3424            Err(IotaError::WrongEpoch {
3425                expected_epoch: *lock,
3426                actual_epoch: transaction.auth_sig().epoch(),
3427            })
3428        }
3429    }
3430
3431    /// Acquires the execution lock for the duration of a transaction signing
3432    /// request. This prevents reconfiguration from starting until we are
3433    /// finished handling the signing request. Otherwise, in-memory lock
3434    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3435    /// while we are attempting to acquire locks for the transaction.
3436    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3437        self.execution_lock
3438            .try_read()
3439            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3440    }
3441
3442    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3443        self.execution_lock.write().await
3444    }
3445
3446    #[instrument(level = "error", skip_all)]
3447    pub async fn reconfigure(
3448        &self,
3449        cur_epoch_store: &AuthorityPerEpochStore,
3450        supported_protocol_versions: SupportedProtocolVersions,
3451        new_committee: Committee,
3452        epoch_start_configuration: EpochStartConfiguration,
3453        state_hasher: Arc<GlobalStateHasher>,
3454        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3455        epoch_supply_change: i64,
3456        epoch_last_checkpoint: CheckpointSequenceNumber,
3457    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3458        Self::check_protocol_version(
3459            supported_protocol_versions,
3460            epoch_start_configuration
3461                .epoch_start_state()
3462                .protocol_version(),
3463        );
3464        self.metrics.reset_on_reconfigure();
3465        self.committee_store.insert_new_committee(&new_committee)?;
3466
3467        // Wait until no transactions are being executed.
3468        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3469
3470        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3471        cur_epoch_store.epoch_terminated().await;
3472
3473        let highest_locally_built_checkpoint_seq = self
3474            .checkpoint_store
3475            .get_latest_locally_computed_checkpoint()?
3476            .map(|c| *c.sequence_number())
3477            .unwrap_or(0);
3478
3479        assert!(
3480            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3481            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3482        );
3483        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3484            || self.is_fullnode(cur_epoch_store)
3485        {
3486            // if we built the last checkpoint locally (as opposed to receiving it from a
3487            // peer), then all shared_version_assignments except the one for the
3488            // ChangeEpoch transaction should have been removed
3489            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3490            // Due to (otherwise harmless) race conditions between CheckpointExecutor and
3491            // ConsensusHandler, we actually can't guarantee that all
3492            // shared_version_assignments have been removed. However,
3493            // typically at most 2 or 3 are left over. We leave this check here in order to
3494            // catch complete failure of cleanup which would cause a memory
3495            // leak.
3496            if num_shared_version_assignments > 10 {
3497                // If this happens in prod, we have a memory leak, but not a correctness issue.
3498                debug_fatal!(
3499                    "all shared_version_assignments should have been removed \
3500                    (num_shared_version_assignments: {num_shared_version_assignments})"
3501                );
3502            }
3503        }
3504
3505        // Safe to reconfigure now. No transactions are being executed,
3506        // and no epoch-specific tasks are running.
3507
3508        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
3509        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
3510        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3511            .await?;
3512        self.get_reconfig_api()
3513            .clear_state_end_of_epoch(&execution_lock);
3514        self.check_system_consistency(
3515            cur_epoch_store,
3516            state_hasher,
3517            expensive_safety_check_config,
3518            epoch_supply_change,
3519        )?;
3520        self.get_reconfig_api()
3521            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3522        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3523            if self
3524                .db_checkpoint_config
3525                .perform_db_checkpoints_at_epoch_end
3526            {
3527                let checkpoint_indexes = self
3528                    .db_checkpoint_config
3529                    .perform_index_db_checkpoints_at_epoch_end
3530                    .unwrap_or(false);
3531                let current_epoch = cur_epoch_store.epoch();
3532                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3533                self.checkpoint_all_dbs(
3534                    &epoch_checkpoint_path,
3535                    cur_epoch_store,
3536                    checkpoint_indexes,
3537                )?;
3538            }
3539        }
3540
3541        let new_epoch = new_committee.epoch;
3542        let new_epoch_store = self
3543            .reopen_epoch_db(
3544                cur_epoch_store,
3545                new_committee,
3546                epoch_start_configuration,
3547                expensive_safety_check_config,
3548                epoch_last_checkpoint,
3549            )
3550            .await?;
3551        assert_eq!(new_epoch_store.epoch(), new_epoch);
3552        self.transaction_manager.reconfigure(new_epoch);
3553        *execution_lock = new_epoch;
3554        // drop execution_lock after epoch store was updated
3555        // see also assert in AuthorityState::process_certificate
3556        // on the epoch store and execution lock epoch match
3557        Ok(new_epoch_store)
3558    }
3559
3560    /// Advance the epoch store to the next epoch for testing only.
3561    /// This only manually sets all the places where we have the epoch number.
3562    /// It doesn't properly reconfigure the node, hence should be only used for
3563    /// testing.
3564    pub async fn reconfigure_for_testing(&self) {
3565        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3566        let epoch_store = self.epoch_store_for_testing().clone();
3567        let protocol_config = epoch_store.protocol_config().clone();
3568        // The current protocol config used in the epoch store may have been overridden
3569        // and diverged from the protocol config definitions. That override may
3570        // have now been dropped when the initial guard was dropped. We reapply
3571        // the override before creating the new epoch store, to make sure that
3572        // the new epoch store has the same protocol config as the current one.
3573        // Since this is for testing only, we mostly like to keep the protocol config
3574        // the same across epochs.
3575        let _guard =
3576            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3577        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3578            self.get_backing_package_store().clone(),
3579            &self.config.expensive_safety_check_config,
3580            self.checkpoint_store
3581                .get_epoch_last_checkpoint(epoch_store.epoch())
3582                .unwrap()
3583                .map(|c| *c.sequence_number())
3584                .unwrap_or_default(),
3585        );
3586        let new_epoch = new_epoch_store.epoch();
3587        self.transaction_manager.reconfigure(new_epoch);
3588        self.epoch_store.store(new_epoch_store);
3589        epoch_store.epoch_terminated().await;
3590        *execution_lock = new_epoch;
3591    }
3592
3593    #[instrument(level = "error", skip_all)]
3594    fn check_system_consistency(
3595        &self,
3596        cur_epoch_store: &AuthorityPerEpochStore,
3597        state_hasher: Arc<GlobalStateHasher>,
3598        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3599        epoch_supply_change: i64,
3600    ) -> IotaResult<()> {
3601        info!(
3602            "Performing iota conservation consistency check for epoch {}",
3603            cur_epoch_store.epoch()
3604        );
3605
3606        if cfg!(debug_assertions) {
3607            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3608        }
3609
3610        self.get_reconfig_api()
3611            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3612
3613        // check for root state hash consistency with live object set
3614        if expensive_safety_check_config.enable_state_consistency_check() {
3615            info!(
3616                "Performing state consistency check for epoch {}",
3617                cur_epoch_store.epoch()
3618            );
3619            self.expensive_check_is_consistent_state(
3620                state_hasher,
3621                cur_epoch_store,
3622                cfg!(debug_assertions), // panic in debug mode only
3623            );
3624        }
3625
3626        if expensive_safety_check_config.enable_secondary_index_checks() {
3627            if let Some(indexes) = self.indexes.clone() {
3628                verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3629                    .expect("secondary indexes are inconsistent");
3630            }
3631        }
3632
3633        Ok(())
3634    }
3635
3636    fn expensive_check_is_consistent_state(
3637        &self,
3638        state_hasher: Arc<GlobalStateHasher>,
3639        cur_epoch_store: &AuthorityPerEpochStore,
3640        panic: bool,
3641    ) {
3642        let live_object_set_hash = state_hasher.digest_live_object_set();
3643
3644        let root_state_hash: ECMHLiveObjectSetDigest = self
3645            .get_global_state_hash_store()
3646            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3647            .expect("Retrieving root state hash cannot fail")
3648            .expect("Root state hash for epoch must exist")
3649            .1
3650            .digest()
3651            .into();
3652
3653        let is_inconsistent = root_state_hash != live_object_set_hash;
3654        if is_inconsistent {
3655            if panic {
3656                panic!(
3657                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3658                );
3659            } else {
3660                error!(
3661                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3662                    root_state_hash, live_object_set_hash
3663                );
3664            }
3665        } else {
3666            info!("State consistency check passed");
3667        }
3668
3669        if !panic {
3670            state_hasher.set_inconsistent_state(is_inconsistent);
3671        }
3672    }
3673
3674    pub fn current_epoch_for_testing(&self) -> EpochId {
3675        self.epoch_store_for_testing().epoch()
3676    }
3677
3678    #[instrument(level = "error", skip_all)]
3679    pub fn checkpoint_all_dbs(
3680        &self,
3681        checkpoint_path: &Path,
3682        cur_epoch_store: &AuthorityPerEpochStore,
3683        checkpoint_indexes: bool,
3684    ) -> IotaResult {
3685        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3686        let current_epoch = cur_epoch_store.epoch();
3687
3688        if checkpoint_path.exists() {
3689            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3690            return Ok(());
3691        }
3692
3693        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3694        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3695
3696        if checkpoint_path_tmp.exists() {
3697            fs::remove_dir_all(&checkpoint_path_tmp)
3698                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3699        }
3700
3701        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3702        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3703
3704        // NOTE: Do not change the order of invoking these checkpoint calls
3705        // We want to snapshot checkpoint db first to not race with state sync
3706        self.checkpoint_store
3707            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3708
3709        self.get_reconfig_api()
3710            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3711
3712        self.committee_store
3713            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3714
3715        if checkpoint_indexes {
3716            if let Some(indexes) = self.indexes.as_ref() {
3717                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3718            }
3719            if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3720                grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3721            }
3722        }
3723
3724        fs::rename(checkpoint_path_tmp, checkpoint_path)
3725            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3726        Ok(())
3727    }
3728
3729    /// Load the current epoch store. This can change during reconfiguration. To
3730    /// ensure that we never end up accessing different epoch stores in a
3731    /// single task, we need to make sure that this is called once per task.
3732    /// Each call needs to be carefully audited to ensure it is
3733    /// the case. This also means we should minimize the number of call-sites.
3734    /// Only call it when there is no way to obtain it from somewhere else.
3735    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3736        self.epoch_store.load()
3737    }
3738
3739    // Load the epoch store, should be used in tests only.
3740    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3741        self.load_epoch_store_one_call_per_task()
3742    }
3743
3744    pub fn clone_committee_for_testing(&self) -> Committee {
3745        Committee::clone(self.epoch_store_for_testing().committee())
3746    }
3747
3748    #[instrument(level = "trace", skip_all)]
3749    pub async fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3750        self.get_object_store()
3751            .try_get_object(object_id)
3752            .map_err(Into::into)
3753    }
3754
3755    /// Non-fallible version of `try_get_object`.
3756    pub async fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3757        self.try_get_object(object_id)
3758            .await
3759            .expect("storage access failed")
3760    }
3761
3762    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3763        Ok(self
3764            .try_get_object(&ObjectId::SYSTEM)
3765            .await?
3766            .expect("system package should always exist")
3767            .compute_object_reference())
3768    }
3769
3770    // This function is only used for testing.
3771    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3772        self.get_object_cache_reader()
3773            .try_get_iota_system_state_object_unsafe()
3774    }
3775
3776    #[instrument(level = "trace", skip_all)]
3777    pub fn get_checkpoint_by_sequence_number(
3778        &self,
3779        sequence_number: CheckpointSequenceNumber,
3780    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3781        Ok(self
3782            .checkpoint_store
3783            .get_checkpoint_by_sequence_number(sequence_number)?)
3784    }
3785
3786    /// Wait for the given transactions to be included in a checkpoint.
3787    ///
3788    /// Returns a mapping from transaction digest to
3789    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
3790    /// On timeout, returns partial results for any transactions that were
3791    /// already checkpointed.
3792    pub async fn wait_for_checkpoint_inclusion(
3793        &self,
3794        digests: &[TransactionDigest],
3795        timeout: Duration,
3796    ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3797        let epoch_store = self.load_epoch_store_one_call_per_task();
3798
3799        // Local cache so multiple transactions in the same checkpoint only
3800        // trigger a single checkpoint summary lookup.
3801        let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3802
3803        let results = epoch_store
3804            .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3805                *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3806                    self.get_checkpoint_by_sequence_number(seq)
3807                        .ok()
3808                        .flatten()
3809                        .map(|c| c.timestamp_ms)
3810                        .unwrap_or(0)
3811                })
3812            })
3813            .await?;
3814
3815        Ok(digests
3816            .iter()
3817            .copied()
3818            .zip(results)
3819            .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3820            .collect())
3821    }
3822
3823    #[instrument(level = "trace", skip_all)]
3824    pub fn get_transaction_checkpoint_for_tests(
3825        &self,
3826        digest: &TransactionDigest,
3827        epoch_store: &AuthorityPerEpochStore,
3828    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3829        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3830        let Some(checkpoint) = checkpoint else {
3831            return Ok(None);
3832        };
3833        let checkpoint = self
3834            .checkpoint_store
3835            .get_checkpoint_by_sequence_number(checkpoint)?;
3836        Ok(checkpoint)
3837    }
3838
3839    #[instrument(level = "trace", skip_all)]
3840    pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3841        Ok(
3842            match self
3843                .get_object_cache_reader()
3844                .try_get_latest_object_or_tombstone(*object_id)?
3845            {
3846                Some((_, ObjectOrTombstone::Object(object))) => {
3847                    let layout = self.get_object_layout(&object)?;
3848                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3849                }
3850                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3851                None => ObjectRead::NotExists(*object_id),
3852            },
3853        )
3854    }
3855
3856    /// Chain Identifier is the digest of the genesis checkpoint.
3857    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3858        self.chain_identifier
3859    }
3860
3861    #[instrument(level = "trace", skip_all)]
3862    pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3863    where
3864        T: DeserializeOwned,
3865    {
3866        let o = self.get_object_read(object_id)?.into_object()?;
3867        if let Some(move_object) = o.data.as_struct_opt() {
3868            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3869                IotaError::ObjectDeserialization {
3870                    error: format!("{e}"),
3871                }
3872            })?)
3873        } else {
3874            Err(IotaError::ObjectDeserialization {
3875                error: format!("Provided object : [{object_id}] is not a Move object."),
3876            })
3877        }
3878    }
3879
3880    /// This function aims to serve rpc reads on past objects and
3881    /// we don't expect it to be called for other purposes.
3882    /// Depending on the object pruning policies that will be enforced in the
3883    /// future there is no software-level guarantee/SLA to retrieve an object
3884    /// with an old version even if it exists/existed.
3885    #[instrument(level = "trace", skip_all)]
3886    pub fn get_past_object_read(
3887        &self,
3888        object_id: &ObjectId,
3889        version: SequenceNumber,
3890    ) -> IotaResult<PastObjectRead> {
3891        // Firstly we see if the object ever existed by getting its latest data
3892        let Some(obj_ref) = self
3893            .get_object_cache_reader()
3894            .try_get_latest_object_ref_or_tombstone(*object_id)?
3895        else {
3896            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3897        };
3898
3899        if version > obj_ref.version {
3900            return Ok(PastObjectRead::VersionTooHigh {
3901                object_id: *object_id,
3902                asked_version: version,
3903                latest_version: obj_ref.version,
3904            });
3905        }
3906
3907        if version < obj_ref.version {
3908            // Read past objects
3909            return Ok(match self.read_object_at_version(object_id, version)? {
3910                Some((object, layout)) => {
3911                    let obj_ref = object.compute_object_reference();
3912                    PastObjectRead::VersionFound(obj_ref, object, layout)
3913                }
3914
3915                None => PastObjectRead::VersionNotFound(*object_id, version),
3916            });
3917        }
3918
3919        if !obj_ref.digest.is_object_alive() {
3920            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3921        }
3922
3923        match self.read_object_at_version(object_id, obj_ref.version)? {
3924            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3925            None => {
3926                error!(
3927                    "Object with in parent_entry is missing from object store, datastore is \
3928                     inconsistent",
3929                );
3930                Err(UserInputError::ObjectNotFound {
3931                    object_id: *object_id,
3932                    version: Some(obj_ref.version),
3933                }
3934                .into())
3935            }
3936        }
3937    }
3938
3939    #[instrument(level = "trace", skip_all)]
3940    fn read_object_at_version(
3941        &self,
3942        object_id: &ObjectId,
3943        version: SequenceNumber,
3944    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3945        let Some(object) = self
3946            .get_object_cache_reader()
3947            .try_get_object_by_key(object_id, version)?
3948        else {
3949            return Ok(None);
3950        };
3951
3952        let layout = self.get_object_layout(&object)?;
3953        Ok(Some((object, layout)))
3954    }
3955
3956    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3957        let layout = object
3958            .data
3959            .as_struct_opt()
3960            .map(|object| {
3961                into_struct_layout(
3962                    self.load_epoch_store_one_call_per_task()
3963                        .executor()
3964                        // TODO(cache) - must read through cache
3965                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3966                        .get_annotated_layout(object.struct_tag())?,
3967                )
3968            })
3969            .transpose()?;
3970        Ok(layout)
3971    }
3972
3973    fn get_owner_at_version(
3974        &self,
3975        object_id: &ObjectId,
3976        version: SequenceNumber,
3977    ) -> IotaResult<Owner> {
3978        self.get_object_store()
3979            .try_get_object_by_key(object_id, version)?
3980            .ok_or_else(|| {
3981                IotaError::from(UserInputError::ObjectNotFound {
3982                    object_id: *object_id,
3983                    version: Some(version),
3984                })
3985            })
3986            .map(|o| o.owner)
3987    }
3988
3989    #[instrument(level = "trace", skip_all)]
3990    pub fn get_owner_objects(
3991        &self,
3992        owner: IotaAddress,
3993        // If `Some`, the query will start from the next item after the specified cursor
3994        cursor: Option<ObjectId>,
3995        limit: usize,
3996        filter: Option<IotaObjectDataFilter>,
3997    ) -> IotaResult<Vec<ObjectInfo>> {
3998        if let Some(indexes) = &self.indexes {
3999            indexes.get_owner_objects(owner, cursor, limit, filter)
4000        } else {
4001            Err(IotaError::IndexStoreNotAvailable)
4002        }
4003    }
4004
4005    #[instrument(level = "trace", skip_all)]
4006    pub fn get_owned_coins_iterator_with_cursor(
4007        &self,
4008        owner: IotaAddress,
4009        // If `Some`, the query will start from the next item after the specified cursor
4010        cursor: (String, ObjectId),
4011        limit: usize,
4012        one_coin_type_only: bool,
4013    ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4014        if let Some(indexes) = &self.indexes {
4015            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4016        } else {
4017            Err(IotaError::IndexStoreNotAvailable)
4018        }
4019    }
4020
4021    #[instrument(level = "trace", skip_all)]
4022    pub fn get_owner_objects_iterator(
4023        &self,
4024        owner: IotaAddress,
4025        // If `Some`, the query will start from the next item after the specified cursor
4026        cursor: Option<ObjectId>,
4027        filter: Option<IotaObjectDataFilter>,
4028    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4029        let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4030        if let Some(indexes) = &self.indexes {
4031            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4032        } else {
4033            Err(IotaError::IndexStoreNotAvailable)
4034        }
4035    }
4036
4037    #[instrument(level = "trace", skip_all)]
4038    pub async fn get_move_objects<T>(
4039        &self,
4040        owner: IotaAddress,
4041        tag: StructTag,
4042    ) -> IotaResult<Vec<T>>
4043    where
4044        T: DeserializeOwned,
4045    {
4046        let object_ids = self
4047            .get_owner_objects_iterator(owner, None, None)?
4048            .filter(|o| match &o.type_ {
4049                ObjectType::Struct(s) => *s == tag,
4050                ObjectType::Package => false,
4051            })
4052            .map(|info| ObjectKey(info.object_id, info.version))
4053            .collect::<Vec<_>>();
4054        let mut move_objects = vec![];
4055
4056        let objects = self
4057            .get_object_store()
4058            .try_multi_get_objects_by_key(&object_ids)?;
4059
4060        for (o, id) in objects.into_iter().zip(object_ids) {
4061            let object = o.ok_or_else(|| {
4062                IotaError::from(UserInputError::ObjectNotFound {
4063                    object_id: id.0,
4064                    version: Some(id.1),
4065                })
4066            })?;
4067            let move_object = object.data.as_struct_opt().ok_or_else(|| {
4068                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4069            })?;
4070            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4071                IotaError::ObjectDeserialization {
4072                    error: format!("{e}"),
4073                }
4074            })?);
4075        }
4076        Ok(move_objects)
4077    }
4078
4079    #[instrument(level = "trace", skip_all)]
4080    pub fn get_dynamic_fields(
4081        &self,
4082        owner: ObjectId,
4083        // If `Some`, the query will start from the next item after the specified cursor
4084        cursor: Option<ObjectId>,
4085        limit: usize,
4086    ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4087        Ok(self
4088            .get_dynamic_fields_iterator(owner, cursor)?
4089            .take(limit)
4090            .collect::<Result<Vec<_>, _>>()?)
4091    }
4092
4093    fn get_dynamic_fields_iterator(
4094        &self,
4095        owner: ObjectId,
4096        // If `Some`, the query will start from the next item after the specified cursor
4097        cursor: Option<ObjectId>,
4098    ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4099    {
4100        if let Some(indexes) = &self.indexes {
4101            indexes.get_dynamic_fields_iterator(owner, cursor)
4102        } else {
4103            Err(IotaError::IndexStoreNotAvailable)
4104        }
4105    }
4106
4107    #[instrument(level = "trace", skip_all)]
4108    pub fn get_dynamic_field_object_id(
4109        &self,
4110        owner: ObjectId,
4111        name_type: TypeTag,
4112        name_bcs_bytes: &[u8],
4113    ) -> IotaResult<Option<ObjectId>> {
4114        if let Some(indexes) = &self.indexes {
4115            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4116        } else {
4117            Err(IotaError::IndexStoreNotAvailable)
4118        }
4119    }
4120
4121    #[instrument(level = "trace", skip_all)]
4122    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4123        Ok(self.get_indexes()?.next_sequence_number())
4124    }
4125
4126    #[instrument(level = "trace", skip_all)]
4127    pub async fn get_executed_transaction_and_effects(
4128        &self,
4129        digest: TransactionDigest,
4130        kv_store: Arc<TransactionKeyValueStore>,
4131    ) -> IotaResult<(Transaction, TransactionEffects)> {
4132        let transaction = kv_store.get_tx(digest).await?;
4133        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4134        Ok((transaction, effects))
4135    }
4136
4137    #[instrument(level = "trace", skip_all)]
4138    pub fn multi_get_checkpoint_by_sequence_number(
4139        &self,
4140        sequence_numbers: &[CheckpointSequenceNumber],
4141    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4142        Ok(self
4143            .checkpoint_store
4144            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4145    }
4146
4147    #[instrument(level = "trace", skip_all)]
4148    pub fn get_transaction_events(
4149        &self,
4150        digest: &TransactionDigest,
4151    ) -> IotaResult<TransactionEvents> {
4152        self.get_transaction_cache_reader()
4153            .try_get_events(digest)?
4154            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4155    }
4156
4157    pub fn get_transaction_input_objects(
4158        &self,
4159        effects: &TransactionEffects,
4160    ) -> anyhow::Result<Vec<Object>> {
4161        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4162            .map_err(Into::into)
4163    }
4164
4165    pub fn get_transaction_output_objects(
4166        &self,
4167        effects: &TransactionEffects,
4168    ) -> anyhow::Result<Vec<Object>> {
4169        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4170            .map_err(Into::into)
4171    }
4172
4173    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4174        match &self.indexes {
4175            Some(i) => Ok(i.clone()),
4176            None => Err(IotaError::UnsupportedFeature {
4177                error: "extended object indexing is not enabled on this server".into(),
4178            }),
4179        }
4180    }
4181
4182    pub async fn get_transactions_for_tests(
4183        self: &Arc<Self>,
4184        filter: Option<TransactionFilter>,
4185        cursor: Option<TransactionDigest>,
4186        limit: Option<usize>,
4187        reverse: bool,
4188    ) -> IotaResult<Vec<TransactionDigest>> {
4189        let metrics = KeyValueStoreMetrics::new_for_tests();
4190        let kv_store = Arc::new(TransactionKeyValueStore::new(
4191            "rocksdb",
4192            metrics,
4193            self.clone(),
4194        ));
4195        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4196            .await
4197    }
4198
4199    #[instrument(level = "trace", skip_all)]
4200    pub async fn get_transactions(
4201        &self,
4202        kv_store: &Arc<TransactionKeyValueStore>,
4203        filter: Option<TransactionFilter>,
4204        // If `Some`, the query will start from the next item after the specified cursor
4205        cursor: Option<TransactionDigest>,
4206        limit: Option<usize>,
4207        reverse: bool,
4208    ) -> IotaResult<Vec<TransactionDigest>> {
4209        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4210            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4211            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4212            if reverse {
4213                let iter = iter
4214                    .rev()
4215                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4216                    .skip(usize::from(cursor.is_some()));
4217                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4218            } else {
4219                let iter = iter
4220                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4221                    .skip(usize::from(cursor.is_some()));
4222                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4223            }
4224        }
4225        self.get_indexes()?
4226            .get_transactions(filter, cursor, limit, reverse)
4227    }
4228
4229    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4230        &self.checkpoint_store
4231    }
4232
4233    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4234        self.get_checkpoint_store()
4235            .get_highest_executed_checkpoint_seq_number()?
4236            .ok_or(IotaError::UserInput {
4237                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4238            })
4239    }
4240
4241    #[cfg(msim)]
4242    pub fn get_highest_pruned_checkpoint_for_testing(
4243        &self,
4244    ) -> IotaResult<CheckpointSequenceNumber> {
4245        self.database_for_testing()
4246            .perpetual_tables
4247            .get_highest_pruned_checkpoint()
4248            .map(|c| c.unwrap_or(0))
4249            .map_err(Into::into)
4250    }
4251
4252    #[instrument(level = "trace", skip_all)]
4253    pub fn get_checkpoint_summary_by_sequence_number(
4254        &self,
4255        sequence_number: CheckpointSequenceNumber,
4256    ) -> IotaResult<CheckpointSummary> {
4257        let verified_checkpoint = self
4258            .get_checkpoint_store()
4259            .get_checkpoint_by_sequence_number(sequence_number)?;
4260        match verified_checkpoint {
4261            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4262            None => Err(IotaError::UserInput {
4263                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4264            }),
4265        }
4266    }
4267
4268    #[instrument(level = "trace", skip_all)]
4269    pub fn get_checkpoint_summary_by_digest(
4270        &self,
4271        digest: CheckpointDigest,
4272    ) -> IotaResult<CheckpointSummary> {
4273        let verified_checkpoint = self
4274            .get_checkpoint_store()
4275            .get_checkpoint_by_digest(&digest)?;
4276        match verified_checkpoint {
4277            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4278            None => Err(IotaError::UserInput {
4279                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4280            }),
4281        }
4282    }
4283
4284    #[instrument(level = "trace", skip_all)]
4285    pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4286        if package_id.is_system_package() {
4287            return self.find_genesis_txn_digest();
4288        }
4289        Ok(self
4290            .get_object_read(&package_id)?
4291            .into_object()?
4292            .previous_transaction)
4293    }
4294
4295    #[instrument(level = "trace", skip_all)]
4296    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4297        let summary = self
4298            .get_verified_checkpoint_by_sequence_number(0)?
4299            .into_message();
4300        let content = self.get_checkpoint_contents(summary.content_digest)?;
4301        let genesis_transaction = content.enumerate_transactions(&summary).next();
4302        Ok(genesis_transaction
4303            .ok_or(IotaError::UserInput {
4304                error: UserInputError::GenesisTransactionNotFound,
4305            })?
4306            .1
4307            .transaction)
4308    }
4309
4310    #[instrument(level = "trace", skip_all)]
4311    pub fn get_verified_checkpoint_by_sequence_number(
4312        &self,
4313        sequence_number: CheckpointSequenceNumber,
4314    ) -> IotaResult<VerifiedCheckpoint> {
4315        let verified_checkpoint = self
4316            .get_checkpoint_store()
4317            .get_checkpoint_by_sequence_number(sequence_number)?;
4318        match verified_checkpoint {
4319            Some(verified_checkpoint) => Ok(verified_checkpoint),
4320            None => Err(IotaError::UserInput {
4321                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4322            }),
4323        }
4324    }
4325
4326    #[instrument(level = "trace", skip_all)]
4327    pub fn get_verified_checkpoint_summary_by_digest(
4328        &self,
4329        digest: CheckpointDigest,
4330    ) -> IotaResult<VerifiedCheckpoint> {
4331        let verified_checkpoint = self
4332            .get_checkpoint_store()
4333            .get_checkpoint_by_digest(&digest)?;
4334        match verified_checkpoint {
4335            Some(verified_checkpoint) => Ok(verified_checkpoint),
4336            None => Err(IotaError::UserInput {
4337                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4338            }),
4339        }
4340    }
4341
4342    #[instrument(level = "trace", skip_all)]
4343    pub fn get_checkpoint_contents(
4344        &self,
4345        digest: CheckpointContentsDigest,
4346    ) -> IotaResult<CheckpointContents> {
4347        self.get_checkpoint_store()
4348            .get_checkpoint_contents(&digest)?
4349            .ok_or(IotaError::UserInput {
4350                error: UserInputError::CheckpointContentsNotFound(digest),
4351            })
4352    }
4353
4354    #[instrument(level = "trace", skip_all)]
4355    pub fn get_checkpoint_contents_by_sequence_number(
4356        &self,
4357        sequence_number: CheckpointSequenceNumber,
4358    ) -> IotaResult<CheckpointContents> {
4359        let verified_checkpoint = self
4360            .get_checkpoint_store()
4361            .get_checkpoint_by_sequence_number(sequence_number)?;
4362        match verified_checkpoint {
4363            Some(verified_checkpoint) => {
4364                let content_digest = verified_checkpoint.into_inner().content_digest;
4365                self.get_checkpoint_contents(content_digest)
4366            }
4367            None => Err(IotaError::UserInput {
4368                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4369            }),
4370        }
4371    }
4372
4373    #[instrument(level = "trace", skip_all)]
4374    pub async fn query_events(
4375        &self,
4376        kv_store: &Arc<TransactionKeyValueStore>,
4377        query: EventFilter,
4378        // If `Some`, the query will start from the next item after the specified cursor
4379        cursor: Option<EventID>,
4380        limit: usize,
4381        descending: bool,
4382    ) -> IotaResult<Vec<IotaEvent>> {
4383        let index_store = self.get_indexes()?;
4384
4385        // Get the tx_num from tx_digest
4386        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4387            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4388                IotaError::TransactionNotFound {
4389                    digest: cursor.tx_digest,
4390                },
4391            )?;
4392            (tx_seq, cursor.event_seq as usize)
4393        } else if descending {
4394            (u64::MAX, usize::MAX)
4395        } else {
4396            (0, 0)
4397        };
4398
4399        let limit = limit + 1;
4400        let mut event_keys = match query {
4401            EventFilter::All(filters) => {
4402                if filters.is_empty() {
4403                    index_store.all_events(tx_num, event_num, limit, descending)?
4404                } else {
4405                    return Err(IotaError::UserInput {
4406                        error: UserInputError::Unsupported(
4407                            "This query type does not currently support filter combinations"
4408                                .to_string(),
4409                        ),
4410                    });
4411                }
4412            }
4413            EventFilter::Transaction(digest) => {
4414                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4415            }
4416            EventFilter::MoveModule { package, module } => {
4417                let module_id = ModuleId::new(
4418                    AccountAddress::new(package.into_bytes()),
4419                    move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4420                );
4421                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4422            }
4423            EventFilter::MoveEventType(struct_name) => index_store
4424                .events_by_move_event_struct_name(
4425                    &struct_name,
4426                    tx_num,
4427                    event_num,
4428                    limit,
4429                    descending,
4430                )?,
4431            EventFilter::Sender(sender) => {
4432                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4433            }
4434            EventFilter::TimeRange {
4435                start_time,
4436                end_time,
4437            } => index_store
4438                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4439            EventFilter::MoveEventModule { package, module } => index_store
4440                .events_by_move_event_module(
4441                    &ModuleId::new(
4442                        AccountAddress::new(package.into_bytes()),
4443                        move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4444                    ),
4445                    tx_num,
4446                    event_num,
4447                    limit,
4448                    descending,
4449                )?,
4450            // not using "_ =>" because we want to make sure we remember to add new variants here
4451            EventFilter::Package(_)
4452            | EventFilter::MoveEventField { .. }
4453            | EventFilter::Any(_)
4454            | EventFilter::And(_, _)
4455            | EventFilter::Or(_, _) => {
4456                return Err(IotaError::UserInput {
4457                    error: UserInputError::Unsupported(
4458                        "This query type is not supported by the full node.".to_string(),
4459                    ),
4460                });
4461            }
4462        };
4463
4464        // skip one event if exclusive cursor is provided,
4465        // otherwise truncate to the original limit.
4466        if cursor.is_some() {
4467            if !event_keys.is_empty() {
4468                event_keys.remove(0);
4469            }
4470        } else {
4471            event_keys.truncate(limit - 1);
4472        }
4473
4474        // get the unique set of digests from the event_keys
4475        let transaction_digests = event_keys
4476            .iter()
4477            .map(|(_, digest, _, _)| *digest)
4478            .collect::<HashSet<_>>()
4479            .into_iter()
4480            .collect::<Vec<_>>();
4481
4482        let events = kv_store
4483            .multi_get_events_by_tx_digests(&transaction_digests)
4484            .await?;
4485
4486        let events_map: HashMap<_, _> =
4487            transaction_digests.iter().zip(events.into_iter()).collect();
4488
4489        let stored_events = event_keys
4490            .into_iter()
4491            .map(|k| {
4492                (
4493                    k,
4494                    events_map
4495                        .get(&k.1)
4496                        .expect("fetched digest is missing")
4497                        .clone()
4498                        .and_then(|e| e.get(k.2).cloned()),
4499                )
4500            })
4501            .map(
4502                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4503                    event
4504                        .map(|e| (e, tx_digest, event_seq, timestamp))
4505                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4506                },
4507            )
4508            .collect::<Result<Vec<_>, _>>()?;
4509
4510        let epoch_store = self.load_epoch_store_one_call_per_task();
4511        let backing_store = self.get_backing_package_store().as_ref();
4512        let mut layout_resolver = epoch_store
4513            .executor()
4514            .type_layout_resolver(Box::new(backing_store));
4515        let mut events = vec![];
4516        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4517            events.push(IotaEvent::try_from(
4518                e.clone(),
4519                tx_digest,
4520                event_seq as u64,
4521                Some(timestamp),
4522                layout_resolver.get_annotated_layout(&e.type_)?,
4523            )?)
4524        }
4525        Ok(events)
4526    }
4527
4528    pub async fn insert_genesis_object(&self, object: Object) {
4529        self.get_reconfig_api()
4530            .try_insert_genesis_object(object)
4531            .expect("Cannot insert genesis object")
4532    }
4533
4534    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4535        futures::future::join_all(
4536            objects
4537                .iter()
4538                .map(|o| self.insert_genesis_object(o.clone())),
4539        )
4540        .await;
4541    }
4542
4543    /// Make a status response for a transaction
4544    #[instrument(level = "trace", skip_all)]
4545    pub fn get_transaction_status(
4546        &self,
4547        transaction_digest: &TransactionDigest,
4548        epoch_store: &Arc<AuthorityPerEpochStore>,
4549    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4550        // TODO: In the case of read path, we should not have to re-sign the effects.
4551        if let Some(effects) =
4552            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4553        {
4554            if let Some(transaction) = self
4555                .get_transaction_cache_reader()
4556                .try_get_transaction_block(transaction_digest)?
4557            {
4558                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4559                let events = if effects.events_digest().is_some() {
4560                    self.get_transaction_events(effects.transaction_digest())?
4561                } else {
4562                    TransactionEvents::default()
4563                };
4564                return Ok(Some((
4565                    (*transaction).clone().into_message(),
4566                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4567                )));
4568            } else {
4569                // The read of effects and read of transaction are not atomic. It's possible
4570                // that we reverted the transaction (during epoch change) in
4571                // between the above two reads, and we end up having effects but
4572                // not transaction. In this case, we just fall through.
4573                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4574            }
4575        }
4576        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4577            self.metrics.tx_already_processed.inc();
4578            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4579            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4580        } else {
4581            Ok(None)
4582        }
4583    }
4584
4585    /// Get the signed effects of the given transaction. If the effects was
4586    /// signed in a previous epoch, re-sign it so that the caller is able to
4587    /// form a cert of the effects in the current epoch.
4588    #[instrument(level = "trace", skip_all)]
4589    pub fn get_signed_effects_and_maybe_resign(
4590        &self,
4591        transaction_digest: &TransactionDigest,
4592        epoch_store: &Arc<AuthorityPerEpochStore>,
4593    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4594        let effects = self
4595            .get_transaction_cache_reader()
4596            .try_get_executed_effects(transaction_digest)?;
4597        match effects {
4598            Some(effects) => {
4599                // If the transaction was executed in previous epochs, the validator will
4600                // re-sign the effects with new current epoch so that a client is always able to
4601                // obtain an effects certificate at the current epoch.
4602                //
4603                // Why is this necessary? Consider the following case:
4604                // - assume there are 4 validators
4605                // - Quorum driver gets 2 signed effects before reconfig halt
4606                // - The tx makes it into final checkpoint.
4607                // - 2 validators go away and are replaced in the new epoch.
4608                // - The new epoch begins.
4609                // - The quorum driver cannot complete the partial effects cert from the
4610                //   previous epoch, because it may not be able to reach either of the 2 former
4611                //   validators.
4612                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4613                //   the new epoch, the QD can make a new effects cert and return it to the
4614                //   client.
4615                //
4616                // This is a considered a short-term workaround. Eventually, Quorum Driver
4617                // should be able to return either an effects certificate, -or-
4618                // a proof of inclusion in a checkpoint. In the case above, the
4619                // Quorum Driver would return a proof of inclusion in the final
4620                // checkpoint, and this code would no longer be necessary.
4621                if effects.epoch() != epoch_store.epoch() {
4622                    debug!(
4623                        tx_digest=?transaction_digest,
4624                        effects_epoch=?effects.epoch(),
4625                        epoch=?epoch_store.epoch(),
4626                        "Re-signing the effects with the current epoch"
4627                    );
4628                }
4629                Ok(Some(self.sign_effects(effects, epoch_store)?))
4630            }
4631            None => Ok(None),
4632        }
4633    }
4634
4635    #[instrument(level = "trace", skip_all)]
4636    pub(crate) fn sign_effects(
4637        &self,
4638        effects: TransactionEffects,
4639        epoch_store: &Arc<AuthorityPerEpochStore>,
4640    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4641        let tx_digest = *effects.transaction_digest();
4642        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4643            Some(sig) => {
4644                debug_assert!(sig.epoch == epoch_store.epoch());
4645                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4646            }
4647            _ => {
4648                let sig = AuthoritySignInfo::new(
4649                    epoch_store.epoch(),
4650                    &effects,
4651                    Intent::iota_app(IntentScope::TransactionEffects),
4652                    self.name,
4653                    &*self.secret,
4654                );
4655
4656                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4657
4658                epoch_store.insert_effects_digest_and_signature(
4659                    &tx_digest,
4660                    effects.digest(),
4661                    &sig,
4662                )?;
4663
4664                effects
4665            }
4666        };
4667
4668        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4669            signed_effects,
4670        ))
4671    }
4672
4673    // Returns coin objects for indexing for fullnode if indexing is enabled.
4674    #[instrument(level = "trace", skip_all)]
4675    fn fullnode_only_get_tx_coins_for_indexing(
4676        &self,
4677        effects: &TransactionEffects,
4678        inner_temporary_store: &InnerTemporaryStore,
4679        epoch_store: &Arc<AuthorityPerEpochStore>,
4680    ) -> Option<TxCoins> {
4681        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4682            return None;
4683        }
4684        let written_coin_objects = inner_temporary_store
4685            .written
4686            .iter()
4687            .filter_map(|(k, v)| {
4688                if v.is_coin() {
4689                    Some((*k, v.clone()))
4690                } else {
4691                    None
4692                }
4693            })
4694            .collect();
4695        let mut input_coin_objects = inner_temporary_store
4696            .input_objects
4697            .iter()
4698            .filter_map(|(k, v)| {
4699                if v.is_coin() {
4700                    Some((*k, v.clone()))
4701                } else {
4702                    None
4703                }
4704            })
4705            .collect::<ObjectMap>();
4706
4707        // Check for receiving objects that were actually used and modified during
4708        // execution. Their updated version will already showup in
4709        // "written_coins" but their input isn't included in the set of input
4710        // objects in a inner_temporary_store.
4711        for (object_id, version) in effects.modified_at_versions() {
4712            if inner_temporary_store
4713                .loaded_runtime_objects
4714                .contains_key(&object_id)
4715            {
4716                if let Some(object) = self
4717                    .get_object_store()
4718                    .get_object_by_key(&object_id, version)
4719                {
4720                    if object.is_coin() {
4721                        input_coin_objects.insert(object_id, object);
4722                    }
4723                }
4724            }
4725        }
4726
4727        Some((input_coin_objects, written_coin_objects))
4728    }
4729
4730    /// Get the TransactionEnvelope that currently locks the given object, if
4731    /// any. Since object locks are only valid for one epoch, we also need
4732    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4733    /// no lock records for the given object can be found.
4734    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4735    /// object record is at a different version.
4736    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4737    /// certain transaction. Returns None if the a lock record is
4738    /// initialized for the given ObjectRef but not yet locked by any
4739    /// transaction,     or cannot find the transaction in transaction
4740    /// table, because of data race etc.
4741    #[instrument(level = "trace", skip_all)]
4742    pub async fn get_transaction_lock(
4743        &self,
4744        object_ref: &ObjectRef,
4745        epoch_store: &AuthorityPerEpochStore,
4746    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4747        let lock_info = self
4748            .get_object_cache_reader()
4749            .try_get_lock(*object_ref, epoch_store)?;
4750        let lock_info = match lock_info {
4751            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4752                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4753                    provided_obj_ref: *object_ref,
4754                    current_version: locked_ref.version,
4755                }
4756                .into());
4757            }
4758            ObjectLockStatus::Initialized => {
4759                return Ok(None);
4760            }
4761            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4762        };
4763
4764        epoch_store.get_signed_transaction(&lock_info)
4765    }
4766
4767    pub async fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
4768        self.get_object_cache_reader().try_get_objects(objects)
4769    }
4770
4771    /// Non-fallible version of `try_get_objects`.
4772    pub async fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4773        self.try_get_objects(objects)
4774            .await
4775            .expect("storage access failed")
4776    }
4777
4778    pub async fn try_get_object_or_tombstone(
4779        &self,
4780        object_id: ObjectId,
4781    ) -> IotaResult<Option<ObjectRef>> {
4782        self.get_object_cache_reader()
4783            .try_get_latest_object_ref_or_tombstone(object_id)
4784    }
4785
4786    /// Non-fallible version of `try_get_object_or_tombstone`.
4787    pub async fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4788        self.try_get_object_or_tombstone(object_id)
4789            .await
4790            .expect("storage access failed")
4791    }
4792
4793    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4794    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4795    /// upgrade.
4796    ///
4797    /// This method can be used to dynamic adjust the amount of buffer. If set
4798    /// to 0, the upgrade will go through with only 2f+1 votes.
4799    ///
4800    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4801    /// should have the same value), or you risk halting the chain.
4802    pub fn set_override_protocol_upgrade_buffer_stake(
4803        &self,
4804        expected_epoch: EpochId,
4805        buffer_stake_bps: u64,
4806    ) -> IotaResult {
4807        let epoch_store = self.load_epoch_store_one_call_per_task();
4808        let actual_epoch = epoch_store.epoch();
4809        if actual_epoch != expected_epoch {
4810            return Err(IotaError::WrongEpoch {
4811                expected_epoch,
4812                actual_epoch,
4813            });
4814        }
4815
4816        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4817    }
4818
4819    pub fn clear_override_protocol_upgrade_buffer_stake(
4820        &self,
4821        expected_epoch: EpochId,
4822    ) -> IotaResult {
4823        let epoch_store = self.load_epoch_store_one_call_per_task();
4824        let actual_epoch = epoch_store.epoch();
4825        if actual_epoch != expected_epoch {
4826            return Err(IotaError::WrongEpoch {
4827                expected_epoch,
4828                actual_epoch,
4829            });
4830        }
4831
4832        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4833    }
4834
4835    /// Get the set of system packages that are compiled in to this build, if
4836    /// those packages are compatible with the current versions of those
4837    /// packages on-chain.
4838    pub async fn get_available_system_packages(
4839        &self,
4840        binary_config: &BinaryConfig,
4841    ) -> Vec<ObjectRef> {
4842        let mut results = vec![];
4843
4844        let system_packages = BuiltInFramework::iter_system_packages();
4845
4846        // Add extra framework packages during simtest
4847        #[cfg(msim)]
4848        let extra_packages = framework_injection::get_extra_packages(self.name);
4849        #[cfg(msim)]
4850        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4851
4852        for system_package in system_packages {
4853            let modules = system_package.modules().to_vec();
4854            // In simtests, we could override the current built-in framework packages.
4855            #[cfg(msim)]
4856            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4857                .unwrap_or(modules);
4858
4859            let Some(obj_ref) = iota_framework::compare_system_package(
4860                &self.get_object_store(),
4861                &system_package.id,
4862                &modules,
4863                system_package.dependencies.to_vec(),
4864                binary_config,
4865            )
4866            .await
4867            else {
4868                return vec![];
4869            };
4870            results.push(obj_ref);
4871        }
4872
4873        results
4874    }
4875
4876    /// Return the new versions, module bytes, and dependencies for the packages
4877    /// that have been committed to for a framework upgrade, in
4878    /// `system_packages`.  Loads the module contents from the binary, and
4879    /// performs the following checks:
4880    ///
4881    /// - Whether its contents matches what is on-chain already, in which case
4882    ///   no upgrade is required, and its contents are omitted from the output.
4883    /// - Whether the contents in the binary can form a package whose digest
4884    ///   matches the input, meaning the framework will be upgraded, and this
4885    ///   authority can satisfy that upgrade, in which case the contents are
4886    ///   included in the output.
4887    ///
4888    /// If the current version of the framework can't be loaded, the binary does
4889    /// not contain the bytes for that framework ID, or the resulting
4890    /// package fails the digest check, `None` is returned indicating that
4891    /// this authority cannot run the upgrade that the network voted on.
4892    async fn get_system_package_bytes(
4893        &self,
4894        system_packages: Vec<ObjectRef>,
4895        binary_config: &BinaryConfig,
4896    ) -> Option<Vec<SystemPackage>> {
4897        let ids: Vec<_> = system_packages
4898            .iter()
4899            .map(|object_ref| object_ref.object_id)
4900            .collect();
4901        let objects = self.get_objects(&ids).await;
4902
4903        let mut res = Vec::with_capacity(system_packages.len());
4904        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4905            let prev_transaction = match object {
4906                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4907                    // Skip this one because it doesn't need to be upgraded.
4908                    info!(
4909                        "Framework {} does not need updating",
4910                        system_package_ref.object_id
4911                    );
4912                    continue;
4913                }
4914
4915                Some(cur_object) => cur_object.previous_transaction,
4916                None => TransactionDigest::GENESIS_MARKER,
4917            };
4918
4919            #[cfg(msim)]
4920            let FrameworkSystemPackage {
4921                id: _,
4922                bytes,
4923                dependencies,
4924            } = framework_injection::get_override_system_package(
4925                &system_package_ref.object_id,
4926                self.name,
4927            )
4928            .unwrap_or_else(|| {
4929                BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
4930            });
4931
4932            #[cfg(not(msim))]
4933            let FrameworkSystemPackage {
4934                id: _,
4935                bytes,
4936                dependencies,
4937            } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
4938
4939            let modules: Vec<_> = bytes
4940                .iter()
4941                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4942                .collect();
4943
4944            let new_object = Object::new_system_package(
4945                &modules,
4946                system_package_ref.version,
4947                dependencies.clone(),
4948                prev_transaction,
4949            );
4950
4951            let new_ref = new_object.compute_object_reference();
4952            if new_ref != system_package_ref {
4953                error!(
4954                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4955                );
4956                return None;
4957            }
4958
4959            res.push(SystemPackage {
4960                version: system_package_ref.version,
4961                modules: bytes,
4962                dependencies,
4963            });
4964        }
4965
4966        Some(res)
4967    }
4968
4969    /// Returns the new protocol version and system packages that the network
4970    /// has voted to upgrade to. If the proposed protocol version is not
4971    /// supported, None is returned.
4972    fn is_protocol_version_supported_v1(
4973        proposed_protocol_version: ProtocolVersion,
4974        committee: &Committee,
4975        capabilities: Vec<AuthorityCapabilitiesV1>,
4976        mut buffer_stake_bps: u64,
4977    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4978        if buffer_stake_bps > 10000 {
4979            warn!("clamping buffer_stake_bps to 10000");
4980            buffer_stake_bps = 10000;
4981        }
4982
4983        // For each validator, gather the protocol version and system packages that it
4984        // would like to upgrade to in the next epoch.
4985        let mut desired_upgrades: Vec<_> = capabilities
4986            .into_iter()
4987            .filter_map(|mut cap| {
4988                // A validator that lists no packages is voting against any change at all.
4989                if cap.available_system_packages.is_empty() {
4990                    return None;
4991                }
4992
4993                cap.available_system_packages.sort();
4994
4995                info!(
4996                    "validator {:?} supports {:?} with system packages: {:?}",
4997                    cap.authority.concise(),
4998                    cap.supported_protocol_versions,
4999                    cap.available_system_packages,
5000                );
5001
5002                // A validator that only supports the current protocol version is also voting
5003                // against any change, because framework upgrades always require a protocol
5004                // version bump.
5005                cap.supported_protocol_versions
5006                    .get_version_digest(proposed_protocol_version)
5007                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5008            })
5009            .collect();
5010
5011        // There can only be one set of votes that have a majority, find one if it
5012        // exists.
5013        desired_upgrades.sort();
5014        desired_upgrades
5015            .into_iter()
5016            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5017            .into_iter()
5018            .find_map(|((digest, packages), group)| {
5019                // should have been filtered out earlier.
5020                assert!(!packages.is_empty());
5021
5022                let mut stake_aggregator: StakeAggregator<(), true> =
5023                    StakeAggregator::new(Arc::new(committee.clone()));
5024
5025                for (_, _, authority) in group {
5026                    stake_aggregator.insert_generic(authority, ());
5027                }
5028
5029                let total_votes = stake_aggregator.total_votes();
5030                let quorum_threshold = committee.quorum_threshold();
5031                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5032
5033                info!(
5034                    protocol_config_digest = ?digest,
5035                    ?total_votes,
5036                    ?quorum_threshold,
5037                    ?buffer_stake_bps,
5038                    ?effective_threshold,
5039                    ?proposed_protocol_version,
5040                    ?packages,
5041                    "support for upgrade"
5042                );
5043
5044                let has_support = total_votes >= effective_threshold;
5045                has_support.then_some((proposed_protocol_version, digest, packages))
5046            })
5047    }
5048
5049    /// Selects the highest supported protocol version and system packages that
5050    /// the network has voted to upgrade to. If no upgrade is supported,
5051    /// returns the current protocol version and system packages.
5052    fn choose_protocol_version_and_system_packages_v1(
5053        current_protocol_version: ProtocolVersion,
5054        current_protocol_digest: Digest,
5055        committee: &Committee,
5056        capabilities: Vec<AuthorityCapabilitiesV1>,
5057        buffer_stake_bps: u64,
5058    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5059        let mut next_protocol_version = current_protocol_version;
5060        let mut system_packages = vec![];
5061        let mut protocol_version_digest = current_protocol_digest;
5062
5063        // Finds the highest supported protocol version and system packages by
5064        // incrementing the proposed protocol version by one until no further
5065        // upgrades are supported.
5066        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5067            next_protocol_version + 1,
5068            committee,
5069            capabilities.clone(),
5070            buffer_stake_bps,
5071        ) {
5072            next_protocol_version = version;
5073            protocol_version_digest = digest;
5074            system_packages = packages;
5075        }
5076
5077        (
5078            next_protocol_version,
5079            protocol_version_digest,
5080            system_packages,
5081        )
5082    }
5083
5084    /// Returns the indices of validators that support the given protocol
5085    /// version and digest. This includes both committee and non-committee
5086    /// validators based on their capabilities. Uses active validators
5087    /// instead of committee indices.
5088    fn get_validators_supporting_protocol_version(
5089        target_protocol_version: ProtocolVersion,
5090        target_digest: Digest,
5091        active_validators: &[AuthorityPublicKey],
5092        capabilities: &[AuthorityCapabilitiesV1],
5093    ) -> Vec<u64> {
5094        let mut eligible_validators = Vec::new();
5095
5096        for capability in capabilities {
5097            // Check if this validator supports the target protocol version and digest
5098            if let Some(digest) = capability
5099                .supported_protocol_versions
5100                .get_version_digest(target_protocol_version)
5101            {
5102                if digest == target_digest {
5103                    // Find the validator's index in the active validators list
5104                    if let Some(index) = active_validators
5105                        .iter()
5106                        .position(|name| AuthorityName::from(name) == capability.authority)
5107                    {
5108                        eligible_validators.push(index as u64);
5109                    }
5110                }
5111            }
5112        }
5113
5114        // Sort indices for deterministic behavior
5115        eligible_validators.sort();
5116        eligible_validators
5117    }
5118
5119    /// Calculates the sum of weights for eligible validators that are part of
5120    /// the committee. Takes the indices from
5121    /// get_validators_supporting_protocol_version and maps them back
5122    /// to committee members to get their weights.
5123    fn calculate_eligible_validators_weight(
5124        eligible_validator_indices: &[u64],
5125        active_validators: &[AuthorityPublicKey],
5126        committee: &Committee,
5127    ) -> u64 {
5128        let mut total_weight = 0u64;
5129
5130        for &index in eligible_validator_indices {
5131            let authority_pubkey = &active_validators[index as usize];
5132            // Check if this validator is in the committee and get their weight
5133            if let Some((_, weight)) = committee
5134                .members()
5135                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5136            {
5137                total_weight += weight;
5138            }
5139        }
5140
5141        total_weight
5142    }
5143
5144    /// Creates and execute the advance epoch transaction to effects without
5145    /// committing it to the database. The effects of the change epoch tx
5146    /// are only written to the database after a certified checkpoint has been
5147    /// formed and executed by CheckpointExecutor.
5148    ///
5149    /// When a framework upgraded has been decided on, but the validator does
5150    /// not have the new versions of the packages locally, the validator
5151    /// cannot form the ChangeEpochTx. In this case it returns Err,
5152    /// indicating that the checkpoint builder should give up trying to make the
5153    /// final checkpoint. As long as the network is able to create a certified
5154    /// checkpoint (which should be ensured by the capabilities vote), it
5155    /// will arrive via state sync and be executed by CheckpointExecutor.
5156    #[instrument(level = "error", skip_all)]
5157    pub async fn create_and_execute_advance_epoch_tx(
5158        &self,
5159        epoch_store: &Arc<AuthorityPerEpochStore>,
5160        gas_cost_summary: &GasCostSummary,
5161        checkpoint: CheckpointSequenceNumber,
5162        epoch_start_timestamp_ms: CheckpointTimestamp,
5163        scores: Vec<u64>,
5164    ) -> anyhow::Result<(
5165        IotaSystemState,
5166        Option<SystemEpochInfoEvent>,
5167        TransactionEffects,
5168    )> {
5169        let mut txns = Vec::new();
5170
5171        let next_epoch = epoch_store.epoch() + 1;
5172
5173        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5174        let authority_capabilities = epoch_store
5175            .get_capabilities_v1()
5176            .expect("read capabilities from db cannot fail");
5177        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
5178            Self::choose_protocol_version_and_system_packages_v1(
5179                epoch_store.protocol_version(),
5180                SupportedProtocolVersionsWithHashes::protocol_config_digest(
5181                    epoch_store.protocol_config(),
5182                ),
5183                epoch_store.committee(),
5184                authority_capabilities.clone(),
5185                buffer_stake_bps,
5186            );
5187
5188        // since system packages are created during the current epoch, they should abide
5189        // by the rules of the current epoch, including the current epoch's max
5190        // Move binary format version
5191        let config = epoch_store.protocol_config();
5192        let binary_config = to_binary_config(config);
5193        let Some(next_epoch_system_package_bytes) = self
5194            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5195            .await
5196        else {
5197            error!(
5198                "upgraded system packages {:?} are not locally available, cannot create \
5199                ChangeEpochTx. validator binary must be upgraded to the correct version!",
5200                next_epoch_system_packages
5201            );
5202            // the checkpoint builder will keep retrying forever when it hits this error.
5203            // Eventually, one of two things will happen:
5204            // - The operator will upgrade this binary to one that has the new packages
5205            //   locally, and this function will succeed.
5206            // - The final checkpoint will be certified by other validators, we will receive
5207            //   it via state sync, and execute it. This will upgrade the framework
5208            //   packages, reconfigure, and most likely shut down in the new epoch (this
5209            //   validator likely doesn't support the new protocol version, or else it
5210            //   should have had the packages.)
5211            bail!("missing system packages: cannot form ChangeEpochTx");
5212        };
5213
5214        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
5215        // ChangeEpochV2 requirements are met
5216        if config.select_committee_from_eligible_validators() {
5217            // Get the list of eligible validators that support the target protocol version
5218            let active_validators = epoch_store.epoch_start_state().get_active_validators();
5219
5220            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5221
5222            // Use validators supporting the target protocol version as eligible validators
5223            // in the next version if select_committee_supporting_next_epoch_version feature
5224            // flag is set to true.
5225            if config.select_committee_supporting_next_epoch_version() {
5226                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5227                    next_epoch_protocol_version,
5228                    next_epoch_protocol_digest,
5229                    &active_validators,
5230                    &authority_capabilities,
5231                );
5232
5233                // Calculate the total weight of eligible validators in the committee
5234                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5235                    &eligible_active_validators,
5236                    &active_validators,
5237                    epoch_store.committee(),
5238                );
5239
5240                // Safety check: ensure eligible validators have enough stake
5241                // Use the same effective threshold calculation that was used to decide the
5242                // protocol version
5243                let committee = epoch_store.committee();
5244                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5245
5246                if eligible_validators_weight < effective_threshold {
5247                    error!(
5248                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5249                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5250                    );
5251                    // Pass all active validator indices as eligible validators
5252                    // to perform selection among all of them.
5253                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5254                }
5255            }
5256
5257            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5258            // flag is enabled.
5259            if config.pass_validator_scores_to_advance_epoch() {
5260                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5261                    next_epoch,
5262                    next_epoch_protocol_version.as_u64(),
5263                    gas_cost_summary.storage_cost,
5264                    gas_cost_summary.computation_cost,
5265                    gas_cost_summary.computation_cost_burned,
5266                    gas_cost_summary.storage_rebate,
5267                    gas_cost_summary.non_refundable_storage_fee,
5268                    epoch_start_timestamp_ms,
5269                    next_epoch_system_package_bytes,
5270                    eligible_active_validators,
5271                    scores,
5272                    config.adjust_rewards_by_score(),
5273                ));
5274            } else {
5275                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5276                    next_epoch,
5277                    next_epoch_protocol_version.as_u64(),
5278                    gas_cost_summary.storage_cost,
5279                    gas_cost_summary.computation_cost,
5280                    gas_cost_summary.computation_cost_burned,
5281                    gas_cost_summary.storage_rebate,
5282                    gas_cost_summary.non_refundable_storage_fee,
5283                    epoch_start_timestamp_ms,
5284                    next_epoch_system_package_bytes,
5285                    eligible_active_validators,
5286                ));
5287            }
5288        } else if config.protocol_defined_base_fee()
5289            && config.max_committee_members_count_as_option().is_some()
5290        {
5291            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5292                next_epoch,
5293                next_epoch_protocol_version.as_u64(),
5294                gas_cost_summary.storage_cost,
5295                gas_cost_summary.computation_cost,
5296                gas_cost_summary.computation_cost_burned,
5297                gas_cost_summary.storage_rebate,
5298                gas_cost_summary.non_refundable_storage_fee,
5299                epoch_start_timestamp_ms,
5300                next_epoch_system_package_bytes,
5301            ));
5302        } else {
5303            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5304                next_epoch,
5305                next_epoch_protocol_version.as_u64(),
5306                gas_cost_summary.storage_cost,
5307                gas_cost_summary.computation_cost,
5308                gas_cost_summary.storage_rebate,
5309                gas_cost_summary.non_refundable_storage_fee,
5310                epoch_start_timestamp_ms,
5311                next_epoch_system_package_bytes,
5312            ));
5313        }
5314
5315        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5316
5317        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5318            tx.clone(),
5319            epoch_store.epoch(),
5320            checkpoint,
5321        );
5322
5323        let tx_digest = executable_tx.digest();
5324
5325        info!(
5326            ?next_epoch,
5327            ?next_epoch_protocol_version,
5328            ?next_epoch_system_packages,
5329            computation_cost=?gas_cost_summary.computation_cost,
5330            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5331            storage_cost=?gas_cost_summary.storage_cost,
5332            storage_rebate=?gas_cost_summary.storage_rebate,
5333            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5334            ?tx_digest,
5335            "Creating advance epoch transaction"
5336        );
5337
5338        fail_point_async!("change_epoch_tx_delay");
5339        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5340
5341        // The tx could have been executed by state sync already - if so simply return
5342        // an error. The checkpoint builder will shortly be terminated by
5343        // reconfiguration anyway.
5344        if self
5345            .get_transaction_cache_reader()
5346            .try_is_tx_already_executed(tx_digest)?
5347        {
5348            warn!("change epoch tx has already been executed via state sync");
5349            bail!("change epoch tx has already been executed via state sync",);
5350        }
5351
5352        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5353
5354        // We must manually assign the shared object versions to the transaction before
5355        // executing it. This is because we do not sequence end-of-epoch
5356        // transactions through consensus.
5357        epoch_store.assign_shared_object_versions_idempotent(
5358            self.get_object_cache_reader().as_ref(),
5359            std::slice::from_ref(&executable_tx),
5360        )?;
5361
5362        let (input_objects, _) =
5363            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5364
5365        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5366            &execution_guard,
5367            &executable_tx,
5368            input_objects,
5369            vec![],
5370            epoch_store,
5371        )?;
5372        let system_obj = get_iota_system_state(&temporary_store.written)
5373            .expect("change epoch tx must write to system object");
5374        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5375        let system_epoch_info_event = temporary_store
5376            .events
5377            .0
5378            .into_iter()
5379            .find(|event| event.is_system_epoch_info_event())
5380            .map(SystemEpochInfoEvent::from);
5381        // The system epoch info event can be `None` in case if the `advance_epoch`
5382        // Move function call failed and was executed in the safe mode.
5383        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5384
5385        // We must write tx and effects to the state sync tables so that state sync is
5386        // able to deliver to the transaction to CheckpointExecutor after it is
5387        // included in a certified checkpoint.
5388        self.get_state_sync_store()
5389            .try_insert_transaction_and_effects(&tx, &effects)
5390            .map_err(|err| {
5391                let err: anyhow::Error = err.into();
5392                err
5393            })?;
5394
5395        info!(
5396            "Effects summary of the change epoch transaction: {:?}",
5397            effects.summary_for_debug()
5398        );
5399        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5400        // The change epoch transaction cannot fail to execute.
5401        assert!(effects.status().is_success());
5402        Ok((system_obj, system_epoch_info_event, effects))
5403    }
5404
5405    /// This function is called at the very end of the epoch.
5406    /// This step is required before updating new epoch in the db and calling
5407    /// reopen_epoch_db.
5408    #[instrument(level = "error", skip_all)]
5409    async fn revert_uncommitted_epoch_transactions(
5410        &self,
5411        epoch_store: &AuthorityPerEpochStore,
5412    ) -> IotaResult {
5413        {
5414            let state = epoch_store.get_reconfig_state_write_lock_guard();
5415            if state.should_accept_user_certs() {
5416                // Need to change this so that consensus adapter do not accept certificates from
5417                // user. This can happen if our local validator did not initiate
5418                // epoch change locally, but 2f+1 nodes already concluded the
5419                // epoch.
5420                //
5421                // This lock is essentially a barrier for
5422                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5423                // after this block
5424                epoch_store.close_user_certs(state);
5425            }
5426            // lock is dropped here
5427        }
5428        let pending_certificates = epoch_store.pending_consensus_certificates();
5429        info!(
5430            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5431            pending_certificates.len(),
5432            pending_certificates,
5433        );
5434        for digest in pending_certificates {
5435            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5436                info!(
5437                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5438                    digest
5439                );
5440                continue;
5441            }
5442            info!("Reverting {:?} at the end of epoch", digest);
5443            epoch_store.revert_executed_transaction(&digest)?;
5444            self.get_reconfig_api().try_revert_state_update(&digest)?;
5445        }
5446        info!("All uncommitted local transactions reverted");
5447        Ok(())
5448    }
5449
5450    #[instrument(level = "error", skip_all)]
5451    async fn reopen_epoch_db(
5452        &self,
5453        cur_epoch_store: &AuthorityPerEpochStore,
5454        new_committee: Committee,
5455        epoch_start_configuration: EpochStartConfiguration,
5456        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5457        epoch_last_checkpoint: CheckpointSequenceNumber,
5458    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5459        let new_epoch = new_committee.epoch;
5460        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5461        assert_eq!(
5462            epoch_start_configuration.epoch_start_state().epoch(),
5463            new_committee.epoch
5464        );
5465        fail_point!("before-open-new-epoch-store");
5466        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5467            self.name,
5468            new_committee,
5469            epoch_start_configuration,
5470            self.get_backing_package_store().clone(),
5471            expensive_safety_check_config,
5472            epoch_last_checkpoint,
5473        )?;
5474        self.epoch_store.store(new_epoch_store.clone());
5475        Ok(new_epoch_store)
5476    }
5477
5478    /// Checks if `authenticator` unlocks a valid Move account and returns the
5479    /// account-related `AuthenticatorFunctionRef` object.
5480    fn check_move_account(
5481        &self,
5482        auth_account_object_id: ObjectId,
5483        auth_account_object_seq_number: Option<SequenceNumber>,
5484        auth_account_object_digest: Option<ObjectDigest>,
5485        account_object: ObjectReadResult,
5486        signer: &IotaAddress,
5487    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5488        let account_object = match account_object.object {
5489            ObjectReadResultKind::Object(object) => Ok(object),
5490            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5491                Err(UserInputError::AccountObjectDeleted {
5492                    account_id: account_object.id(),
5493                    account_version: version,
5494                    transaction_digest: digest,
5495                })
5496            }
5497            // It is impossible to check the account object because it is used in a canceled
5498            // transaction and is not loaded.
5499            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5500                Err(UserInputError::AccountObjectInCanceledTransaction {
5501                    account_id: account_object.id(),
5502                    account_version: version,
5503                })
5504            }
5505        }?;
5506
5507        let account_object_addr = IotaAddress::from(auth_account_object_id);
5508
5509        fp_ensure!(
5510            signer == &account_object_addr,
5511            UserInputError::IncorrectUserSignature {
5512                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5513            }
5514            .into()
5515        );
5516
5517        fp_ensure!(
5518            account_object.is_shared() || account_object.is_immutable(),
5519            UserInputError::AccountObjectNotSupported {
5520                object_id: auth_account_object_id
5521            }
5522            .into()
5523        );
5524
5525        let auth_account_object_seq_number =
5526            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5527                let account_object_version = account_object.version();
5528
5529                fp_ensure!(
5530                    account_object_version == auth_account_object_seq_number,
5531                    UserInputError::AccountObjectVersionMismatch {
5532                        object_id: auth_account_object_id,
5533                        expected_version: auth_account_object_seq_number,
5534                        actual_version: account_object_version,
5535                    }
5536                    .into()
5537                );
5538
5539                auth_account_object_seq_number
5540            } else {
5541                account_object.version()
5542            };
5543
5544        if let Some(auth_account_object_digest) = auth_account_object_digest {
5545            let expected_digest = account_object.digest();
5546            fp_ensure!(
5547                expected_digest == auth_account_object_digest,
5548                UserInputError::InvalidAccountObjectDigest {
5549                    object_id: auth_account_object_id,
5550                    expected_digest,
5551                    actual_digest: auth_account_object_digest,
5552                }
5553                .into()
5554            );
5555        }
5556
5557        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5558            auth_account_object_id,
5559            &AuthenticatorFunctionRefV1Key::tag().into(),
5560            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5561        )
5562        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5563            account_object_id: auth_account_object_id,
5564        })?;
5565
5566        let authenticator_function_ref_field = self
5567            .get_object_cache_reader()
5568            .try_find_object_lt_or_eq_version(
5569                authenticator_function_ref_field_id,
5570                auth_account_object_seq_number,
5571            )?;
5572
5573        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5574            let field_move_object = authenticator_function_ref_field_obj
5575                .data
5576                .as_struct_opt()
5577                .expect("dynamic field should never be a package object");
5578
5579            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5580                field_move_object.to_rust().map_err(|_| {
5581                    UserInputError::InvalidAuthenticatorFunctionRefField {
5582                        account_object_id: auth_account_object_id,
5583                    }
5584                })?;
5585
5586            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5587                field.value,
5588                authenticator_function_ref_field_obj.compute_object_reference(),
5589                authenticator_function_ref_field_obj.owner,
5590                authenticator_function_ref_field_obj.storage_rebate,
5591                authenticator_function_ref_field_obj.previous_transaction,
5592            ))
5593        } else {
5594            Err(UserInputError::MoveAuthenticatorNotFound {
5595                authenticator_function_ref_id: authenticator_function_ref_field_id,
5596                account_object_id: auth_account_object_id,
5597                account_object_version: auth_account_object_seq_number,
5598            }
5599            .into())
5600        }
5601    }
5602
5603    #[allow(clippy::type_complexity)]
5604    fn read_objects_for_signing(
5605        &self,
5606        transaction: &VerifiedTransaction,
5607        epoch: u64,
5608    ) -> IotaResult<(
5609        InputObjects,
5610        ReceivingObjects,
5611        Vec<(InputObjects, ObjectReadResult)>,
5612    )> {
5613        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5614            Some(transaction.digest()),
5615            &transaction.collect_all_input_object_kind_for_reading()?,
5616            &transaction.data().transaction_data().receiving_objects(),
5617            epoch,
5618        )?;
5619
5620        transaction
5621            .split_input_objects_into_groups_for_reading(input_objects)
5622            .map(|(tx_input_objects, per_authenticator_inputs)| {
5623                (
5624                    tx_input_objects,
5625                    tx_receiving_objects,
5626                    per_authenticator_inputs,
5627                )
5628            })
5629    }
5630
5631    #[allow(clippy::type_complexity)]
5632    fn check_transaction_inputs_for_signing(
5633        &self,
5634        protocol_config: &ProtocolConfig,
5635        reference_gas_price: u64,
5636        tx_data: &TransactionData,
5637        tx_input_objects: InputObjects,
5638        tx_receiving_objects: &ReceivingObjects,
5639        move_authenticators: &Vec<&MoveAuthenticator>,
5640        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5641    ) -> IotaResult<(
5642        IotaGasStatus,
5643        CheckedInputObjects,
5644        Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5645    )> {
5646        let authenticator_gas_budget = if move_authenticators.is_empty() {
5647            0
5648        } else {
5649            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5650            // not a part of the transaction data.
5651            protocol_config.max_auth_gas()
5652        };
5653
5654        debug_assert_eq!(
5655            move_authenticators.len(),
5656            per_authenticator_inputs.len(),
5657            "Move authenticators amount must match the number of authenticator inputs"
5658        );
5659
5660        let per_authenticator_checked_inputs = move_authenticators
5661            .iter()
5662            .zip(per_authenticator_inputs)
5663            .map(
5664                |(move_authenticator, (authenticator_input_objects, account_object))| {
5665                    // Check basic `object_to_authenticate` preconditions and get its components.
5666                    let (
5667                        auth_account_object_id,
5668                        auth_account_object_seq_number,
5669                        auth_account_object_digest,
5670                    ) = move_authenticator.object_to_authenticate_components()?;
5671
5672                    let signer = move_authenticator.address()?;
5673
5674                    // Make sure the signer is a Move account.
5675                    let AuthenticatorFunctionRefForExecution {
5676                        authenticator_function_ref,
5677                        ..
5678                    } = self.check_move_account(
5679                        auth_account_object_id,
5680                        auth_account_object_seq_number,
5681                        auth_account_object_digest,
5682                        account_object,
5683                        &signer,
5684                    )?;
5685
5686                    // Check the MoveAuthenticator input objects.
5687                    let authenticator_checked_input_objects =
5688                        iota_transaction_checks::check_move_authenticator_input_for_signing(
5689                            authenticator_input_objects,
5690                        )?;
5691
5692                    Ok((
5693                        authenticator_checked_input_objects,
5694                        authenticator_function_ref,
5695                    ))
5696                },
5697            )
5698            .collect::<IotaResult<Vec<_>>>()?;
5699
5700        // Check the transaction inputs.
5701        let (gas_status, tx_checked_input_objects) =
5702            iota_transaction_checks::check_transaction_input(
5703                protocol_config,
5704                reference_gas_price,
5705                tx_data,
5706                tx_input_objects,
5707                tx_receiving_objects,
5708                &self.metrics.bytecode_verifier_metrics,
5709                &self.config.verifier_signing_config,
5710                authenticator_gas_budget,
5711            )?;
5712
5713        Ok((
5714            gas_status,
5715            tx_checked_input_objects,
5716            per_authenticator_checked_inputs,
5717        ))
5718    }
5719
5720    #[cfg(test)]
5721    pub(crate) fn iter_live_object_set_for_testing(
5722        &self,
5723    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5724        self.get_global_state_hash_store()
5725            .iter_cached_live_object_set_for_testing()
5726    }
5727
5728    #[cfg(test)]
5729    pub(crate) fn shutdown_execution_for_test(&self) {
5730        self.tx_execution_shutdown
5731            .lock()
5732            .take()
5733            .unwrap()
5734            .send(())
5735            .unwrap();
5736    }
5737
5738    /// NOTE: this function is only to be used for fuzzing and testing. Never
5739    /// use in prod
5740    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5741        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5742        self.get_object_cache_reader()
5743            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5744        self.get_reconfig_api()
5745            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5746    }
5747}
5748
5749pub struct RandomnessRoundReceiver {
5750    authority_state: Arc<AuthorityState>,
5751    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5752}
5753
5754impl RandomnessRoundReceiver {
5755    pub fn spawn(
5756        authority_state: Arc<AuthorityState>,
5757        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5758    ) -> JoinHandle<()> {
5759        let rrr = RandomnessRoundReceiver {
5760            authority_state,
5761            randomness_rx,
5762        };
5763        spawn_monitored_task!(rrr.run())
5764    }
5765
5766    async fn run(mut self) {
5767        info!("RandomnessRoundReceiver event loop started");
5768
5769        loop {
5770            tokio::select! {
5771                maybe_recv = self.randomness_rx.recv() => {
5772                    if let Some((epoch, round, bytes)) = maybe_recv {
5773                        self.handle_new_randomness(epoch, round, bytes);
5774                    } else {
5775                        break;
5776                    }
5777                },
5778            }
5779        }
5780
5781        info!("RandomnessRoundReceiver event loop ended");
5782    }
5783
5784    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5785    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5786        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5787        if epoch_store.epoch() != epoch {
5788            warn!(
5789                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5790                epoch_store.epoch()
5791            );
5792            return;
5793        }
5794        let transaction = VerifiedTransaction::new_randomness_state_update(
5795            epoch,
5796            round,
5797            bytes,
5798            epoch_store
5799                .epoch_start_config()
5800                .randomness_obj_initial_shared_version(),
5801        );
5802        debug!(
5803            "created randomness state update transaction with digest: {:?}",
5804            transaction.digest()
5805        );
5806        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5807        let digest = *transaction.digest();
5808
5809        // Randomness state updates contain the full bls signature for the random round,
5810        // which cannot necessarily be reconstructed again later. Therefore we must
5811        // immediately persist this transaction. If we crash before its outputs
5812        // are committed, this ensures we will be able to re-execute it.
5813        self.authority_state
5814            .get_cache_commit()
5815            .persist_transaction(&transaction);
5816
5817        // Send transaction to TransactionManager for execution.
5818        self.authority_state
5819            .transaction_manager()
5820            .enqueue(vec![transaction], &epoch_store);
5821
5822        let authority_state = self.authority_state.clone();
5823        spawn_monitored_task!(async move {
5824            // Wait for transaction execution in a separate task, to avoid deadlock in case
5825            // of out-of-order randomness generation. (Each
5826            // RandomnessStateUpdate depends on the output of the
5827            // RandomnessStateUpdate from the previous round.)
5828            //
5829            // We set a very long timeout so that in case this gets stuck for some reason,
5830            // the validator will eventually crash rather than continuing in a
5831            // zombie mode.
5832            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5833            let result = tokio::time::timeout(
5834                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5835                authority_state
5836                    .get_transaction_cache_reader()
5837                    .try_notify_read_executed_effects(&[digest]),
5838            )
5839            .await;
5840            let result = match result {
5841                Ok(result) => result,
5842                Err(_) => {
5843                    if cfg!(debug_assertions) {
5844                        // Crash on randomness update execution timeout in debug builds.
5845                        panic!(
5846                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5847                        );
5848                    }
5849                    warn!(
5850                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5851                    );
5852                    // Continue waiting as long as necessary in non-debug builds.
5853                    authority_state
5854                        .get_transaction_cache_reader()
5855                        .try_notify_read_executed_effects(&[digest])
5856                        .await
5857                }
5858            };
5859
5860            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5861            let effects = effects.pop().expect("should return effects");
5862            if *effects.status() != ExecutionStatus::Success {
5863                fatal!(
5864                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5865                );
5866            }
5867            debug!(
5868                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5869            );
5870        });
5871    }
5872}
5873
5874#[async_trait]
5875impl TransactionKeyValueStoreTrait for AuthorityState {
5876    async fn multi_get(
5877        &self,
5878        transaction_keys: &[TransactionDigest],
5879        effects_keys: &[TransactionDigest],
5880    ) -> IotaResult<KVStoreTransactionData> {
5881        let txns = if !transaction_keys.is_empty() {
5882            self.get_transaction_cache_reader()
5883                .try_multi_get_transaction_blocks(transaction_keys)?
5884                .into_iter()
5885                .map(|t| t.map(|t| (*t).clone().into_inner()))
5886                .collect()
5887        } else {
5888            vec![]
5889        };
5890
5891        let fx = if !effects_keys.is_empty() {
5892            self.get_transaction_cache_reader()
5893                .try_multi_get_executed_effects(effects_keys)?
5894        } else {
5895            vec![]
5896        };
5897
5898        Ok((txns, fx))
5899    }
5900
5901    async fn multi_get_checkpoints(
5902        &self,
5903        checkpoint_summaries: &[CheckpointSequenceNumber],
5904        checkpoint_contents: &[CheckpointSequenceNumber],
5905        checkpoint_summaries_by_digest: &[CheckpointDigest],
5906    ) -> IotaResult<(
5907        Vec<Option<CertifiedCheckpointSummary>>,
5908        Vec<Option<CheckpointContents>>,
5909        Vec<Option<CertifiedCheckpointSummary>>,
5910    )> {
5911        // TODO: use multi-get methods if it ever becomes important (unlikely)
5912        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5913        let store = self.get_checkpoint_store();
5914        for seq in checkpoint_summaries {
5915            let checkpoint = store
5916                .get_checkpoint_by_sequence_number(*seq)?
5917                .map(|c| c.into_inner());
5918
5919            summaries.push(checkpoint);
5920        }
5921
5922        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5923        for seq in checkpoint_contents {
5924            let checkpoint = store
5925                .get_checkpoint_by_sequence_number(*seq)?
5926                .and_then(|summary| {
5927                    store
5928                        .get_checkpoint_contents(&summary.content_digest)
5929                        .expect("db read cannot fail")
5930                });
5931            contents.push(checkpoint);
5932        }
5933
5934        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5935        for digest in checkpoint_summaries_by_digest {
5936            let checkpoint = store
5937                .get_checkpoint_by_digest(digest)?
5938                .map(|c| c.into_inner());
5939            summaries_by_digest.push(checkpoint);
5940        }
5941
5942        Ok((summaries, contents, summaries_by_digest))
5943    }
5944
5945    async fn get_transaction_perpetual_checkpoint(
5946        &self,
5947        digest: TransactionDigest,
5948    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5949        self.get_checkpoint_cache()
5950            .try_get_transaction_perpetual_checkpoint(&digest)
5951            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5952    }
5953
5954    async fn get_object(
5955        &self,
5956        object_id: ObjectId,
5957        version: VersionNumber,
5958    ) -> IotaResult<Option<Object>> {
5959        self.get_object_cache_reader()
5960            .try_get_object_by_key(&object_id, version)
5961    }
5962
5963    #[instrument(skip_all)]
5964    async fn multi_get_objects(
5965        &self,
5966        object_keys: &[ObjectKey],
5967    ) -> IotaResult<Vec<Option<Object>>> {
5968        Ok(self
5969            .get_object_cache_reader()
5970            .multi_get_objects_by_key(object_keys))
5971    }
5972
5973    async fn multi_get_transactions_perpetual_checkpoints(
5974        &self,
5975        digests: &[TransactionDigest],
5976    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5977        let res = self
5978            .get_checkpoint_cache()
5979            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5980
5981        Ok(res
5982            .into_iter()
5983            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5984            .collect())
5985    }
5986
5987    #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
5988    async fn multi_get_events_by_tx_digests(
5989        &self,
5990        digests: &[TransactionDigest],
5991    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5992        if digests.is_empty() {
5993            return Ok(vec![]);
5994        }
5995
5996        Ok(self
5997            .get_transaction_cache_reader()
5998            .multi_get_events(digests))
5999    }
6000}
6001
6002#[cfg(msim)]
6003pub mod framework_injection {
6004    use std::{
6005        cell::RefCell,
6006        collections::{BTreeMap, BTreeSet},
6007    };
6008
6009    use iota_framework::{BuiltInFramework, SystemPackage};
6010    use iota_sdk_types::ObjectId;
6011    use iota_types::base_types::AuthorityName;
6012    use move_binary_format::CompiledModule;
6013
6014    type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;
6015
6016    // Thread local cache because all simtests run in a single unique thread.
6017    thread_local! {
6018        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6019    }
6020
6021    type Framework = Vec<CompiledModule>;
6022
6023    pub type PackageUpgradeCallback =
6024        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6025
6026    enum PackageOverrideConfig {
6027        Global(Framework),
6028        PerValidator(PackageUpgradeCallback),
6029    }
6030
6031    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6032        modules
6033            .iter()
6034            .map(|m| {
6035                let mut buf = Vec::new();
6036                m.serialize_with_version(m.version, &mut buf).unwrap();
6037                buf
6038            })
6039            .collect()
6040    }
6041
6042    pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
6043        OVERRIDE.with(|bs| {
6044            bs.borrow_mut()
6045                .insert(package_id, PackageOverrideConfig::Global(modules))
6046        });
6047    }
6048
6049    pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
6050        OVERRIDE.with(|bs| {
6051            bs.borrow_mut()
6052                .insert(package_id, PackageOverrideConfig::PerValidator(func))
6053        });
6054    }
6055
6056    pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6057        OVERRIDE.with(|cfg| {
6058            cfg.borrow().get(package_id).and_then(|entry| match entry {
6059                PackageOverrideConfig::Global(framework) => {
6060                    Some(compiled_modules_to_bytes(framework))
6061                }
6062                PackageOverrideConfig::PerValidator(func) => {
6063                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
6064                }
6065            })
6066        })
6067    }
6068
6069    pub fn get_override_modules(
6070        package_id: &ObjectId,
6071        name: AuthorityName,
6072    ) -> Option<Vec<CompiledModule>> {
6073        OVERRIDE.with(|cfg| {
6074            cfg.borrow().get(package_id).and_then(|entry| match entry {
6075                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6076                PackageOverrideConfig::PerValidator(func) => func(name),
6077            })
6078        })
6079    }
6080
6081    pub fn get_override_system_package(
6082        package_id: &ObjectId,
6083        name: AuthorityName,
6084    ) -> Option<SystemPackage> {
6085        let bytes = get_override_bytes(package_id, name)?;
6086        let dependencies = if package_id.is_system_package() {
6087            BuiltInFramework::get_package_by_id(package_id)
6088                .dependencies
6089                .to_vec()
6090        } else {
6091            // Assume that entirely new injected packages depend on all existing system
6092            // packages.
6093            BuiltInFramework::all_package_ids()
6094        };
6095        Some(SystemPackage {
6096            id: *package_id,
6097            bytes,
6098            dependencies,
6099        })
6100    }
6101
6102    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6103        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
6104        let extra: Vec<ObjectId> = OVERRIDE.with(|cfg| {
6105            cfg.borrow()
6106                .keys()
6107                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6108                .collect()
6109        });
6110
6111        extra
6112            .into_iter()
6113            .map(|package| SystemPackage {
6114                id: package,
6115                bytes: get_override_bytes(&package, name).unwrap(),
6116                dependencies: BuiltInFramework::all_package_ids(),
6117            })
6118            .collect()
6119    }
6120}
6121
6122#[derive(Debug, Serialize, Deserialize, Clone)]
6123pub struct ObjDumpFormat {
6124    pub id: ObjectId,
6125    pub version: VersionNumber,
6126    pub digest: ObjectDigest,
6127    pub object: Object,
6128}
6129
6130impl ObjDumpFormat {
6131    fn new(object: Object) -> Self {
6132        let oref = object.compute_object_reference();
6133        Self {
6134            id: oref.object_id,
6135            version: oref.version,
6136            digest: oref.digest,
6137            object,
6138        }
6139    }
6140}
6141
6142#[derive(Debug, Serialize, Deserialize, Clone)]
6143pub struct NodeStateDump {
6144    pub tx_digest: TransactionDigest,
6145    pub sender_signed_data: SenderSignedData,
6146    pub executed_epoch: u64,
6147    pub reference_gas_price: u64,
6148    pub protocol_version: u64,
6149    pub epoch_start_timestamp_ms: u64,
6150    pub computed_effects: TransactionEffects,
6151    pub expected_effects_digest: TransactionEffectsDigest,
6152    pub relevant_system_packages: Vec<ObjDumpFormat>,
6153    pub shared_objects: Vec<ObjDumpFormat>,
6154    pub loaded_child_objects: Vec<ObjDumpFormat>,
6155    pub modified_at_versions: Vec<ObjDumpFormat>,
6156    pub runtime_reads: Vec<ObjDumpFormat>,
6157    pub input_objects: Vec<ObjDumpFormat>,
6158}
6159
6160impl NodeStateDump {
6161    pub fn new(
6162        tx_digest: &TransactionDigest,
6163        effects: &TransactionEffects,
6164        expected_effects_digest: TransactionEffectsDigest,
6165        object_store: &dyn ObjectStore,
6166        epoch_store: &Arc<AuthorityPerEpochStore>,
6167        inner_temporary_store: &InnerTemporaryStore,
6168        certificate: &VerifiedExecutableTransaction,
6169    ) -> IotaResult<Self> {
6170        // Epoch info
6171        let executed_epoch = epoch_store.epoch();
6172        let reference_gas_price = epoch_store.reference_gas_price();
6173        let epoch_start_config = epoch_store.epoch_start_config();
6174        let protocol_version = epoch_store.protocol_version().as_u64();
6175        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6176
6177        // Record all system packages at this version
6178        let mut relevant_system_packages = Vec::new();
6179        for sys_package_id in BuiltInFramework::all_package_ids() {
6180            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6181                relevant_system_packages.push(ObjDumpFormat::new(w))
6182            }
6183        }
6184
6185        // Record all the shared objects
6186        let mut shared_objects = Vec::new();
6187        for kind in effects.input_shared_objects() {
6188            match kind {
6189                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6190                    if let Some(w) =
6191                        object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6192                    {
6193                        shared_objects.push(ObjDumpFormat::new(w))
6194                    }
6195                }
6196                InputSharedObject::ReadDeleted(..)
6197                | InputSharedObject::MutateDeleted(..)
6198                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6199                                                           * objects. */
6200            }
6201        }
6202
6203        // Record all loaded child objects
6204        // Child objects which are read but not mutated are not tracked anywhere else
6205        let mut loaded_child_objects = Vec::new();
6206        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6207            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6208                loaded_child_objects.push(ObjDumpFormat::new(w))
6209            }
6210        }
6211
6212        // Record all modified objects
6213        let mut modified_at_versions = Vec::new();
6214        for (id, ver) in effects.modified_at_versions() {
6215            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6216                modified_at_versions.push(ObjDumpFormat::new(w))
6217            }
6218        }
6219
6220        // Packages read at runtime, which were not previously loaded into the temoorary
6221        // store Some packages may be fetched at runtime and wont show up in
6222        // input objects
6223        let mut runtime_reads = Vec::new();
6224        for obj in inner_temporary_store
6225            .runtime_packages_loaded_from_db
6226            .values()
6227        {
6228            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6229        }
6230
6231        // All other input objects should already be in `inner_temporary_store.objects`
6232
6233        Ok(Self {
6234            tx_digest: *tx_digest,
6235            executed_epoch,
6236            reference_gas_price,
6237            epoch_start_timestamp_ms,
6238            protocol_version,
6239            relevant_system_packages,
6240            shared_objects,
6241            loaded_child_objects,
6242            modified_at_versions,
6243            runtime_reads,
6244            sender_signed_data: certificate.clone().into_message(),
6245            input_objects: inner_temporary_store
6246                .input_objects
6247                .values()
6248                .map(|o| ObjDumpFormat::new(o.clone()))
6249                .collect(),
6250            computed_effects: effects.clone(),
6251            expected_effects_digest,
6252        })
6253    }
6254
6255    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6256        let mut objects = Vec::new();
6257        objects.extend(self.relevant_system_packages.clone());
6258        objects.extend(self.shared_objects.clone());
6259        objects.extend(self.loaded_child_objects.clone());
6260        objects.extend(self.modified_at_versions.clone());
6261        objects.extend(self.runtime_reads.clone());
6262        objects.extend(self.input_objects.clone());
6263        objects
6264    }
6265
6266    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6267        let file_name = format!(
6268            "{}_{}_NODE_DUMP.json",
6269            self.tx_digest,
6270            AuthorityState::unixtime_now_ms()
6271        );
6272        let mut path = path.to_path_buf();
6273        path.push(&file_name);
6274        let mut file = File::create(path.clone())?;
6275        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6276        Ok(path)
6277    }
6278
6279    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6280        let file = File::open(path)?;
6281        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6282    }
6283}
6284
6285/// Returns the [`MoveAuthenticator`]s to execute during the pre-consensus
6286/// phase.
6287///
6288/// When `pre_consensus_sponsor_only_move_authentication` is enabled:
6289/// - For sponsored transactions: only the sponsor's [`MoveAuthenticator`] is
6290///   returned (empty if the sponsor does not use one).
6291/// - For non-sponsored transactions: all [`MoveAuthenticator`]s are returned
6292///   (currently only the sender's).
6293///
6294/// When the flag is not set, all [`MoveAuthenticator`]s are returned for
6295/// compatibility.
6296fn pre_consensus_move_authenticators<'a>(
6297    tx: &'a VerifiedTransaction,
6298    protocol_config: &ProtocolConfig,
6299) -> Vec<&'a MoveAuthenticator> {
6300    if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6301        if tx.transaction_data().is_sponsored_tx() {
6302            if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6303                vec![sponsor_move_authenticator]
6304            } else {
6305                vec![]
6306            }
6307        } else {
6308            tx.move_authenticators()
6309        }
6310    } else {
6311        tx.move_authenticators()
6312    }
6313}