Skip to main content

iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48    ExecutionStatus, ObjectId, Owner, StructTag, TransactionExpiration, TransactionKind, TypeTag,
49    crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
50};
51use iota_storage::{
52    key_value_store::{
53        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
54    },
55    key_value_store_metrics::KeyValueStoreMetrics,
56};
57#[cfg(msim)]
58use iota_types::committee::CommitteeTrait;
59use iota_types::{
60    account_abstraction::{
61        account::AuthenticatorFunctionRefV1Key,
62        authenticator_function::{
63            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
64            AuthenticatorFunctionRefV1, extract_auth_fun_refs,
65        },
66    },
67    auth_context::AuthContextData,
68    base_types::{
69        AuthorityName, ConciseableName, IotaAddress, ObjectInfo, ObjectRef, ObjectType,
70        SequenceNumber, VersionNumber,
71    },
72    committee::{Committee, EpochId, ProtocolVersion},
73    crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer},
74    deny_list_v1::check_coin_deny_list_v1_during_signing,
75    digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
76    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
77    effects::{
78        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
79        TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
80    },
81    error::{ExecutionError, IotaError, IotaResult, UserInputError},
82    event::{Event, EventID, SystemEpochInfoEvent},
83    executable_transaction::VerifiedExecutableTransaction,
84    execution_config_utils::to_binary_config,
85    fp_ensure,
86    gas::{GasCostSummary, IotaGasStatus},
87    gas_coin::NANOS_PER_IOTA,
88    inner_temporary_store::{
89        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
90        WrittenObjects,
91    },
92    iota_sdk_types_conversions::type_tag_core_to_sdk,
93    iota_system_state::{
94        IotaSystemState, IotaSystemStateTrait,
95        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
96    },
97    layout_resolver::{LayoutResolver, into_struct_layout},
98    message_envelope::Message,
99    messages_checkpoint::{
100        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
101        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
102        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
103        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
104    },
105    messages_consensus::AuthorityCapabilitiesV1,
106    messages_grpc::{
107        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
108        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
109        TransactionStatus,
110    },
111    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
112    move_authenticator::MoveAuthenticator,
113    object::{
114        MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
115        bounded_visitor::BoundedVisitor,
116    },
117    storage::{
118        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
119    },
120    supported_protocol_versions::{
121        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
122    },
123    traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
124    transaction::*,
125    transaction_executor::{SimulateTransactionResult, VmChecks},
126};
127use itertools::Itertools;
128use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
129use move_core_types::{
130    account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
131};
132use parking_lot::Mutex;
133use prometheus::{
134    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
135    register_histogram_vec_with_registry, register_histogram_with_registry,
136    register_int_counter_vec_with_registry, register_int_counter_with_registry,
137    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
138};
139use serde::{Deserialize, Serialize, de::DeserializeOwned};
140use tap::TapFallible;
141use tokio::{
142    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
143    task::JoinHandle,
144};
145use tracing::{debug, error, info, instrument, trace, warn};
146use typed_store::TypedStoreError;
147
148use self::{
149    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
150};
151#[cfg(msim)]
152pub use crate::checkpoints::checkpoint_executor::utils::{
153    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
154};
155use crate::{
156    authority::{
157        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
158        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
159        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
160        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
161        authority_store_tables::AuthorityPrunerTables,
162        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
163    },
164    authority_client::NetworkAuthorityClient,
165    checkpoint_progress_tracker::CheckpointProgressTracker,
166    checkpoints::CheckpointStore,
167    congestion_tracker::CongestionTracker,
168    consensus_adapter::ConsensusAdapter,
169    epoch::committee_store::CommitteeStore,
170    execution_cache::{
171        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
172        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
173        TransactionCacheRead,
174    },
175    execution_driver::execution_process,
176    global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
177    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
178    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
179    metrics::{LatencyObserver, RateTracker},
180    module_cache_metrics::ResolverMetrics,
181    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
182    stake_aggregator::StakeAggregator,
183    subscription_handler::SubscriptionHandler,
184    traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
185    transaction_input_loader::TransactionInputLoader,
186    transaction_manager::TransactionManager,
187    transaction_outputs::TransactionOutputs,
188    validator_tx_finalizer::ValidatorTxFinalizer,
189    verify_indexes::verify_indexes,
190};
191
192#[cfg(test)]
193#[path = "unit_tests/authority_tests.rs"]
194pub mod authority_tests;
195
196#[cfg(test)]
197#[path = "unit_tests/transaction_tests.rs"]
198pub mod transaction_tests;
199
200#[cfg(test)]
201#[path = "unit_tests/batch_transaction_tests.rs"]
202mod batch_transaction_tests;
203
204#[cfg(test)]
205#[path = "unit_tests/move_integration_tests.rs"]
206pub mod move_integration_tests;
207
208#[cfg(test)]
209#[path = "unit_tests/gas_tests.rs"]
210mod gas_tests;
211
212#[cfg(test)]
213#[path = "unit_tests/batch_verification_tests.rs"]
214mod batch_verification_tests;
215
216#[cfg(test)]
217#[path = "unit_tests/coin_deny_list_tests.rs"]
218mod coin_deny_list_tests;
219
220#[cfg(test)]
221#[path = "unit_tests/auth_unit_test_utils.rs"]
222pub mod auth_unit_test_utils;
223
224pub mod authority_test_utils;
225
226pub mod authority_per_epoch_store;
227pub mod authority_per_epoch_store_pruner;
228
229mod authority_store_migrations;
230pub mod authority_store_pruner;
231pub mod authority_store_tables;
232pub mod authority_store_types;
233pub mod epoch_start_configuration;
234pub mod shared_object_congestion_tracker;
235pub mod shared_object_version_manager;
236pub mod suggested_gas_price_calculator;
237pub mod test_authority_builder;
238pub mod transaction_deferral;
239
240pub(crate) mod authority_store;
241pub mod backpressure;
242
243/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
244pub struct AuthorityMetrics {
245    tx_orders: IntCounter,
246    total_certs: IntCounter,
247    total_cert_attempts: IntCounter,
248    total_effects: IntCounter,
249    pub shared_obj_tx: IntCounter,
250    sponsored_tx: IntCounter,
251    tx_already_processed: IntCounter,
252    num_input_objs: Histogram,
253    num_shared_objects: Histogram,
254    batch_size: Histogram,
255
256    authority_state_handle_transaction_latency: Histogram,
257
258    execute_certificate_latency_single_writer: Histogram,
259    execute_certificate_latency_shared_object: Histogram,
260
261    internal_execution_latency: Histogram,
262    execution_load_input_objects_latency: Histogram,
263    prepare_certificate_latency: Histogram,
264    commit_certificate_latency: Histogram,
265    db_checkpoint_latency: Histogram,
266
267    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
268    pub(crate) transaction_manager_num_missing_objects: IntGauge,
269    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
270    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
271    pub(crate) transaction_manager_num_ready: IntGauge,
272    pub(crate) transaction_manager_object_cache_size: IntGauge,
273    pub(crate) transaction_manager_object_cache_hits: IntCounter,
274    pub(crate) transaction_manager_object_cache_misses: IntCounter,
275    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
276    pub(crate) transaction_manager_package_cache_size: IntGauge,
277    pub(crate) transaction_manager_package_cache_hits: IntCounter,
278    pub(crate) transaction_manager_package_cache_misses: IntCounter,
279    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
280    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
281
282    pub(crate) execution_driver_executed_transactions: IntCounter,
283    pub(crate) execution_driver_dispatch_queue: IntGauge,
284    pub(crate) execution_queueing_delay_s: Histogram,
285    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
286    pub(crate) execution_gas_latency_ratio: Histogram,
287
288    pub(crate) skipped_consensus_txns: IntCounter,
289    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
290
291    pub(crate) authority_overload_status: IntGauge,
292    pub(crate) authority_load_shedding_percentage: IntGauge,
293
294    pub(crate) transaction_overload_sources: IntCounterVec,
295
296    /// Post processing metrics
297    post_processing_total_events_emitted: IntCounter,
298    post_processing_total_tx_indexed: IntCounter,
299    post_processing_total_tx_had_event_processed: IntCounter,
300    post_processing_total_failures: IntCounter,
301
302    /// Consensus handler metrics
303    pub consensus_handler_processed: IntCounterVec,
304    pub consensus_handler_transaction_sizes: HistogramVec,
305    pub consensus_handler_num_low_scoring_authorities: IntGauge,
306    pub consensus_handler_scores: IntGaugeVec,
307    pub consensus_handler_deferred_transactions: IntCounter,
308    pub consensus_handler_congested_transactions: IntCounter,
309    pub consensus_handler_cancelled_transactions: IntCounter,
310    pub consensus_handler_max_object_costs: IntGaugeVec,
311    pub consensus_committed_subdags: IntCounterVec,
312    pub consensus_committed_messages: IntGaugeVec,
313    pub consensus_committed_user_transactions: IntGaugeVec,
314    pub consensus_handler_leader_round: IntGauge,
315    pub consensus_calculated_throughput: IntGauge,
316    pub consensus_calculated_throughput_profile: IntGauge,
317
318    pub validator_scoreboard_scores: IntGaugeVec,
319    pub invalid_misbehavior_reports_by_authority: IntGaugeVec,
320
321    pub limits_metrics: Arc<LimitsMetrics>,
322
323    /// bytecode verifier metrics for tracking timeouts
324    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
325
326    /// Count of multisig signatures
327    pub multisig_sig_count: IntCounter,
328
329    // Tracks recent average txn queueing delay between when it is ready for execution
330    // until it starts executing.
331    pub execution_queueing_latency: LatencyObserver,
332
333    // Tracks the rate of transactions become ready for execution in transaction manager.
334    // The need for the Mutex is that the tracker is updated in transaction manager and read
335    // in the overload_monitor. There should be low mutex contention because
336    // transaction manager is single threaded and the read rate in overload_monitor is
337    // low. In the case where transaction manager becomes multi-threaded, we can
338    // create one rate tracker per thread.
339    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
340
341    // Tracks the rate of transactions starts execution in execution driver.
342    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
343    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
344}
345
346// Override default Prom buckets for positive numbers in 0-10M range
347const POSITIVE_INT_BUCKETS: &[f64] = &[
348    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
349    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
350    7000000., 10000000.,
351];
352
353const LATENCY_SEC_BUCKETS: &[f64] = &[
354    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
355    10., 20., 30., 60., 90.,
356];
357
358// Buckets for low latency samples. Starts from 10us.
359const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
360    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
361    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
362];
363
364const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
365    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
366    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
367];
368
369/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
370pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
371
372impl AuthorityMetrics {
373    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
374        let execute_certificate_latency = register_histogram_vec_with_registry!(
375            "authority_state_execute_certificate_latency",
376            "Latency of executing certificates, including waiting for inputs",
377            &["tx_type"],
378            LATENCY_SEC_BUCKETS.to_vec(),
379            registry,
380        )
381        .unwrap();
382
383        let execute_certificate_latency_single_writer =
384            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
385        let execute_certificate_latency_shared_object =
386            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
387
388        Self {
389            tx_orders: register_int_counter_with_registry!(
390                "total_transaction_orders",
391                "Total number of transaction orders",
392                registry,
393            )
394            .unwrap(),
395            total_certs: register_int_counter_with_registry!(
396                "total_transaction_certificates",
397                "Total number of transaction certificates handled",
398                registry,
399            )
400            .unwrap(),
401            total_cert_attempts: register_int_counter_with_registry!(
402                "total_handle_certificate_attempts",
403                "Number of calls to handle_certificate",
404                registry,
405            )
406            .unwrap(),
407            // total_effects == total transactions finished
408            total_effects: register_int_counter_with_registry!(
409                "total_transaction_effects",
410                "Total number of transaction effects produced",
411                registry,
412            )
413            .unwrap(),
414
415            shared_obj_tx: register_int_counter_with_registry!(
416                "num_shared_obj_tx",
417                "Number of transactions involving shared objects",
418                registry,
419            )
420            .unwrap(),
421
422            sponsored_tx: register_int_counter_with_registry!(
423                "num_sponsored_tx",
424                "Number of sponsored transactions",
425                registry,
426            )
427            .unwrap(),
428
429            tx_already_processed: register_int_counter_with_registry!(
430                "num_tx_already_processed",
431                "Number of transaction orders already processed previously",
432                registry,
433            )
434            .unwrap(),
435            num_input_objs: register_histogram_with_registry!(
436                "num_input_objects",
437                "Distribution of number of input TX objects per TX",
438                POSITIVE_INT_BUCKETS.to_vec(),
439                registry,
440            )
441            .unwrap(),
442            num_shared_objects: register_histogram_with_registry!(
443                "num_shared_objects",
444                "Number of shared input objects per TX",
445                POSITIVE_INT_BUCKETS.to_vec(),
446                registry,
447            )
448            .unwrap(),
449            batch_size: register_histogram_with_registry!(
450                "batch_size",
451                "Distribution of size of transaction batch",
452                POSITIVE_INT_BUCKETS.to_vec(),
453                registry,
454            )
455            .unwrap(),
456            authority_state_handle_transaction_latency: register_histogram_with_registry!(
457                "authority_state_handle_transaction_latency",
458                "Latency of handling transactions",
459                LATENCY_SEC_BUCKETS.to_vec(),
460                registry,
461            )
462            .unwrap(),
463            execute_certificate_latency_single_writer,
464            execute_certificate_latency_shared_object,
465            internal_execution_latency: register_histogram_with_registry!(
466                "authority_state_internal_execution_latency",
467                "Latency of actual certificate executions",
468                LATENCY_SEC_BUCKETS.to_vec(),
469                registry,
470            )
471            .unwrap(),
472            execution_load_input_objects_latency: register_histogram_with_registry!(
473                "authority_state_execution_load_input_objects_latency",
474                "Latency of loading input objects for execution",
475                LOW_LATENCY_SEC_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            prepare_certificate_latency: register_histogram_with_registry!(
480                "authority_state_prepare_certificate_latency",
481                "Latency of executing certificates, before committing the results",
482                LATENCY_SEC_BUCKETS.to_vec(),
483                registry,
484            )
485            .unwrap(),
486            commit_certificate_latency: register_histogram_with_registry!(
487                "authority_state_commit_certificate_latency",
488                "Latency of committing certificate execution results",
489                LATENCY_SEC_BUCKETS.to_vec(),
490                registry,
491            )
492            .unwrap(),
493            db_checkpoint_latency: register_histogram_with_registry!(
494                "db_checkpoint_latency",
495                "Latency of checkpointing dbs",
496                LATENCY_SEC_BUCKETS.to_vec(),
497                registry,
498            ).unwrap(),
499            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
500                "transaction_manager_num_enqueued_certificates",
501                "Current number of certificates enqueued to TransactionManager",
502                &["result"],
503                registry,
504            )
505            .unwrap(),
506            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
507                "transaction_manager_num_missing_objects",
508                "Current number of missing objects in TransactionManager",
509                registry,
510            )
511            .unwrap(),
512            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
513                "transaction_manager_num_pending_certificates",
514                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
515                registry,
516            )
517            .unwrap(),
518            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
519                "transaction_manager_num_executing_certificates",
520                "Number of executing certificates, including queued and actually running certificates",
521                registry,
522            )
523            .unwrap(),
524            transaction_manager_num_ready: register_int_gauge_with_registry!(
525                "transaction_manager_num_ready",
526                "Number of ready transactions in TransactionManager",
527                registry,
528            )
529            .unwrap(),
530            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
531                "transaction_manager_object_cache_size",
532                "Current size of object-availability cache in TransactionManager",
533                registry,
534            )
535            .unwrap(),
536            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
537                "transaction_manager_object_cache_hits",
538                "Number of object-availability cache hits in TransactionManager",
539                registry,
540            )
541            .unwrap(),
542            authority_overload_status: register_int_gauge_with_registry!(
543                "authority_overload_status",
544                "Whether authority is current experiencing overload and enters load shedding mode.",
545                registry)
546            .unwrap(),
547            authority_load_shedding_percentage: register_int_gauge_with_registry!(
548                "authority_load_shedding_percentage",
549                "The percentage of transactions is shed when the authority is in load shedding mode.",
550                registry)
551            .unwrap(),
552            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
553                "transaction_manager_object_cache_misses",
554                "Number of object-availability cache misses in TransactionManager",
555                registry,
556            )
557            .unwrap(),
558            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
559                "transaction_manager_object_cache_evictions",
560                "Number of object-availability cache evictions in TransactionManager",
561                registry,
562            )
563            .unwrap(),
564            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
565                "transaction_manager_package_cache_size",
566                "Current size of package-availability cache in TransactionManager",
567                registry,
568            )
569            .unwrap(),
570            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
571                "transaction_manager_package_cache_hits",
572                "Number of package-availability cache hits in TransactionManager",
573                registry,
574            )
575            .unwrap(),
576            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
577                "transaction_manager_package_cache_misses",
578                "Number of package-availability cache misses in TransactionManager",
579                registry,
580            )
581            .unwrap(),
582            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
583                "transaction_manager_package_cache_evictions",
584                "Number of package-availability cache evictions in TransactionManager",
585                registry,
586            )
587            .unwrap(),
588            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
589                "transaction_manager_transaction_queue_age_s",
590                "Time spent in waiting for transaction in the queue",
591                LATENCY_SEC_BUCKETS.to_vec(),
592                registry,
593            )
594            .unwrap(),
595            transaction_overload_sources: register_int_counter_vec_with_registry!(
596                "transaction_overload_sources",
597                "Number of times each source indicates transaction overload.",
598                &["source"],
599                registry)
600            .unwrap(),
601            execution_driver_executed_transactions: register_int_counter_with_registry!(
602                "execution_driver_executed_transactions",
603                "Cumulative number of transaction executed by execution driver",
604                registry,
605            )
606            .unwrap(),
607            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
608                "execution_driver_dispatch_queue",
609                "Number of transaction pending in execution driver dispatch queue",
610                registry,
611            )
612            .unwrap(),
613            execution_queueing_delay_s: register_histogram_with_registry!(
614                "execution_queueing_delay_s",
615                "Queueing delay between a transaction is ready for execution until it starts executing.",
616                LATENCY_SEC_BUCKETS.to_vec(),
617                registry
618            )
619            .unwrap(),
620            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
621                "prepare_cert_gas_latency_ratio",
622                "The ratio of computation gas divided by VM execution latency.",
623                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624                registry
625            )
626            .unwrap(),
627            execution_gas_latency_ratio: register_histogram_with_registry!(
628                "execution_gas_latency_ratio",
629                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
630                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
631                registry
632            )
633            .unwrap(),
634            skipped_consensus_txns: register_int_counter_with_registry!(
635                "skipped_consensus_txns",
636                "Total number of consensus transactions skipped",
637                registry,
638            )
639            .unwrap(),
640            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
641                "skipped_consensus_txns_cache_hit",
642                "Total number of consensus transactions skipped because of local cache hit",
643                registry,
644            )
645            .unwrap(),
646            post_processing_total_events_emitted: register_int_counter_with_registry!(
647                "post_processing_total_events_emitted",
648                "Total number of events emitted in post processing",
649                registry,
650            )
651            .unwrap(),
652            post_processing_total_tx_indexed: register_int_counter_with_registry!(
653                "post_processing_total_tx_indexed",
654                "Total number of txes indexed in post processing",
655                registry,
656            )
657            .unwrap(),
658            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
659                "post_processing_total_tx_had_event_processed",
660                "Total number of txes finished event processing in post processing",
661                registry,
662            )
663            .unwrap(),
664            post_processing_total_failures: register_int_counter_with_registry!(
665                "post_processing_total_failures",
666                "Total number of failure in post processing",
667                registry,
668            )
669            .unwrap(),
670            consensus_handler_processed: register_int_counter_vec_with_registry!(
671                "consensus_handler_processed",
672                "Number of transactions processed by consensus handler",
673                &["class"],
674                registry
675            ).unwrap(),
676            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
677                "consensus_handler_transaction_sizes",
678                "Sizes of each type of transactions processed by consensus handler",
679                &["class"],
680                POSITIVE_INT_BUCKETS.to_vec(),
681                registry
682            ).unwrap(),
683            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
684                "consensus_handler_num_low_scoring_authorities",
685                "Number of low scoring authorities based on reputation scores from consensus",
686                registry
687            ).unwrap(),
688            consensus_handler_scores: register_int_gauge_vec_with_registry!(
689                "consensus_handler_scores",
690                "scores from consensus for each authority",
691                &["authority"],
692                registry,
693            ).unwrap(),
694            validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
695                "validator_scoreboard_scores",
696                "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
697                &["authority"],
698                registry,
699            ).unwrap(),
700            invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
701                "invalid_misbehavior_reports_by_authority",
702                "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
703                &["authority"],
704                registry,
705            ).unwrap(),
706            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
707                "consensus_handler_deferred_transactions",
708                "Number of transactions deferred by consensus handler",
709                registry,
710            ).unwrap(),
711            consensus_handler_congested_transactions: register_int_counter_with_registry!(
712                "consensus_handler_congested_transactions",
713                "Number of transactions deferred by consensus handler due to congestion",
714                registry,
715            ).unwrap(),
716            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
717                "consensus_handler_cancelled_transactions",
718                "Number of transactions cancelled by consensus handler",
719                registry,
720            ).unwrap(),
721            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
722                "consensus_handler_max_congestion_control_object_costs",
723                "Max object costs for congestion control in the current consensus commit",
724                &["commit_type"],
725                registry,
726            ).unwrap(),
727            consensus_committed_subdags: register_int_counter_vec_with_registry!(
728                "consensus_committed_subdags",
729                "Number of committed subdags, sliced by leader",
730                &["authority"],
731                registry,
732            ).unwrap(),
733            consensus_committed_messages: register_int_gauge_vec_with_registry!(
734                "consensus_committed_messages",
735                "Total number of committed consensus messages, sliced by author",
736                &["authority"],
737                registry,
738            ).unwrap(),
739            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
740                "consensus_committed_user_transactions",
741                "Number of committed user transactions, sliced by submitter",
742                &["authority"],
743                registry,
744            ).unwrap(),
745            consensus_handler_leader_round: register_int_gauge_with_registry!(
746                "consensus_handler_leader_round",
747                "The leader round of the current consensus output being processed in the consensus handler",
748                registry,
749            ).unwrap(),
750            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
751            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
752            multisig_sig_count: register_int_counter_with_registry!(
753                "multisig_sig_count",
754                "Count of multisig signatures",
755                registry,
756            )
757            .unwrap(),
758            consensus_calculated_throughput: register_int_gauge_with_registry!(
759                "consensus_calculated_throughput",
760                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
761                registry,
762            ).unwrap(),
763            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
764                "consensus_calculated_throughput_profile",
765                "The current active calculated throughput profile",
766                registry
767            ).unwrap(),
768            execution_queueing_latency: LatencyObserver::new(),
769            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
770            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
771        }
772    }
773
774    /// Reset metrics that contain `hostname` as one of the labels. This is
775    /// needed to avoid retaining metrics for long-gone committee members and
776    /// only exposing metrics for the committee in the current epoch.
777    pub fn reset_on_reconfigure(&self) {
778        self.consensus_committed_messages.reset();
779        self.consensus_handler_scores.reset();
780        self.validator_scoreboard_scores.reset();
781        self.invalid_misbehavior_reports_by_authority.reset();
782        self.consensus_committed_user_transactions.reset();
783    }
784}
785
786/// a Trait object for `Signer` that is:
787/// - Pin, i.e. confined to one place in memory (we don't want to copy private
788///   keys).
789/// - Sync, i.e. can be safely shared between threads.
790///
791/// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair`
792pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
793
794pub struct AuthorityState {
795    // Fixed size, static, identity of the authority
796    /// The name of this authority.
797    pub name: AuthorityName,
798    /// The signature key of the authority.
799    pub secret: StableSyncAuthoritySigner,
800
801    /// The database
802    input_loader: TransactionInputLoader,
803    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
804
805    epoch_store: ArcSwap<AuthorityPerEpochStore>,
806
807    /// This lock denotes current 'execution epoch'.
808    /// Execution acquires read lock, checks certificate epoch and holds it
809    /// until all writes are complete. Reconfiguration acquires write lock,
810    /// changes the epoch and revert all transactions from previous epoch
811    /// that are executed but did not make into checkpoint.
812    execution_lock: RwLock<EpochId>,
813
814    pub indexes: Option<Arc<IndexStore>>,
815    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
816
817    pub subscription_handler: Arc<SubscriptionHandler>,
818    pub checkpoint_store: Arc<CheckpointStore>,
819
820    committee_store: Arc<CommitteeStore>,
821
822    /// Manages pending certificates and their missing input objects.
823    transaction_manager: Arc<TransactionManager>,
824
825    /// Shuts down the execution task. Used only in testing.
826    #[cfg_attr(not(test), expect(unused))]
827    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
828
829    pub metrics: Arc<AuthorityMetrics>,
830    _pruner: AuthorityStorePruner,
831    authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
832    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
833
834    /// Take db checkpoints of different dbs
835    db_checkpoint_config: DBCheckpointConfig,
836
837    pub config: NodeConfig,
838
839    /// Current overload status in this authority. Updated periodically.
840    pub overload_info: AuthorityOverloadInfo,
841
842    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
843
844    /// The chain identifier is derived from the digest of the genesis
845    /// checkpoint.
846    chain_identifier: ChainIdentifier,
847
848    pub(crate) congestion_tracker: Arc<CongestionTracker>,
849
850    /// Traffic controller for IOTA core servers (json-rpc, validator service)
851    pub traffic_controller: Option<Arc<TrafficController>>,
852}
853
854/// The authority state encapsulates all state, drives execution, and ensures
855/// safety.
856///
857/// Note the authority operations can be accessed through a read ref (&) and do
858/// not require &mut. Internally a database is synchronized through a mutex
859/// lock.
860///
861/// Repeating valid commands should produce no changes and return no error.
862impl AuthorityState {
863    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
864        epoch_store.committee().authority_exists(&self.name)
865    }
866
867    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
868        epoch_store
869            .active_validators()
870            .iter()
871            .any(|a| AuthorityName::from(a) == self.name)
872    }
873
874    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
875        !self.is_committee_validator(epoch_store)
876    }
877
878    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
879        &self.committee_store
880    }
881
882    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
883        self.committee_store.clone()
884    }
885
886    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
887        &self.config.authority_overload_config
888    }
889
890    pub fn get_epoch_state_commitments(
891        &self,
892        epoch: EpochId,
893    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
894        self.checkpoint_store.get_epoch_state_commitments(epoch)
895    }
896
897    /// This is a private method and should be kept that way. It doesn't check
898    /// whether the provided transaction is a system transaction, and hence
899    /// can only be called internally.
900    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
901    async fn handle_transaction_impl(
902        &self,
903        transaction: VerifiedTransaction,
904        epoch_store: &Arc<AuthorityPerEpochStore>,
905    ) -> IotaResult<VerifiedSignedTransaction> {
906        // Ensure that validator cannot reconfigure while we are signing the tx
907        let _execution_lock = self.execution_lock_for_signing()?;
908
909        let protocol_config = epoch_store.protocol_config();
910        let reference_gas_price = epoch_store.reference_gas_price();
911
912        let epoch = epoch_store.epoch();
913
914        let tx_data = transaction.data().transaction_data();
915
916        // Note: the deny checks may do redundant package loads but:
917        // - they only load packages when there is an active package deny map
918        // - the loads are cached anyway
919        iota_transaction_checks::deny::check_transaction_for_signing(
920            tx_data,
921            transaction.tx_signatures(),
922            &transaction.input_objects()?,
923            &tx_data.receiving_objects(),
924            &self.config.transaction_deny_config,
925            self.get_backing_package_store().as_ref(),
926        )?;
927
928        // Load all transaction-related input objects including ones for every
929        // `MoveAuthenticator`. Loading all objects eagerly means that any invalid
930        // reference — missing object, wrong version, inaccessible object — causes a
931        // pre-consensus rejection.
932        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
933            self.read_objects_for_signing(&transaction, epoch)?;
934
935        let move_authenticators = transaction.move_authenticators();
936
937        // Check the inputs for signing.
938        // If there are `MoveAuthenticator` signatures, their input objects and the
939        // account objects are also checked and must be provided.
940        // It is also checked if there is enough gas to execute the transaction and its
941        // authenticators.
942        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
943            .check_transaction_inputs_for_signing(
944                protocol_config,
945                reference_gas_price,
946                tx_data,
947                tx_input_objects,
948                &tx_receiving_objects,
949                &move_authenticators,
950                per_authenticator_inputs,
951            )?;
952
953        // Get the input objects for the authenticators, if there are
954        // `MoveAuthenticator`s.
955        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
956            .iter()
957            .map(|i| &i.0)
958            .collect();
959
960        // Check if any of the sender, the transaction input objects, the receiving
961        // objects and the authenticator input objects are in the coin deny
962        // list, which would prevent the transaction from being signed.
963        check_coin_deny_list_v1_during_signing(
964            tx_data.sender(),
965            &tx_checked_input_objects,
966            &tx_receiving_objects,
967            &per_authenticator_checked_input_objects,
968            &self.get_object_store(),
969        )?;
970
971        let (kind, signer, gas_data) = tx_data.execution_parts();
972
973        let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
974            extract_auth_fun_refs(signer, gas_data.owner, |address| {
975                move_authenticators
976                    .iter()
977                    .zip(per_authenticator_checked_inputs.iter())
978                    .find(|(move_authenticator, _)| {
979                        move_authenticator.address().ok().as_ref() == Some(&address)
980                    })
981                    .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
982            });
983
984        // Filter the authenticators and their checked inputs down to those that must
985        // be executed pre-consensus. This is done *after* the deny-list check so
986        // that all MoveAuthenticator input objects are covered by that check regardless
987        // of deferral.
988        let pre_consensus_move_authenticators =
989            pre_consensus_move_authenticators(&transaction, protocol_config);
990        let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
991            move_authenticators
992                .into_iter()
993                .zip(per_authenticator_checked_inputs)
994                .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
995                .unzip();
996        let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
997            .iter()
998            .map(|i| &i.0)
999            .collect();
1000
1001        // If there are `MoveAuthenticator` signatures, execute them and check if they
1002        // all succeed.
1003        if !move_authenticators.is_empty() {
1004            let aggregated_authenticator_input_objects =
1005                iota_transaction_checks::aggregate_authenticator_input_objects(
1006                    &per_authenticator_checked_input_objects,
1007                )?;
1008
1009            debug_assert_eq!(
1010                move_authenticators.len(),
1011                per_authenticator_checked_inputs.len(),
1012                "Move authenticators amount must match the number of checked authenticator inputs"
1013            );
1014
1015            let move_authenticators = move_authenticators
1016                .into_iter()
1017                .zip(per_authenticator_checked_inputs)
1018                .map(
1019                    |(
1020                        move_authenticator,
1021                        (authenticator_checked_input_objects, authenticator_function_ref),
1022                    )| {
1023                        (
1024                            move_authenticator.to_owned(),
1025                            authenticator_function_ref,
1026                            authenticator_checked_input_objects,
1027                        )
1028                    },
1029                )
1030                .collect();
1031
1032            // It is supposed that `MoveAuthenticator` availability is checked in
1033            // `SenderSignedData::validity_check`.
1034
1035            // Serialize the TransactionData for the auth context before decomposing.
1036            let tx_data_bytes =
1037                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1038
1039            let (sender_auth_digest, sponsor_auth_digest) =
1040                transaction.data().compute_auth_digests()?;
1041
1042            let auth_context_data = AuthContextData {
1043                transaction_data_bytes: tx_data_bytes,
1044                sender_auth_digest,
1045                sponsor_auth_digest,
1046                sender_authenticator_function_ref,
1047                sponsor_authenticator_function_ref,
1048            };
1049
1050            // Execute the Move authenticators.
1051            let validation_result = epoch_store.executor().authenticate_transaction(
1052                self.get_backing_store().as_ref(),
1053                protocol_config,
1054                self.metrics.limits_metrics.clone(),
1055                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1056                epoch_store
1057                    .epoch_start_config()
1058                    .epoch_data()
1059                    .epoch_start_timestamp(),
1060                gas_data,
1061                gas_status,
1062                move_authenticators,
1063                aggregated_authenticator_input_objects,
1064                kind,
1065                signer,
1066                transaction.digest().to_owned(),
1067                auth_context_data,
1068                &mut None,
1069            );
1070
1071            if let Err(validation_error) = validation_result {
1072                return Err(IotaError::MoveAuthenticatorExecutionFailure {
1073                    error: validation_error.to_string(),
1074                });
1075            }
1076        }
1077
1078        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
1079
1080        let signed_transaction =
1081            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1082
1083        // Check and write locks, to signed transaction, into the database
1084        // The call to self.set_transaction_lock checks the lock is not conflicting,
1085        // and returns ConflictingTransaction error in case there is a lock on a
1086        // different existing transaction.
1087        self.get_cache_writer().try_acquire_transaction_locks(
1088            epoch_store,
1089            &owned_objects,
1090            signed_transaction.clone(),
1091        )?;
1092
1093        Ok(signed_transaction)
1094    }
1095
1096    /// Initiate a new transaction.
1097    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1098    pub async fn handle_transaction(
1099        &self,
1100        epoch_store: &Arc<AuthorityPerEpochStore>,
1101        transaction: VerifiedTransaction,
1102    ) -> IotaResult<HandleTransactionResponse> {
1103        let tx_digest = *transaction.digest();
1104        debug!("handle_transaction");
1105
1106        // Ensure an idempotent answer.
1107        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1108            return Ok(HandleTransactionResponse { status });
1109        }
1110
1111        let _metrics_guard = self
1112            .metrics
1113            .authority_state_handle_transaction_latency
1114            .start_timer();
1115        self.metrics.tx_orders.inc();
1116
1117        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1118        match signed {
1119            Ok(s) => {
1120                if self.is_committee_validator(epoch_store) {
1121                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1122                        let tx = s.clone();
1123                        let validator_tx_finalizer = validator_tx_finalizer.clone();
1124                        let cache_reader = self.get_transaction_cache_reader().clone();
1125                        let epoch_store = epoch_store.clone();
1126                        spawn_monitored_task!(epoch_store.within_alive_epoch(
1127                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1128                        ));
1129                    }
1130                }
1131                Ok(HandleTransactionResponse {
1132                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1133                })
1134            }
1135            // It happens frequently that while we are checking the validity of the transaction, it
1136            // has just been executed.
1137            // In that case, we could still return Ok to avoid showing confusing errors.
1138            Err(err) => Ok(HandleTransactionResponse {
1139                status: self
1140                    .get_transaction_status(&tx_digest, epoch_store)?
1141                    .ok_or(err)?
1142                    .1,
1143            }),
1144        }
1145    }
1146
1147    pub fn check_system_overload_at_signing(&self) -> bool {
1148        self.config
1149            .authority_overload_config
1150            .check_system_overload_at_signing
1151    }
1152
1153    pub fn check_system_overload_at_execution(&self) -> bool {
1154        self.config
1155            .authority_overload_config
1156            .check_system_overload_at_execution
1157    }
1158
1159    pub(crate) fn check_system_overload(
1160        &self,
1161        consensus_adapter: &Arc<ConsensusAdapter>,
1162        tx_data: &SenderSignedData,
1163        do_authority_overload_check: bool,
1164    ) -> IotaResult {
1165        if do_authority_overload_check {
1166            self.check_authority_overload(tx_data).tap_err(|_| {
1167                self.update_overload_metrics("execution_queue");
1168            })?;
1169        }
1170        self.transaction_manager
1171            .check_execution_overload(self.overload_config(), tx_data)
1172            .tap_err(|_| {
1173                self.update_overload_metrics("execution_pending");
1174            })?;
1175        consensus_adapter.check_consensus_overload().tap_err(|_| {
1176            self.update_overload_metrics("consensus");
1177        })?;
1178
1179        let pending_tx_count = self
1180            .get_cache_commit()
1181            .approximate_pending_transaction_count();
1182        if pending_tx_count
1183            > self
1184                .config
1185                .execution_cache_config
1186                .writeback_cache
1187                .backpressure_threshold_for_rpc()
1188        {
1189            return Err(IotaError::ValidatorOverloadedRetryAfter {
1190                retry_after_secs: 10,
1191            });
1192        }
1193
1194        Ok(())
1195    }
1196
1197    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1198        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1199            return Ok(());
1200        }
1201
1202        let load_shedding_percentage = self
1203            .overload_info
1204            .load_shedding_percentage
1205            .load(Ordering::Relaxed);
1206        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1207    }
1208
1209    fn update_overload_metrics(&self, source: &str) {
1210        self.metrics
1211            .transaction_overload_sources
1212            .with_label_values(&[source])
1213            .inc();
1214    }
1215
1216    /// Executes a certificate for its effects.
1217    #[instrument(level = "trace", skip_all)]
1218    pub async fn execute_certificate(
1219        &self,
1220        certificate: &VerifiedCertificate,
1221        epoch_store: &Arc<AuthorityPerEpochStore>,
1222    ) -> IotaResult<TransactionEffects> {
1223        let _metrics_guard = if certificate.contains_shared_object() {
1224            self.metrics
1225                .execute_certificate_latency_shared_object
1226                .start_timer()
1227        } else {
1228            self.metrics
1229                .execute_certificate_latency_single_writer
1230                .start_timer()
1231        };
1232        trace!("execute_certificate");
1233
1234        self.metrics.total_cert_attempts.inc();
1235
1236        if !certificate.contains_shared_object() {
1237            // Shared object transactions need to be sequenced by the consensus before
1238            // enqueueing for execution, done in
1239            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1240            // object transactions, they can be enqueued for execution immediately.
1241            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1242        }
1243
1244        // tx could be reverted when epoch ends, so we must be careful not to return a
1245        // result here after the epoch ends.
1246        epoch_store
1247            .within_alive_epoch(self.notify_read_effects(certificate))
1248            .await
1249            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1250            .and_then(|r| r)
1251    }
1252
1253    /// Internal logic to execute a certificate.
1254    ///
1255    /// Guarantees that
1256    /// - If input objects are available, return no permanent failure.
1257    /// - Execution and output commit are atomic. i.e. outputs are only written
1258    ///   to storage,
1259    /// on successful execution; crashed execution has no observable effect and
1260    /// can be retried.
1261    ///
1262    /// It is caller's responsibility to ensure input objects are available and
1263    /// locks are set. If this cannot be satisfied by the caller,
1264    /// execute_certificate() should be called instead.
1265    ///
1266    /// Should only be called within iota-core.
1267    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1268    pub fn try_execute_immediately(
1269        &self,
1270        certificate: &VerifiedExecutableTransaction,
1271        expected_effects_digest: Option<TransactionEffectsDigest>,
1272        epoch_store: &Arc<AuthorityPerEpochStore>,
1273    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1274        let _scope = monitored_scope("Execution::try_execute_immediately");
1275        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1276
1277        let tx_digest = certificate.digest();
1278
1279        // Acquire a lock to prevent concurrent executions of the same transaction.
1280        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1281
1282        // The cert could have been processed by a concurrent attempt of the same cert,
1283        // so check if the effects have already been written.
1284        if let Some(effects) = self
1285            .get_transaction_cache_reader()
1286            .try_get_executed_effects(tx_digest)?
1287        {
1288            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1289                assert_eq!(
1290                    effects.digest(),
1291                    expected_effects_digest_inner,
1292                    "Unexpected effects digest for transaction {tx_digest}"
1293                );
1294            }
1295            tx_guard.release();
1296            return Ok((effects, None));
1297        }
1298
1299        let (tx_input_objects, per_authenticator_inputs) =
1300            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1301
1302        // If no expected_effects_digest was provided, try to get it from storage.
1303        // We could be re-executing a previously executed but uncommitted transaction,
1304        // perhaps after restarting with a new binary. In this situation, if
1305        // we have published an effects signature, we must be sure not to
1306        // equivocate.
1307        let expected_effects_digest =
1308            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1309
1310        self.process_certificate(
1311            tx_guard,
1312            certificate,
1313            tx_input_objects,
1314            per_authenticator_inputs,
1315            expected_effects_digest,
1316            epoch_store,
1317        )
1318        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1319        .tap_ok(
1320            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1321        )
1322    }
1323
1324    pub fn read_objects_for_execution(
1325        &self,
1326        tx_lock: &CertLockGuard,
1327        certificate: &VerifiedExecutableTransaction,
1328        epoch_store: &Arc<AuthorityPerEpochStore>,
1329    ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1330        let _scope = monitored_scope("Execution::load_input_objects");
1331        let _metrics_guard = self
1332            .metrics
1333            .execution_load_input_objects_latency
1334            .start_timer();
1335
1336        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1337
1338        let input_objects = self.input_loader.read_objects_for_execution(
1339            epoch_store,
1340            &certificate.key(),
1341            tx_lock,
1342            &input_objects,
1343            epoch_store.epoch(),
1344        )?;
1345
1346        certificate.split_input_objects_into_groups_for_reading(input_objects)
1347    }
1348
1349    /// Test only wrapper for `try_execute_immediately()` above, useful for
1350    /// checking errors if the pre-conditions are not satisfied, and
1351    /// executing change epoch transactions.
1352    pub fn try_execute_for_test(
1353        &self,
1354        certificate: &VerifiedCertificate,
1355    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1356        let epoch_store = self.epoch_store_for_testing();
1357        let (effects, execution_error_opt) = self.try_execute_immediately(
1358            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1359            None,
1360            &epoch_store,
1361        )?;
1362        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1363        Ok((signed_effects, execution_error_opt))
1364    }
1365
1366    /// Non-fallible version of `try_execute_for_test()`.
1367    pub fn execute_for_test(
1368        &self,
1369        certificate: &VerifiedCertificate,
1370    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1371        self.try_execute_for_test(certificate)
1372            .expect("try_execute_for_test should not fail")
1373    }
1374
1375    pub async fn notify_read_effects(
1376        &self,
1377        certificate: &VerifiedCertificate,
1378    ) -> IotaResult<TransactionEffects> {
1379        self.get_transaction_cache_reader()
1380            .try_notify_read_executed_effects(&[*certificate.digest()])
1381            .await
1382            .map(|mut r| r.pop().expect("must return correct number of effects"))
1383    }
1384
1385    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1386        self.get_object_cache_reader()
1387            .try_check_owned_objects_are_live(owned_object_refs)
1388    }
1389
1390    /// This function captures the required state to debug a forked transaction.
1391    /// The dump is written to a file in dir `path`, with name prefixed by the
1392    /// transaction digest. NOTE: Since this info escapes the validator
1393    /// context, make sure not to leak any private info here
1394    pub(crate) fn debug_dump_transaction_state(
1395        &self,
1396        tx_digest: &TransactionDigest,
1397        effects: &TransactionEffects,
1398        expected_effects_digest: TransactionEffectsDigest,
1399        inner_temporary_store: &InnerTemporaryStore,
1400        certificate: &VerifiedExecutableTransaction,
1401        debug_dump_config: &StateDebugDumpConfig,
1402    ) -> IotaResult<PathBuf> {
1403        // Fall back to the OS temp directory if no dump directory is configured.
1404        // This is safe: dump files are named by transaction digest, so no collisions.
1405        let dump_dir = debug_dump_config
1406            .dump_file_directory
1407            .as_ref()
1408            .cloned()
1409            .unwrap_or(std::env::temp_dir());
1410        let epoch_store = self.load_epoch_store_one_call_per_task();
1411
1412        NodeStateDump::new(
1413            tx_digest,
1414            effects,
1415            expected_effects_digest,
1416            self.get_object_store().as_ref(),
1417            &epoch_store,
1418            inner_temporary_store,
1419            certificate,
1420        )?
1421        .write_to_file(&dump_dir)
1422        .map_err(|e| IotaError::FileIO(e.to_string()))
1423    }
1424
1425    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1426    pub(crate) fn process_certificate(
1427        &self,
1428        tx_guard: CertTxGuard,
1429        certificate: &VerifiedExecutableTransaction,
1430        tx_input_objects: InputObjects,
1431        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1432        expected_effects_digest: Option<TransactionEffectsDigest>,
1433        epoch_store: &Arc<AuthorityPerEpochStore>,
1434    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1435        let process_certificate_start_time = tokio::time::Instant::now();
1436        let digest = *certificate.digest();
1437
1438        let _scope = monitored_scope("Execution::process_certificate");
1439
1440        fail_point_if!("correlated-crash-process-certificate", || {
1441            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1442                iota_simulator::task::kill_current_node(None);
1443            }
1444        });
1445
1446        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1447        // Any caller that verifies the signatures on the certificate will have already
1448        // checked the epoch. But paths that don't verify sigs (e.g. execution
1449        // from checkpoint, reading from db) present the possibility of an epoch
1450        // mismatch. If this cert is not finalzied in previous epoch, then it's
1451        // invalid.
1452        let execution_guard = match execution_guard {
1453            Ok(execution_guard) => execution_guard,
1454            Err(err) => {
1455                tx_guard.release();
1456                return Err(err);
1457            }
1458        };
1459        // Since we obtain a reference to the epoch store before taking the execution
1460        // lock, it's possible that reconfiguration has happened and they no
1461        // longer match.
1462        if *execution_guard != epoch_store.epoch() {
1463            tx_guard.release();
1464            info!("The epoch of the execution_guard doesn't match the epoch store");
1465            return Err(IotaError::WrongEpoch {
1466                expected_epoch: epoch_store.epoch(),
1467                actual_epoch: *execution_guard,
1468            });
1469        }
1470
1471        // Errors originating from prepare_certificate may be transient (failure to read
1472        // locks) or non-transient (transaction input is invalid, move vm
1473        // errors). However, all errors from this function occur before we have
1474        // written anything to the db, so we commit the tx guard and rely on the
1475        // client to retry the tx (if it was transient).
1476        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1477            &execution_guard,
1478            certificate,
1479            tx_input_objects,
1480            per_authenticator_inputs,
1481            epoch_store,
1482        ) {
1483            Err(e) => {
1484                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1485                tx_guard.release();
1486                return Err(e);
1487            }
1488            Ok(res) => res,
1489        };
1490
1491        if let Some(expected_effects_digest) = expected_effects_digest {
1492            if effects.digest() != expected_effects_digest {
1493                // We dont want to mask the original error, so we log it and continue.
1494                match self.debug_dump_transaction_state(
1495                    &digest,
1496                    &effects,
1497                    expected_effects_digest,
1498                    &inner_temporary_store,
1499                    certificate,
1500                    &self.config.state_debug_dump_config,
1501                ) {
1502                    Ok(out_path) => {
1503                        info!(
1504                            "Dumped node state for transaction {} to {}",
1505                            digest,
1506                            out_path.as_path().display().to_string()
1507                        );
1508                    }
1509                    Err(e) => {
1510                        error!("Error dumping state for transaction {}: {e}", digest);
1511                    }
1512                }
1513                error!(
1514                    tx_digest = ?digest,
1515                    ?expected_effects_digest,
1516                    actual_effects = ?effects,
1517                    "fork detected!"
1518                );
1519                panic!(
1520                    "Transaction {} is expected to have effects digest {}, but got {}!",
1521                    digest,
1522                    expected_effects_digest,
1523                    effects.digest(),
1524                );
1525            }
1526        }
1527
1528        fail_point!("crash");
1529
1530        self.commit_certificate(
1531            certificate,
1532            inner_temporary_store,
1533            &effects,
1534            tx_guard,
1535            execution_guard,
1536            epoch_store,
1537        )?;
1538
1539        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1540        if elapsed > 0.0 {
1541            self.metrics
1542                .execution_gas_latency_ratio
1543                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1544        };
1545        Ok((effects, execution_error_opt))
1546    }
1547
1548    pub async fn reconfigure_traffic_control(
1549        &self,
1550        params: TrafficControlReconfigParams,
1551    ) -> Result<TrafficControlReconfigParams, IotaError> {
1552        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1553            traffic_controller.admin_reconfigure(params).await
1554        } else {
1555            Err(IotaError::InvalidAdminRequest(
1556                "Traffic controller is not configured on this node".to_string(),
1557            ))
1558        }
1559    }
1560
1561    #[instrument(level = "trace", skip_all)]
1562    fn commit_certificate(
1563        &self,
1564        certificate: &VerifiedExecutableTransaction,
1565        inner_temporary_store: InnerTemporaryStore,
1566        effects: &TransactionEffects,
1567        tx_guard: CertTxGuard,
1568        _execution_guard: ExecutionLockReadGuard<'_>,
1569        epoch_store: &Arc<AuthorityPerEpochStore>,
1570    ) -> IotaResult {
1571        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1572            monitored_scope("Execution::commit_certificate");
1573        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1574
1575        let tx_key = certificate.key();
1576        let tx_digest = certificate.digest();
1577        let input_object_count = inner_temporary_store.input_objects.len();
1578        let shared_object_count = effects.input_shared_objects().len();
1579
1580        let output_keys = inner_temporary_store.get_output_keys(effects);
1581
1582        // index certificate
1583        let _ = self
1584            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1585            .tap_err(|e| {
1586                self.metrics.post_processing_total_failures.inc();
1587                error!(?tx_digest, "tx post processing failed: {e}");
1588            });
1589
1590        // The insertion to epoch_store is not atomic with the insertion to the
1591        // perpetual store. This is OK because we insert to the epoch store
1592        // first. And during lookups we always look up in the perpetual store first.
1593        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1594
1595        // Allow testing what happens if we crash here.
1596        fail_point!("crash");
1597
1598        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1599            certificate.clone().into_unsigned(),
1600            effects.clone(),
1601            inner_temporary_store,
1602        );
1603        self.get_cache_writer()
1604            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1605
1606        if certificate.transaction_data().is_end_of_epoch_tx() {
1607            // At the end of epoch, since system packages may have been upgraded, force
1608            // reload them in the cache.
1609            self.get_object_cache_reader()
1610                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1611        }
1612
1613        // commit_certificate finished, the tx is fully committed to the store.
1614        tx_guard.commit_tx();
1615
1616        // Notifies transaction manager about transaction and output objects committed.
1617        // This provides necessary information to transaction manager to start executing
1618        // additional ready transactions.
1619        self.transaction_manager
1620            .notify_commit(tx_digest, output_keys, epoch_store);
1621
1622        self.update_metrics(certificate, input_object_count, shared_object_count);
1623
1624        Ok(())
1625    }
1626
1627    fn update_metrics(
1628        &self,
1629        certificate: &VerifiedExecutableTransaction,
1630        input_object_count: usize,
1631        shared_object_count: usize,
1632    ) {
1633        // count signature by scheme, for multisig
1634        if certificate.has_upgraded_multisig() {
1635            self.metrics.multisig_sig_count.inc();
1636        }
1637
1638        self.metrics.total_effects.inc();
1639        self.metrics.total_certs.inc();
1640
1641        if shared_object_count > 0 {
1642            self.metrics.shared_obj_tx.inc();
1643        }
1644
1645        if certificate.is_sponsored_tx() {
1646            self.metrics.sponsored_tx.inc();
1647        }
1648
1649        self.metrics
1650            .num_input_objs
1651            .observe(input_object_count as f64);
1652        self.metrics
1653            .num_shared_objects
1654            .observe(shared_object_count as f64);
1655        self.metrics.batch_size.observe(
1656            certificate
1657                .data()
1658                .intent_message()
1659                .value
1660                .kind()
1661                .num_commands() as f64,
1662        );
1663    }
1664
1665    /// prepare_certificate validates the transaction input, and executes the
1666    /// certificate, returning effects, output objects, events, etc.
1667    ///
1668    /// It reads state from the db (both owned and shared locks), but it has no
1669    /// side effects.
1670    ///
1671    /// It can be generally understood that a failure of prepare_certificate
1672    /// indicates a non-transient error, e.g. the transaction input is
1673    /// somehow invalid, the correct locks are not held, etc. However, this
1674    /// is not entirely true, as a transient db read error may also cause
1675    /// this function to fail.
1676    #[instrument(level = "trace", skip_all)]
1677    fn prepare_certificate(
1678        &self,
1679        _execution_guard: &ExecutionLockReadGuard<'_>,
1680        certificate: &VerifiedExecutableTransaction,
1681        tx_input_objects: InputObjects,
1682        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1683        epoch_store: &Arc<AuthorityPerEpochStore>,
1684    ) -> IotaResult<(
1685        InnerTemporaryStore,
1686        TransactionEffects,
1687        Option<ExecutionError>,
1688    )> {
1689        let _scope = monitored_scope("Execution::prepare_certificate");
1690        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1691        let prepare_certificate_start_time = tokio::time::Instant::now();
1692
1693        let protocol_config = epoch_store.protocol_config();
1694
1695        let reference_gas_price = epoch_store.reference_gas_price();
1696
1697        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1698        let epoch_start_timestamp = epoch_store
1699            .epoch_start_config()
1700            .epoch_data()
1701            .epoch_start_timestamp();
1702
1703        let backing_store = self.get_backing_store().as_ref();
1704
1705        let tx_digest = *certificate.digest();
1706
1707        // TODO: We need to move this to a more appropriate place to avoid redundant
1708        // checks.
1709        let tx_data = certificate.data().transaction_data();
1710        tx_data.validity_check(protocol_config)?;
1711
1712        let (kind, signer, gas_data) = tx_data.execution_parts();
1713
1714        let move_authenticators = certificate.move_authenticators();
1715
1716        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1717        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
1718            .is_empty()
1719        {
1720            // No Move authentication required, proceed to execute the transaction directly.
1721
1722            // The cost of partially re-auditing a transaction before execution is
1723            // tolerated.
1724            let (tx_gas_status, tx_checked_input_objects) =
1725                iota_transaction_checks::check_certificate_input(
1726                    certificate,
1727                    tx_input_objects,
1728                    protocol_config,
1729                    reference_gas_price,
1730                )?;
1731
1732            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1733            self.check_owned_locks(&owned_object_refs)?;
1734            epoch_store.executor().execute_transaction_to_effects(
1735                backing_store,
1736                protocol_config,
1737                self.metrics.limits_metrics.clone(),
1738                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1739                // cyclic dependency w/ iota-adapter
1740                self.config
1741                    .expensive_safety_check_config
1742                    .enable_deep_per_tx_iota_conservation_check(),
1743                self.config.certificate_deny_config.certificate_deny_set(),
1744                &epoch_id,
1745                epoch_start_timestamp,
1746                tx_checked_input_objects,
1747                gas_data,
1748                tx_gas_status,
1749                kind,
1750                signer,
1751                tx_digest,
1752                &mut None,
1753            )
1754        } else {
1755            // One or more `MoveAuthenticator` signatures present — authenticate each and
1756            // then execute the transaction.
1757            // It is supposed that `MoveAuthenticator` availability is checked in
1758            // `SenderSignedData::validity_check`.
1759
1760            debug_assert_eq!(
1761                move_authenticators.len(),
1762                per_authenticator_inputs.len(),
1763                "Move authenticators amount must match the number of authenticator inputs"
1764            );
1765
1766            let per_authenticator_inputs = move_authenticators
1767                .iter()
1768                .zip(per_authenticator_inputs)
1769                .map(
1770                    |(move_authenticator, (authenticator_input_objects, account_object))| {
1771                        // Check basic `object_to_authenticate` preconditions and get its
1772                        // components.
1773                        let (
1774                            auth_account_object_id,
1775                            auth_account_object_seq_number,
1776                            auth_account_object_digest,
1777                        ) = move_authenticator.object_to_authenticate_components()?;
1778
1779                        let signer = move_authenticator.address()?;
1780
1781                        let authenticator_function_ref_for_execution = self.check_move_account(
1782                            auth_account_object_id,
1783                            auth_account_object_seq_number,
1784                            auth_account_object_digest,
1785                            account_object,
1786                            &signer,
1787                        )?;
1788
1789                        Ok((
1790                            authenticator_input_objects,
1791                            authenticator_function_ref_for_execution,
1792                        ))
1793                    },
1794                )
1795                .collect::<IotaResult<Vec<_>>>()?;
1796
1797            let per_authenticator_input_objects = per_authenticator_inputs
1798                .iter()
1799                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
1800                .collect::<Vec<_>>();
1801
1802            // Serialize the TransactionData for the auth context.
1803            let tx_data_bytes =
1804                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");
1805
1806            let (sender_auth_digest, sponsor_auth_digest) =
1807                certificate.data().compute_auth_digests()?;
1808
1809            // Check the `MoveAuthenticator` input objects.
1810            // The `MoveAuthenticator` receiving objects are checked on the signing step.
1811            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
1812            // not a part of the transaction data.
1813            let authenticator_gas_budget = protocol_config.max_auth_gas();
1814            let (
1815                gas_status,
1816                per_authenticator_checked_input_objects,
1817                authenticator_and_tx_checked_input_objects,
1818            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1819                certificate,
1820                tx_input_objects,
1821                per_authenticator_input_objects,
1822                authenticator_gas_budget,
1823                protocol_config,
1824                reference_gas_price,
1825            )?;
1826
1827            debug_assert_eq!(
1828                move_authenticators.len(),
1829                per_authenticator_checked_input_objects.len(),
1830                "Move authenticators amount must match the number of checked authenticator inputs"
1831            );
1832
1833            let move_authenticators = move_authenticators
1834                .into_iter()
1835                .zip(per_authenticator_inputs)
1836                .zip(per_authenticator_checked_input_objects)
1837                .map(
1838                    |(
1839                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
1840                        authenticator_checked_input_objects,
1841                    )| {
1842                        (
1843                            move_authenticator.to_owned(),
1844                            authenticator_function_ref_for_execution,
1845                            authenticator_checked_input_objects,
1846                        )
1847                    },
1848                )
1849                .collect::<Vec<_>>();
1850
1851            let owned_object_refs = authenticator_and_tx_checked_input_objects
1852                .inner()
1853                .filter_owned_objects();
1854            self.check_owned_locks(&owned_object_refs)?;
1855
1856            let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
1857                extract_auth_fun_refs(signer, gas_data.owner, |address| {
1858                    move_authenticators
1859                        .iter()
1860                        .find(|t| t.0.address().ok().as_ref() == Some(&address))
1861                        .map(|t| t.1.authenticator_function_ref.clone())
1862                });
1863
1864            let auth_context_data = AuthContextData {
1865                transaction_data_bytes: tx_data_bytes,
1866                sender_auth_digest,
1867                sponsor_auth_digest,
1868                sender_authenticator_function_ref,
1869                sponsor_authenticator_function_ref,
1870            };
1871
1872            epoch_store
1873                .executor()
1874                .authenticate_then_execute_transaction_to_effects(
1875                    backing_store,
1876                    protocol_config,
1877                    self.metrics.limits_metrics.clone(),
1878                    self.config
1879                        .expensive_safety_check_config
1880                        .enable_deep_per_tx_iota_conservation_check(),
1881                    self.config.certificate_deny_config.certificate_deny_set(),
1882                    &epoch_id,
1883                    epoch_start_timestamp,
1884                    gas_data,
1885                    gas_status,
1886                    move_authenticators,
1887                    authenticator_and_tx_checked_input_objects,
1888                    kind,
1889                    signer,
1890                    tx_digest,
1891                    auth_context_data,
1892                    &mut None,
1893                )
1894        };
1895
1896        fail_point_if!("cp_execution_nondeterminism", || {
1897            #[cfg(msim)]
1898            self.create_fail_state(certificate, epoch_store, &mut effects);
1899        });
1900
1901        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1902        if elapsed > 0.0 {
1903            self.metrics
1904                .prepare_cert_gas_latency_ratio
1905                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1906        }
1907
1908        Ok((inner_temp_store, effects, execution_error_opt.err()))
1909    }
1910
1911    pub fn prepare_certificate_for_benchmark(
1912        &self,
1913        certificate: &VerifiedExecutableTransaction,
1914        input_objects: InputObjects,
1915        epoch_store: &Arc<AuthorityPerEpochStore>,
1916    ) -> IotaResult<(
1917        InnerTemporaryStore,
1918        TransactionEffects,
1919        Option<ExecutionError>,
1920    )> {
1921        let lock = RwLock::new(epoch_store.epoch());
1922        let execution_guard = lock.try_read().unwrap();
1923
1924        self.prepare_certificate(
1925            &execution_guard,
1926            certificate,
1927            input_objects,
1928            vec![],
1929            epoch_store,
1930        )
1931    }
1932
1933    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1934    /// `VmChecks::Enabled` instead.
1935    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1936    #[allow(clippy::type_complexity)]
1937    pub fn dry_exec_transaction(
1938        &self,
1939        transaction: TransactionData,
1940        transaction_digest: TransactionDigest,
1941    ) -> IotaResult<(
1942        DryRunTransactionBlockResponse,
1943        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1944        TransactionEffects,
1945        Option<ObjectId>,
1946    )> {
1947        let epoch_store = self.load_epoch_store_one_call_per_task();
1948        if !self.is_fullnode(&epoch_store) {
1949            return Err(IotaError::UnsupportedFeature {
1950                error: "dry-exec is only supported on fullnodes".to_string(),
1951            });
1952        }
1953
1954        if transaction.kind().is_system() {
1955            return Err(IotaError::UnsupportedFeature {
1956                error: "dry-exec does not support system transactions".to_string(),
1957            });
1958        }
1959
1960        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1961    }
1962
1963    #[allow(clippy::type_complexity)]
1964    pub fn dry_exec_transaction_for_benchmark(
1965        &self,
1966        transaction: TransactionData,
1967        transaction_digest: TransactionDigest,
1968    ) -> IotaResult<(
1969        DryRunTransactionBlockResponse,
1970        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1971        TransactionEffects,
1972        Option<ObjectId>,
1973    )> {
1974        let epoch_store = self.load_epoch_store_one_call_per_task();
1975        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1976    }
1977
1978    #[instrument(level = "trace", skip_all)]
1979    #[allow(clippy::type_complexity)]
1980    fn dry_exec_transaction_impl(
1981        &self,
1982        epoch_store: &AuthorityPerEpochStore,
1983        transaction: TransactionData,
1984        transaction_digest: TransactionDigest,
1985    ) -> IotaResult<(
1986        DryRunTransactionBlockResponse,
1987        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1988        TransactionEffects,
1989        Option<ObjectId>,
1990    )> {
1991        // Cheap validity checks for a transaction, including input size limits.
1992        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1993
1994        let input_object_kinds = transaction.input_objects()?;
1995        let receiving_object_refs = transaction.receiving_objects();
1996
1997        iota_transaction_checks::deny::check_transaction_for_signing(
1998            &transaction,
1999            &[],
2000            &input_object_kinds,
2001            &receiving_object_refs,
2002            &self.config.transaction_deny_config,
2003            self.get_backing_package_store().as_ref(),
2004        )?;
2005
2006        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2007            // We don't want to cache this transaction since it's a dry run.
2008            None,
2009            &input_object_kinds,
2010            &receiving_object_refs,
2011            epoch_store.epoch(),
2012        )?;
2013
2014        // make a gas object if one was not provided
2015        let mut transaction = transaction;
2016        let reference_gas_price = epoch_store.reference_gas_price();
2017        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2018            let sender = transaction.gas_owner();
2019            let gas_object_id = ObjectId::random();
2020            let gas_object = Object::new_move(
2021                MoveObject::new_gas_coin(
2022                    OBJECT_START_VERSION,
2023                    gas_object_id,
2024                    SIMULATION_GAS_COIN_VALUE,
2025                ),
2026                Owner::Address(sender),
2027                TransactionDigest::GENESIS_MARKER,
2028            );
2029            let gas_object_ref = gas_object.object_ref();
2030            // Add gas object to transaction gas payment
2031            transaction.gas_data_mut().objects = vec![gas_object_ref];
2032            (
2033                iota_transaction_checks::check_transaction_input_with_given_gas(
2034                    epoch_store.protocol_config(),
2035                    reference_gas_price,
2036                    &transaction,
2037                    input_objects,
2038                    receiving_objects,
2039                    gas_object,
2040                    &self.metrics.bytecode_verifier_metrics,
2041                    &self.config.verifier_signing_config,
2042                )?,
2043                Some(gas_object_id),
2044            )
2045        } else {
2046            // `MoveAuthenticator`s are not supported in dry runs, so we set the
2047            // `authenticator_gas_budget` to 0.
2048            let authenticator_gas_budget = 0;
2049
2050            (
2051                iota_transaction_checks::check_transaction_input(
2052                    epoch_store.protocol_config(),
2053                    reference_gas_price,
2054                    &transaction,
2055                    input_objects,
2056                    &receiving_objects,
2057                    &self.metrics.bytecode_verifier_metrics,
2058                    &self.config.verifier_signing_config,
2059                    authenticator_gas_budget,
2060                )?,
2061                None,
2062            )
2063        };
2064
2065        let protocol_config = epoch_store.protocol_config();
2066        let (kind, signer, gas_data) = transaction.execution_parts();
2067
2068        let silent = true;
2069        let executor = iota_execution::executor(protocol_config, silent, None)
2070            .expect("Creating an executor should not fail here");
2071
2072        let expensive_checks = false;
2073        let (inner_temp_store, _, effects, execution_error) = executor
2074            .execute_transaction_to_effects(
2075                self.get_backing_store().as_ref(),
2076                protocol_config,
2077                self.metrics.limits_metrics.clone(),
2078                expensive_checks,
2079                self.config.certificate_deny_config.certificate_deny_set(),
2080                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2081                epoch_store
2082                    .epoch_start_config()
2083                    .epoch_data()
2084                    .epoch_start_timestamp(),
2085                checked_input_objects,
2086                gas_data,
2087                gas_status,
2088                kind,
2089                signer,
2090                transaction_digest,
2091                &mut None,
2092            );
2093        let tx_digest = *effects.transaction_digest();
2094
2095        let module_cache =
2096            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2097
2098        let mut layout_resolver =
2099            epoch_store
2100                .executor()
2101                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2102                    &inner_temp_store,
2103                    self.get_backing_package_store(),
2104                )));
2105        // Returning empty vector here because we recalculate changes in the rpc layer.
2106        let object_changes = Vec::new();
2107
2108        // Returning empty vector here because we recalculate changes in the rpc layer.
2109        let balance_changes = Vec::new();
2110
2111        let written_with_kind = effects
2112            .created()
2113            .into_iter()
2114            .map(|(oref, _)| (oref, WriteKind::Create))
2115            .chain(
2116                effects
2117                    .unwrapped()
2118                    .into_iter()
2119                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2120            )
2121            .chain(
2122                effects
2123                    .mutated()
2124                    .into_iter()
2125                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2126            )
2127            .map(|(oref, kind)| {
2128                let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2129                // TODO: Avoid clones.
2130                (oref.object_id, (oref, obj.clone(), kind))
2131            })
2132            .collect();
2133
2134        let execution_error_source = execution_error
2135            .as_ref()
2136            .err()
2137            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2138
2139        Ok((
2140            DryRunTransactionBlockResponse {
2141                // to avoid cloning `transaction`, fields are populated in this order
2142                suggested_gas_price: self
2143                    .congestion_tracker
2144                    .get_prediction_suggested_gas_price(&transaction),
2145                input: IotaTransactionBlockData::try_from_with_module_cache(
2146                    transaction,
2147                    &module_cache,
2148                    tx_digest,
2149                )
2150                .map_err(|e| IotaError::TransactionSerialization {
2151                    error: format!(
2152                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
2153                    ),
2154                })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
2155                effects: effects.clone().try_into()?,
2156                events: IotaTransactionBlockEvents::try_from(
2157                    inner_temp_store.events.clone(),
2158                    tx_digest,
2159                    None,
2160                    layout_resolver.as_mut(),
2161                )?,
2162                object_changes,
2163                balance_changes,
2164                execution_error_source,
2165            },
2166            written_with_kind,
2167            effects,
2168            mock_gas,
2169        ))
2170    }
2171
2172    pub fn simulate_transaction(
2173        &self,
2174        mut transaction: TransactionData,
2175        checks: VmChecks,
2176    ) -> IotaResult<SimulateTransactionResult> {
2177        if transaction.kind().is_system() {
2178            return Err(IotaError::UnsupportedFeature {
2179                error: "simulate does not support system transactions".to_string(),
2180            });
2181        }
2182
2183        let epoch_store = self.load_epoch_store_one_call_per_task();
2184        if !self.is_fullnode(&epoch_store) {
2185            return Err(IotaError::UnsupportedFeature {
2186                error: "simulate is only supported on fullnodes".to_string(),
2187            });
2188        }
2189
2190        // Cheap validity checks for a transaction, including input size limits.
2191        // This does not check if gas objects are missing since we may create a
2192        // mock gas object. It checks for other transaction input validity.
2193        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2194
2195        let input_object_kinds = transaction.input_objects()?;
2196        let receiving_object_refs = transaction.receiving_objects();
2197
2198        // Since we need to simulate a validator signing the transaction, the first step
2199        // is to check if some transaction elements are denied.
2200        iota_transaction_checks::deny::check_transaction_for_signing(
2201            &transaction,
2202            &[],
2203            &input_object_kinds,
2204            &receiving_object_refs,
2205            &self.config.transaction_deny_config,
2206            self.get_backing_package_store().as_ref(),
2207        )?;
2208
2209        // Load input and receiving objects
2210        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2211            // We don't want to cache this transaction since it's a simulation.
2212            None,
2213            &input_object_kinds,
2214            &receiving_object_refs,
2215            epoch_store.epoch(),
2216        )?;
2217
2218        // Create a mock gas object if one was not provided
2219        let mock_gas_id = if transaction.gas().is_empty() {
2220            let mock_gas_object = Object::new_move(
2221                MoveObject::new_gas_coin(
2222                    OBJECT_START_VERSION,
2223                    ObjectId::MAX,
2224                    SIMULATION_GAS_COIN_VALUE,
2225                ),
2226                Owner::Address(transaction.gas_data().owner),
2227                TransactionDigest::GENESIS_MARKER,
2228            );
2229            let mock_gas_object_ref = mock_gas_object.object_ref();
2230            transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2231            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2232            Some(mock_gas_object.id())
2233        } else {
2234            None
2235        };
2236
2237        let protocol_config = epoch_store.protocol_config();
2238
2239        // `MoveAuthenticator`s are not supported in simulation, so we set the
2240        // `authenticator_gas_budget` to 0.
2241        let authenticator_gas_budget = 0;
2242
2243        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
2244        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
2245        let (gas_status, checked_input_objects) = if checks.enabled() {
2246            iota_transaction_checks::check_transaction_input(
2247                protocol_config,
2248                epoch_store.reference_gas_price(),
2249                &transaction,
2250                input_objects,
2251                &receiving_objects,
2252                &self.metrics.bytecode_verifier_metrics,
2253                &self.config.verifier_signing_config,
2254                authenticator_gas_budget,
2255            )?
2256        } else {
2257            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2258                protocol_config,
2259                transaction.kind(),
2260                input_objects,
2261                receiving_objects,
2262            )?;
2263            let gas_status = IotaGasStatus::new(
2264                transaction.gas_budget(),
2265                transaction.gas_price(),
2266                epoch_store.reference_gas_price(),
2267                protocol_config,
2268            )?;
2269
2270            (gas_status, checked_input_objects)
2271        };
2272
2273        // Create a new executor for the simulation
2274        let executor = iota_execution::executor(
2275            protocol_config,
2276            true, // silent
2277            None,
2278        )
2279        .expect("Creating an executor should not fail here");
2280
2281        // Execute the simulation
2282        let (kind, signer, gas_data) = transaction.execution_parts();
2283        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2284            self.get_backing_store().as_ref(),
2285            protocol_config,
2286            self.metrics.limits_metrics.clone(),
2287            false, // expensive_checks
2288            self.config.certificate_deny_config.certificate_deny_set(),
2289            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2290            epoch_store
2291                .epoch_start_config()
2292                .epoch_data()
2293                .epoch_start_timestamp(),
2294            checked_input_objects,
2295            gas_data,
2296            gas_status,
2297            kind,
2298            signer,
2299            transaction.digest(),
2300            checks.disabled(),
2301        );
2302
2303        // In the case of a dev inspect, the execution_result could be filled with some
2304        // values. Else, execution_result is empty in the case of a dry run.
2305        Ok(SimulateTransactionResult {
2306            input_objects: inner_temp_store.input_objects,
2307            output_objects: inner_temp_store.written,
2308            events: effects.events_digest().map(|_| inner_temp_store.events),
2309            effects,
2310            execution_result,
2311            suggested_gas_price: self
2312                .congestion_tracker
2313                .get_prediction_suggested_gas_price(&transaction),
2314            mock_gas_id,
2315        })
2316    }
2317
2318    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
2319    /// `VmChecks::DISABLED` instead.
2320    /// The object ID for gas can be any
2321    /// object ID, even for an uncreated object
2322    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2323    pub async fn dev_inspect_transaction_block(
2324        &self,
2325        sender: IotaAddress,
2326        transaction_kind: TransactionKind,
2327        gas_price: Option<u64>,
2328        gas_budget: Option<u64>,
2329        gas_sponsor: Option<IotaAddress>,
2330        gas_objects: Option<Vec<ObjectRef>>,
2331        show_raw_txn_data_and_effects: Option<bool>,
2332        skip_checks: Option<bool>,
2333    ) -> IotaResult<DevInspectResults> {
2334        let epoch_store = self.load_epoch_store_one_call_per_task();
2335
2336        if !self.is_fullnode(&epoch_store) {
2337            return Err(IotaError::UnsupportedFeature {
2338                error: "dev-inspect is only supported on fullnodes".to_string(),
2339            });
2340        }
2341
2342        if transaction_kind.is_system() {
2343            return Err(IotaError::UnsupportedFeature {
2344                error: "system transactions are not supported".to_string(),
2345            });
2346        }
2347
2348        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2349        let skip_checks = skip_checks.unwrap_or(true);
2350        let reference_gas_price = epoch_store.reference_gas_price();
2351        let protocol_config = epoch_store.protocol_config();
2352        let max_tx_gas = protocol_config.max_tx_gas();
2353
2354        let price = gas_price.unwrap_or(reference_gas_price);
2355        let budget = gas_budget.unwrap_or(max_tx_gas);
2356        let owner = gas_sponsor.unwrap_or(sender);
2357        // Payment might be empty here, but it's fine we'll have to deal with it later
2358        // after reading all the input objects.
2359        let payment = gas_objects.unwrap_or_default();
2360        let mut transaction = TransactionData::V1(TransactionDataV1 {
2361            kind: transaction_kind.clone(),
2362            sender,
2363            gas_payment: GasData {
2364                objects: payment,
2365                owner,
2366                price,
2367                budget,
2368            },
2369            expiration: TransactionExpiration::None,
2370        });
2371
2372        let raw_txn_data = if show_raw_txn_data_and_effects {
2373            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2374                error: "Failed to serialize transaction during dev inspect".to_string(),
2375            })?
2376        } else {
2377            vec![]
2378        };
2379
2380        transaction.validity_check_no_gas_check(protocol_config)?;
2381
2382        let input_object_kinds = transaction.input_objects()?;
2383        let receiving_object_refs = transaction.receiving_objects();
2384
2385        iota_transaction_checks::deny::check_transaction_for_signing(
2386            &transaction,
2387            &[],
2388            &input_object_kinds,
2389            &receiving_object_refs,
2390            &self.config.transaction_deny_config,
2391            self.get_backing_package_store().as_ref(),
2392        )?;
2393
2394        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2395            // We don't want to cache this transaction since it's a dev inspect.
2396            None,
2397            &input_object_kinds,
2398            &receiving_object_refs,
2399            epoch_store.epoch(),
2400        )?;
2401
2402        let (gas_status, checked_input_objects) = if skip_checks {
2403            // If we are skipping checks, then we call the check_dev_inspect_input function
2404            // which will perform only lightweight checks on the transaction
2405            // input. And if the gas field is empty, that means we will
2406            // use the dummy gas object so we need to add it to the input objects vector.
2407            if transaction.gas().is_empty() {
2408                // Create and use a dummy gas object if there is no gas object provided.
2409                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2410                    SIMULATION_GAS_COIN_VALUE,
2411                    transaction.gas_owner(),
2412                );
2413                let gas_object_ref = dummy_gas_object.object_ref();
2414                transaction.gas_data_mut().objects = vec![gas_object_ref];
2415                input_objects.push(ObjectReadResult::new(
2416                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2417                    dummy_gas_object.into(),
2418                ));
2419            }
2420            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2421                protocol_config,
2422                &transaction_kind,
2423                input_objects,
2424                receiving_objects,
2425            )?;
2426            let gas_status = IotaGasStatus::new(
2427                max_tx_gas,
2428                transaction.gas_price(),
2429                reference_gas_price,
2430                protocol_config,
2431            )?;
2432
2433            (gas_status, checked_input_objects)
2434        } else {
2435            // If we are not skipping checks, then we call the check_transaction_input
2436            // function and its dummy gas variant which will perform full
2437            // fledged checks just like a real transaction execution.
2438            if transaction.gas().is_empty() {
2439                // Create and use a dummy gas object if there is no gas object provided.
2440                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2441                    SIMULATION_GAS_COIN_VALUE,
2442                    transaction.gas_owner(),
2443                );
2444                let gas_object_ref = dummy_gas_object.object_ref();
2445                transaction.gas_data_mut().objects = vec![gas_object_ref];
2446                iota_transaction_checks::check_transaction_input_with_given_gas(
2447                    epoch_store.protocol_config(),
2448                    reference_gas_price,
2449                    &transaction,
2450                    input_objects,
2451                    receiving_objects,
2452                    dummy_gas_object,
2453                    &self.metrics.bytecode_verifier_metrics,
2454                    &self.config.verifier_signing_config,
2455                )?
2456            } else {
2457                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
2458                // `authenticator_gas_budget` to 0.
2459                let authenticator_gas_budget = 0;
2460
2461                iota_transaction_checks::check_transaction_input(
2462                    epoch_store.protocol_config(),
2463                    reference_gas_price,
2464                    &transaction,
2465                    input_objects,
2466                    &receiving_objects,
2467                    &self.metrics.bytecode_verifier_metrics,
2468                    &self.config.verifier_signing_config,
2469                    authenticator_gas_budget,
2470                )?
2471            }
2472        };
2473
2474        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2475            .expect("Creating an executor should not fail here");
2476        let gas_data = transaction.gas_data().clone();
2477        let intent_msg = IntentMessage::new(
2478            Intent {
2479                version: IntentVersion::V0,
2480                scope: IntentScope::TransactionData,
2481                app_id: IntentAppId::Iota,
2482            },
2483            transaction,
2484        );
2485        let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2486        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2487            self.get_backing_store().as_ref(),
2488            protocol_config,
2489            self.metrics.limits_metrics.clone(),
2490            // expensive checks
2491            false,
2492            self.config.certificate_deny_config.certificate_deny_set(),
2493            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2494            epoch_store
2495                .epoch_start_config()
2496                .epoch_data()
2497                .epoch_start_timestamp(),
2498            checked_input_objects,
2499            gas_data,
2500            gas_status,
2501            transaction_kind,
2502            sender,
2503            transaction_digest,
2504            skip_checks,
2505        );
2506
2507        let raw_effects = if show_raw_txn_data_and_effects {
2508            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2509                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2510            })?
2511        } else {
2512            vec![]
2513        };
2514
2515        let mut layout_resolver =
2516            epoch_store
2517                .executor()
2518                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2519                    &inner_temp_store,
2520                    self.get_backing_package_store(),
2521                )));
2522
2523        DevInspectResults::new(
2524            effects,
2525            inner_temp_store.events.clone(),
2526            execution_result,
2527            raw_txn_data,
2528            raw_effects,
2529            layout_resolver.as_mut(),
2530        )
2531    }
2532
2533    // Only used for testing because of how epoch store is loaded.
2534    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2535        let epoch_store = self.epoch_store_for_testing();
2536        Ok(epoch_store.reference_gas_price())
2537    }
2538
2539    #[instrument(level = "trace", skip_all)]
2540    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2541        self.get_transaction_cache_reader()
2542            .try_is_tx_already_executed(digest)
2543    }
2544
2545    /// Non-fallible version of `try_is_tx_already_executed`.
2546    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2547        self.try_is_tx_already_executed(digest)
2548            .expect("storage access failed")
2549    }
2550
2551    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2552    #[instrument(level = "debug", skip_all, err)]
2553    fn index_tx(
2554        &self,
2555        indexes: &IndexStore,
2556        digest: &TransactionDigest,
2557        // TODO: index_tx really just need the transaction data here.
2558        cert: &VerifiedExecutableTransaction,
2559        effects: &TransactionEffects,
2560        events: &TransactionEvents,
2561        timestamp_ms: u64,
2562        tx_coins: Option<TxCoins>,
2563        written: &WrittenObjects,
2564        inner_temporary_store: &InnerTemporaryStore,
2565    ) -> IotaResult<u64> {
2566        let changes = self
2567            .process_object_index(effects, written, inner_temporary_store)
2568            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2569
2570        indexes.index_tx(
2571            cert.data().intent_message().value.sender(),
2572            cert.data()
2573                .intent_message()
2574                .value
2575                .input_objects()?
2576                .iter()
2577                .map(|o| o.object_id()),
2578            effects
2579                .all_changed_objects()
2580                .into_iter()
2581                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2582            cert.data()
2583                .intent_message()
2584                .value
2585                .move_calls()
2586                .into_iter()
2587                .map(|(package, module, function)| {
2588                    (*package, module.to_owned(), function.to_owned())
2589                }),
2590            events,
2591            changes,
2592            digest,
2593            timestamp_ms,
2594            tx_coins,
2595        )
2596    }
2597
2598    #[cfg(msim)]
2599    fn create_fail_state(
2600        &self,
2601        certificate: &VerifiedExecutableTransaction,
2602        epoch_store: &Arc<AuthorityPerEpochStore>,
2603        effects: &mut TransactionEffects,
2604    ) {
2605        use std::cell::RefCell;
2606
2607        use iota_types::effects::TransactionEffectsAPIForTesting;
2608        thread_local! {
2609            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2610        }
2611        if !certificate.data().intent_message().value.is_system_tx() {
2612            let committee = epoch_store.committee();
2613            let cur_stake = (**committee).weight(&self.name);
2614            if cur_stake > 0 {
2615                FAIL_STATE.with_borrow_mut(|fail_state| {
2616                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2617                    if fail_state.0 < committee.validity_threshold() {
2618                        fail_state.0 += cur_stake;
2619                        fail_state.1.insert(self.name);
2620                    }
2621
2622                    if fail_state.1.contains(&self.name) {
2623                        info!("cp_exec failing tx");
2624                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2625                    }
2626                });
2627            }
2628        }
2629    }
2630
2631    fn process_object_index(
2632        &self,
2633        effects: &TransactionEffects,
2634        written: &WrittenObjects,
2635        inner_temporary_store: &InnerTemporaryStore,
2636    ) -> IotaResult<ObjectIndexChanges> {
2637        let epoch_store = self.load_epoch_store_one_call_per_task();
2638        let mut layout_resolver =
2639            epoch_store
2640                .executor()
2641                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2642                    inner_temporary_store,
2643                    self.get_backing_package_store(),
2644                )));
2645
2646        let modified_at_version = effects
2647            .modified_at_versions()
2648            .into_iter()
2649            .collect::<HashMap<_, _>>();
2650
2651        let tx_digest = effects.transaction_digest();
2652        let mut deleted_owners = vec![];
2653        let mut deleted_dynamic_fields = vec![];
2654        for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2655            let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2656            // When we process the index, the latest object hasn't been written yet so
2657            // the old object must be present.
2658            match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2659                |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}",object_ref.object_id)
2660            ) {
2661                Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2662                Owner::Object(object_id) => {
2663                    deleted_dynamic_fields.push((object_id, object_ref.object_id))
2664                }
2665                _ => {}
2666            }
2667        }
2668
2669        let mut new_owners = vec![];
2670        let mut new_dynamic_fields = vec![];
2671
2672        for (oref, owner, kind) in effects.all_changed_objects() {
2673            let id = &oref.object_id;
2674            // For mutated objects, retrieve old owner and delete old index if there is a
2675            // owner change.
2676            if let WriteKind::Mutate = kind {
2677                let Some(old_version) = modified_at_version.get(id) else {
2678                    panic!(
2679                        "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2680                    );
2681                };
2682                // When we process the index, the latest object hasn't been written yet so
2683                // the old object must be present.
2684                let Some(old_object) = self
2685                    .get_object_store()
2686                    .try_get_object_by_key(id, *old_version)?
2687                else {
2688                    panic!(
2689                        "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2690                    );
2691                };
2692                if old_object.owner != owner {
2693                    match old_object.owner {
2694                        Owner::Address(addr) => {
2695                            deleted_owners.push((addr, *id));
2696                        }
2697                        Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2698                        _ => {}
2699                    }
2700                }
2701            }
2702
2703            match owner {
2704                Owner::Address(addr) => {
2705                    // TODO: We can remove the object fetching after we added ObjectType to
2706                    // TransactionEffects
2707                    let new_object = written.get(id).unwrap_or_else(
2708                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2709                    );
2710                    assert_eq!(
2711                        new_object.version(),
2712                        oref.version,
2713                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2714                        tx_digest,
2715                        id,
2716                        new_object.version(),
2717                        oref.version
2718                    );
2719
2720                    let type_ = new_object
2721                        .type_()
2722                        .map(|type_| ObjectType::Struct(type_.clone()))
2723                        .unwrap_or(ObjectType::Package);
2724
2725                    new_owners.push((
2726                        (addr, *id),
2727                        ObjectInfo {
2728                            object_id: *id,
2729                            version: oref.version,
2730                            digest: oref.digest,
2731                            type_,
2732                            owner,
2733                            previous_transaction: *effects.transaction_digest(),
2734                        },
2735                    ));
2736                }
2737                Owner::Object(owner) => {
2738                    let new_object = written.get(id).unwrap_or_else(
2739                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2740                    );
2741                    assert_eq!(
2742                        new_object.version(),
2743                        oref.version,
2744                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2745                        tx_digest,
2746                        id,
2747                        new_object.version(),
2748                        oref.version
2749                    );
2750
2751                    let Some(df_info) = self
2752                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2753                        .unwrap_or_else(|e| {
2754                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2755                            None
2756                        }
2757                    )
2758                        else {
2759                            // Skip indexing for non dynamic field objects.
2760                            continue;
2761                        };
2762                    new_dynamic_fields.push(((owner, *id), df_info))
2763                }
2764                _ => {}
2765            }
2766        }
2767
2768        Ok(ObjectIndexChanges {
2769            deleted_owners,
2770            deleted_dynamic_fields,
2771            new_owners,
2772            new_dynamic_fields,
2773        })
2774    }
2775
2776    fn try_create_dynamic_field_info(
2777        &self,
2778        o: &Object,
2779        written: &WrittenObjects,
2780        resolver: &mut dyn LayoutResolver,
2781    ) -> IotaResult<Option<DynamicFieldInfo>> {
2782        // Skip if not a move object
2783        let Some(move_object) = o.data.as_struct_opt().cloned() else {
2784            return Ok(None);
2785        };
2786
2787        // We only index dynamic field objects
2788        if !move_object.struct_tag().is_dynamic_field() {
2789            return Ok(None);
2790        }
2791
2792        let layout = resolver
2793            .get_annotated_layout(move_object.struct_tag())?
2794            .into_layout();
2795
2796        let field =
2797            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2798                IotaError::ObjectDeserialization {
2799                    error: e.to_string(),
2800                }
2801            })?;
2802
2803        let type_ = field.kind;
2804        let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2805        let bcs_name = field.name_bytes.to_owned();
2806
2807        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2808            .map_err(|e| {
2809                warn!("{e}");
2810                IotaError::ObjectDeserialization {
2811                    error: e.to_string(),
2812                }
2813            })?;
2814
2815        let name = DynamicFieldName {
2816            type_: name_type,
2817            value: IotaMoveValue::from(name_value).to_json_value(),
2818        };
2819
2820        let value_metadata = field.value_metadata().map_err(|e| {
2821            warn!("{e}");
2822            IotaError::ObjectDeserialization {
2823                error: e.to_string(),
2824            }
2825        })?;
2826
2827        Ok(Some(match value_metadata {
2828            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2829                name,
2830                bcs_name,
2831                type_,
2832                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2833                object_id: o.id(),
2834                version: o.version(),
2835                digest: o.digest(),
2836            },
2837
2838            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2839                // Find the actual object from storage using the object id obtained from the
2840                // wrapper.
2841
2842                // Try to find the object in the written objects first.
2843                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2844                    (
2845                        object.version(),
2846                        object.digest(),
2847                        object.data.object_type().unwrap().clone(),
2848                    )
2849                } else {
2850                    // If not found, try to find it in the database.
2851                    let object = self
2852                        .get_object_store()
2853                        .try_get_object_by_key(&object_id, o.version())?
2854                        .ok_or_else(|| UserInputError::ObjectNotFound {
2855                            object_id,
2856                            version: Some(o.version()),
2857                        })?;
2858                    let version = object.version();
2859                    let digest = object.digest();
2860                    let object_type = object.data.object_type().unwrap().clone();
2861                    (version, digest, object_type)
2862                };
2863
2864                DynamicFieldInfo {
2865                    name,
2866                    bcs_name,
2867                    type_,
2868                    object_type: object_type.to_string(),
2869                    object_id,
2870                    version,
2871                    digest,
2872                }
2873            }
2874        }))
2875    }
2876
2877    #[instrument(level = "trace", skip_all, err)]
2878    fn post_process_one_tx(
2879        &self,
2880        certificate: &VerifiedExecutableTransaction,
2881        effects: &TransactionEffects,
2882        inner_temporary_store: &InnerTemporaryStore,
2883        epoch_store: &Arc<AuthorityPerEpochStore>,
2884    ) -> IotaResult {
2885        if self.indexes.is_none() {
2886            return Ok(());
2887        }
2888
2889        let _scope = monitored_scope("Execution::post_process_one_tx");
2890
2891        let tx_digest = certificate.digest();
2892        let timestamp_ms = Self::unixtime_now_ms();
2893        let events = &inner_temporary_store.events;
2894        let written = &inner_temporary_store.written;
2895        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2896            effects,
2897            inner_temporary_store,
2898            epoch_store,
2899        );
2900
2901        // Index tx
2902        if let Some(indexes) = &self.indexes {
2903            let _ = self
2904                .index_tx(
2905                    indexes.as_ref(),
2906                    tx_digest,
2907                    certificate,
2908                    effects,
2909                    events,
2910                    timestamp_ms,
2911                    tx_coins,
2912                    written,
2913                    inner_temporary_store,
2914                )
2915                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2916                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2917                .expect("Indexing tx should not fail");
2918
2919            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2920            let events = self.make_transaction_block_events(
2921                events.clone(),
2922                *tx_digest,
2923                timestamp_ms,
2924                epoch_store,
2925                inner_temporary_store,
2926            )?;
2927            // Emit events
2928            self.subscription_handler
2929                .process_tx(certificate.data().transaction_data(), &effects, &events)
2930                .tap_ok(|_| {
2931                    self.metrics
2932                        .post_processing_total_tx_had_event_processed
2933                        .inc()
2934                })
2935                .tap_err(|e| {
2936                    warn!(
2937                        ?tx_digest,
2938                        "Post processing - Couldn't process events for tx: {}", e
2939                    )
2940                })?;
2941
2942            self.metrics
2943                .post_processing_total_events_emitted
2944                .inc_by(events.data.len() as u64);
2945        };
2946        Ok(())
2947    }
2948
2949    fn make_transaction_block_events(
2950        &self,
2951        transaction_events: TransactionEvents,
2952        digest: TransactionDigest,
2953        timestamp_ms: u64,
2954        epoch_store: &Arc<AuthorityPerEpochStore>,
2955        inner_temporary_store: &InnerTemporaryStore,
2956    ) -> IotaResult<IotaTransactionBlockEvents> {
2957        let mut layout_resolver =
2958            epoch_store
2959                .executor()
2960                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2961                    inner_temporary_store,
2962                    self.get_backing_package_store(),
2963                )));
2964        IotaTransactionBlockEvents::try_from(
2965            transaction_events,
2966            digest,
2967            Some(timestamp_ms),
2968            layout_resolver.as_mut(),
2969        )
2970    }
2971
2972    pub fn unixtime_now_ms() -> u64 {
2973        let now = SystemTime::now()
2974            .duration_since(UNIX_EPOCH)
2975            .expect("Time went backwards")
2976            .as_millis();
2977        u64::try_from(now).expect("Travelling in time machine")
2978    }
2979
2980    #[instrument(level = "trace", skip_all)]
2981    pub async fn handle_transaction_info_request(
2982        &self,
2983        request: TransactionInfoRequest,
2984    ) -> IotaResult<TransactionInfoResponse> {
2985        let epoch_store = self.load_epoch_store_one_call_per_task();
2986        let (transaction, status) = self
2987            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2988            .ok_or(IotaError::TransactionNotFound {
2989                digest: request.transaction_digest,
2990            })?;
2991        Ok(TransactionInfoResponse {
2992            transaction,
2993            status,
2994        })
2995    }
2996
2997    #[instrument(level = "trace", skip_all)]
2998    pub async fn handle_object_info_request(
2999        &self,
3000        request: ObjectInfoRequest,
3001    ) -> IotaResult<ObjectInfoResponse> {
3002        let epoch_store = self.load_epoch_store_one_call_per_task();
3003
3004        let requested_object_seq = match request.request_kind {
3005            ObjectInfoRequestKind::LatestObjectInfo => {
3006                self.try_get_object_or_tombstone(request.object_id)
3007                    .await?
3008                    .ok_or_else(|| {
3009                        IotaError::from(UserInputError::ObjectNotFound {
3010                            object_id: request.object_id,
3011                            version: None,
3012                        })
3013                    })?
3014                    .version
3015            }
3016            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3017        };
3018
3019        let object = self
3020            .get_object_store()
3021            .try_get_object_by_key(&request.object_id, requested_object_seq)?
3022            .ok_or_else(|| {
3023                IotaError::from(UserInputError::ObjectNotFound {
3024                    object_id: request.object_id,
3025                    version: Some(requested_object_seq),
3026                })
3027            })?;
3028
3029        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3030            (request.generate_layout, object.data.as_struct_opt())
3031        {
3032            Some(into_struct_layout(
3033                epoch_store
3034                    .executor()
3035                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3036                    .get_annotated_layout(move_obj.struct_tag())?,
3037            )?)
3038        } else {
3039            None
3040        };
3041
3042        let lock = if !object.is_address_owned() {
3043            // Only address owned objects have locks.
3044            None
3045        } else {
3046            self.get_transaction_lock(&object.object_ref(), &epoch_store)
3047                .await?
3048                .map(|s| s.into_inner())
3049        };
3050
3051        Ok(ObjectInfoResponse {
3052            object,
3053            layout,
3054            lock_for_debugging: lock,
3055        })
3056    }
3057
3058    #[instrument(level = "trace", skip_all)]
3059    pub fn handle_checkpoint_request(
3060        &self,
3061        request: &CheckpointRequest,
3062    ) -> IotaResult<CheckpointResponse> {
3063        let summary = if request.certified {
3064            let summary = match request.sequence_number {
3065                Some(seq) => self
3066                    .checkpoint_store
3067                    .get_checkpoint_by_sequence_number(seq)?,
3068                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3069            }
3070            .map(|v| v.into_inner());
3071            summary.map(CheckpointSummaryResponse::Certified)
3072        } else {
3073            let summary = match request.sequence_number {
3074                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3075                None => self
3076                    .checkpoint_store
3077                    .get_latest_locally_computed_checkpoint()?,
3078            };
3079            summary.map(CheckpointSummaryResponse::Pending)
3080        };
3081        let contents = match &summary {
3082            Some(s) => self
3083                .checkpoint_store
3084                .get_checkpoint_contents(&s.content_digest())?,
3085            None => None,
3086        };
3087        Ok(CheckpointResponse {
3088            checkpoint: summary,
3089            contents,
3090        })
3091    }
3092
3093    fn check_protocol_version(
3094        supported_protocol_versions: SupportedProtocolVersions,
3095        current_version: ProtocolVersion,
3096    ) {
3097        info!("current protocol version is now {:?}", current_version);
3098        info!("supported versions are: {:?}", supported_protocol_versions);
3099        if !supported_protocol_versions.is_version_supported(current_version) {
3100            let msg = format!(
3101                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3102            );
3103
3104            error!("{}", msg);
3105            eprintln!("{msg}");
3106
3107            #[cfg(not(msim))]
3108            std::process::exit(1);
3109
3110            #[cfg(msim)]
3111            iota_simulator::task::shutdown_current_node();
3112        }
3113    }
3114
3115    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
3116    #[expect(clippy::too_many_arguments)]
3117    pub async fn new(
3118        name: AuthorityName,
3119        secret: StableSyncAuthoritySigner,
3120        supported_protocol_versions: SupportedProtocolVersions,
3121        store: Arc<AuthorityStore>,
3122        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3123        epoch_store: Arc<AuthorityPerEpochStore>,
3124        committee_store: Arc<CommitteeStore>,
3125        indexes: Option<Arc<IndexStore>>,
3126        grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3127        checkpoint_store: Arc<CheckpointStore>,
3128        prometheus_registry: &Registry,
3129        genesis_objects: &[Object],
3130        db_checkpoint_config: &DBCheckpointConfig,
3131        config: NodeConfig,
3132        archive_readers: ArchiveReaderBalancer,
3133        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3134        chain_identifier: ChainIdentifier,
3135        pruner_db: Option<Arc<AuthorityPrunerTables>>,
3136        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
3137        policy_config: Option<PolicyConfig>,
3138        firewall_config: Option<RemoteFirewallConfig>,
3139    ) -> Arc<Self> {
3140        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3141
3142        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3143        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3144        let transaction_manager = Arc::new(TransactionManager::new(
3145            execution_cache_trait_pointers.object_cache_reader.clone(),
3146            execution_cache_trait_pointers
3147                .transaction_cache_reader
3148                .clone(),
3149            &epoch_store,
3150            tx_ready_certificates,
3151            metrics.clone(),
3152        ));
3153        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3154
3155        let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3156            epoch_store.get_parent_path(),
3157            config
3158                .authority_store_pruning_config
3159                .num_latest_epoch_dbs_to_retain,
3160        )
3161        .await;
3162        let _pruner = AuthorityStorePruner::new(
3163            store.perpetual_tables.clone(),
3164            checkpoint_store.clone(),
3165            grpc_indexes_store.clone(),
3166            indexes.clone(),
3167            config.authority_store_pruning_config.clone(),
3168            epoch_store.committee().authority_exists(&name),
3169            epoch_store.epoch_start_state().epoch_duration_ms(),
3170            prometheus_registry,
3171            archive_readers,
3172            pruner_db,
3173            checkpoint_progress_tracker.clone(),
3174        );
3175        let input_loader =
3176            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3177        let epoch = epoch_store.epoch();
3178        let rgp = epoch_store.reference_gas_price();
3179        let traffic_controller_metrics =
3180            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3181        let traffic_controller = if let Some(policy_config) = policy_config {
3182            Some(Arc::new(
3183                TrafficController::init(
3184                    policy_config,
3185                    traffic_controller_metrics,
3186                    firewall_config.clone(),
3187                )
3188                .await,
3189            ))
3190        } else {
3191            None
3192        };
3193        let state = Arc::new(AuthorityState {
3194            name,
3195            secret,
3196            execution_lock: RwLock::new(epoch),
3197            epoch_store: ArcSwap::new(epoch_store.clone()),
3198            input_loader,
3199            execution_cache_trait_pointers,
3200            indexes,
3201            grpc_indexes_store,
3202            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3203            checkpoint_store,
3204            committee_store,
3205            transaction_manager,
3206            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3207            metrics,
3208            _pruner,
3209            authority_per_epoch_pruner,
3210            checkpoint_progress_tracker,
3211            db_checkpoint_config: db_checkpoint_config.clone(),
3212            config,
3213            overload_info: AuthorityOverloadInfo::default(),
3214            validator_tx_finalizer,
3215            chain_identifier,
3216            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3217            traffic_controller,
3218        });
3219
3220        // Start a task to execute ready certificates.
3221        let authority_state = Arc::downgrade(&state);
3222        spawn_monitored_task!(execution_process(
3223            authority_state,
3224            rx_ready_certificates,
3225            rx_execution_shutdown,
3226        ));
3227        spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3228
3229        // TODO: This doesn't belong to the constructor of AuthorityState.
3230        state
3231            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3232            .expect("Error indexing genesis objects.");
3233
3234        state
3235    }
3236
3237    pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3238        &self.authority_per_epoch_pruner
3239    }
3240
3241    // TODO: Consolidate our traits to reduce the number of methods here.
3242    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3243        &self.execution_cache_trait_pointers.object_cache_reader
3244    }
3245
3246    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3247        &self.execution_cache_trait_pointers.transaction_cache_reader
3248    }
3249
3250    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3251        &self.execution_cache_trait_pointers.cache_writer
3252    }
3253
3254    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3255        &self.execution_cache_trait_pointers.backing_store
3256    }
3257
3258    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3259        &self.execution_cache_trait_pointers.backing_package_store
3260    }
3261
3262    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3263        &self.execution_cache_trait_pointers.object_store
3264    }
3265
3266    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3267        &self.execution_cache_trait_pointers.reconfig_api
3268    }
3269
3270    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3271        &self.execution_cache_trait_pointers.global_state_hash_store
3272    }
3273
3274    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3275        &self.execution_cache_trait_pointers.checkpoint_cache
3276    }
3277
3278    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3279        &self.execution_cache_trait_pointers.state_sync_store
3280    }
3281
3282    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3283        &self.execution_cache_trait_pointers.cache_commit
3284    }
3285
3286    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3287        self.execution_cache_trait_pointers
3288            .testing_api
3289            .database_for_testing()
3290    }
3291
3292    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3293        &self,
3294        config: NodeConfig,
3295        metrics: Arc<AuthorityStorePruningMetrics>,
3296    ) -> anyhow::Result<()> {
3297        let archive_readers =
3298            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3299        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3300            &self.database_for_testing().perpetual_tables,
3301            &self.checkpoint_store,
3302            self.grpc_indexes_store.as_deref(),
3303            None,
3304            config.authority_store_pruning_config,
3305            metrics,
3306            archive_readers,
3307            EPOCH_DURATION_MS_FOR_TESTING,
3308            self.checkpoint_progress_tracker.as_ref(),
3309        )
3310        .await
3311    }
3312
3313    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3314        &self.transaction_manager
3315    }
3316
3317    /// Adds transactions / certificates to transaction manager for ordered
3318    /// execution.
3319    pub fn enqueue_transactions_for_execution(
3320        &self,
3321        txns: Vec<VerifiedExecutableTransaction>,
3322        epoch_store: &Arc<AuthorityPerEpochStore>,
3323    ) {
3324        self.transaction_manager.enqueue(txns, epoch_store)
3325    }
3326
3327    /// Adds certificates to transaction manager for ordered execution.
3328    pub fn enqueue_certificates_for_execution(
3329        &self,
3330        certs: Vec<VerifiedCertificate>,
3331        epoch_store: &Arc<AuthorityPerEpochStore>,
3332    ) {
3333        self.transaction_manager
3334            .enqueue_certificates(certs, epoch_store)
3335    }
3336
3337    pub fn enqueue_with_expected_effects_digest(
3338        &self,
3339        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3340        epoch_store: &AuthorityPerEpochStore,
3341    ) {
3342        self.transaction_manager
3343            .enqueue_with_expected_effects_digest(certs, epoch_store)
3344    }
3345
3346    fn create_owner_index_if_empty(
3347        &self,
3348        genesis_objects: &[Object],
3349        epoch_store: &Arc<AuthorityPerEpochStore>,
3350    ) -> IotaResult {
3351        let Some(index_store) = &self.indexes else {
3352            return Ok(());
3353        };
3354        if !index_store.is_empty() {
3355            return Ok(());
3356        }
3357
3358        let mut new_owners = vec![];
3359        let mut new_dynamic_fields = vec![];
3360        let mut layout_resolver = epoch_store
3361            .executor()
3362            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3363        for o in genesis_objects.iter() {
3364            match o.owner {
3365                Owner::Address(addr) => {
3366                    new_owners.push(((addr, o.id()), ObjectInfo::new(&o.object_ref(), o)))
3367                }
3368                Owner::Object(object_id) => {
3369                    let id = o.id();
3370                    let info = match self.try_create_dynamic_field_info(
3371                        o,
3372                        &BTreeMap::new(),
3373                        layout_resolver.as_mut(),
3374                    ) {
3375                        Ok(Some(info)) => info,
3376                        Ok(None) => continue,
3377                        Err(IotaError::UserInput {
3378                            error:
3379                                UserInputError::ObjectNotFound {
3380                                    object_id: not_found_id,
3381                                    version,
3382                                },
3383                        }) => {
3384                            warn!(
3385                                ?not_found_id,
3386                                ?version,
3387                                object_owner=?object_id,
3388                                field=?id,
3389                                "Skipping dynamic field: referenced genesis object not found"
3390                            );
3391                            continue;
3392                        }
3393                        Err(e) => return Err(e),
3394                    };
3395                    new_dynamic_fields.push(((object_id, id), info));
3396                }
3397                _ => {}
3398            }
3399        }
3400
3401        index_store.insert_genesis_objects(ObjectIndexChanges {
3402            deleted_owners: vec![],
3403            deleted_dynamic_fields: vec![],
3404            new_owners,
3405            new_dynamic_fields,
3406        })
3407    }
3408
3409    /// Attempts to acquire execution lock for an executable transaction.
3410    /// Returns the lock if the transaction is matching current executed epoch
3411    /// Returns None otherwise
3412    pub fn execution_lock_for_executable_transaction(
3413        &self,
3414        transaction: &VerifiedExecutableTransaction,
3415    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3416        let lock = self
3417            .execution_lock
3418            .try_read()
3419            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3420        if *lock == transaction.auth_sig().epoch() {
3421            Ok(lock)
3422        } else {
3423            Err(IotaError::WrongEpoch {
3424                expected_epoch: *lock,
3425                actual_epoch: transaction.auth_sig().epoch(),
3426            })
3427        }
3428    }
3429
3430    /// Acquires the execution lock for the duration of a transaction signing
3431    /// request. This prevents reconfiguration from starting until we are
3432    /// finished handling the signing request. Otherwise, in-memory lock
3433    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3434    /// while we are attempting to acquire locks for the transaction.
3435    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3436        self.execution_lock
3437            .try_read()
3438            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3439    }
3440
3441    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3442        self.execution_lock.write().await
3443    }
3444
3445    #[instrument(level = "error", skip_all)]
3446    pub async fn reconfigure(
3447        &self,
3448        cur_epoch_store: &AuthorityPerEpochStore,
3449        supported_protocol_versions: SupportedProtocolVersions,
3450        new_committee: Committee,
3451        epoch_start_configuration: EpochStartConfiguration,
3452        state_hasher: Arc<GlobalStateHasher>,
3453        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3454        epoch_supply_change: i64,
3455        epoch_last_checkpoint: CheckpointSequenceNumber,
3456    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3457        Self::check_protocol_version(
3458            supported_protocol_versions,
3459            epoch_start_configuration
3460                .epoch_start_state()
3461                .protocol_version(),
3462        );
3463        self.metrics.reset_on_reconfigure();
3464        self.committee_store.insert_new_committee(&new_committee)?;
3465
3466        // Wait until no transactions are being executed.
3467        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3468
3469        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3470        cur_epoch_store.epoch_terminated().await;
3471
3472        let highest_locally_built_checkpoint_seq = self
3473            .checkpoint_store
3474            .get_latest_locally_computed_checkpoint()?
3475            .map(|c| *c.sequence_number())
3476            .unwrap_or(0);
3477
3478        assert!(
3479            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3480            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3481        );
3482        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3483            || self.is_fullnode(cur_epoch_store)
3484        {
3485            // if we built the last checkpoint locally (as opposed to receiving it from a
3486            // peer), then all shared_version_assignments except the one for the
3487            // ChangeEpoch transaction should have been removed
3488            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3489            // Due to (otherwise harmless) race conditions between CheckpointExecutor and
3490            // ConsensusHandler, we actually can't guarantee that all
3491            // shared_version_assignments have been removed. However,
3492            // typically at most 2 or 3 are left over. We leave this check here in order to
3493            // catch complete failure of cleanup which would cause a memory
3494            // leak.
3495            if num_shared_version_assignments > 10 {
3496                // If this happens in prod, we have a memory leak, but not a correctness issue.
3497                debug_fatal!(
3498                    "all shared_version_assignments should have been removed \
3499                    (num_shared_version_assignments: {num_shared_version_assignments})"
3500                );
3501            }
3502        }
3503
3504        // Safe to reconfigure now. No transactions are being executed,
3505        // and no epoch-specific tasks are running.
3506
3507        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
3508        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
3509        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3510            .await?;
3511        self.get_reconfig_api()
3512            .clear_state_end_of_epoch(&execution_lock);
3513        self.check_system_consistency(
3514            cur_epoch_store,
3515            state_hasher,
3516            expensive_safety_check_config,
3517            epoch_supply_change,
3518        )?;
3519        self.get_reconfig_api()
3520            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3521        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3522            if self
3523                .db_checkpoint_config
3524                .perform_db_checkpoints_at_epoch_end
3525            {
3526                let checkpoint_indexes = self
3527                    .db_checkpoint_config
3528                    .perform_index_db_checkpoints_at_epoch_end
3529                    .unwrap_or(false);
3530                let current_epoch = cur_epoch_store.epoch();
3531                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3532                self.checkpoint_all_dbs(
3533                    &epoch_checkpoint_path,
3534                    cur_epoch_store,
3535                    checkpoint_indexes,
3536                )?;
3537            }
3538        }
3539
3540        let new_epoch = new_committee.epoch;
3541        let new_epoch_store = self
3542            .reopen_epoch_db(
3543                cur_epoch_store,
3544                new_committee,
3545                epoch_start_configuration,
3546                expensive_safety_check_config,
3547                epoch_last_checkpoint,
3548            )
3549            .await?;
3550        assert_eq!(new_epoch_store.epoch(), new_epoch);
3551        self.transaction_manager.reconfigure(new_epoch);
3552        *execution_lock = new_epoch;
3553        // drop execution_lock after epoch store was updated
3554        // see also assert in AuthorityState::process_certificate
3555        // on the epoch store and execution lock epoch match
3556        Ok(new_epoch_store)
3557    }
3558
3559    /// Advance the epoch store to the next epoch for testing only.
3560    /// This only manually sets all the places where we have the epoch number.
3561    /// It doesn't properly reconfigure the node, hence should be only used for
3562    /// testing.
3563    pub async fn reconfigure_for_testing(&self) {
3564        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3565        let epoch_store = self.epoch_store_for_testing().clone();
3566        let protocol_config = epoch_store.protocol_config().clone();
3567        // The current protocol config used in the epoch store may have been overridden
3568        // and diverged from the protocol config definitions. That override may
3569        // have now been dropped when the initial guard was dropped. We reapply
3570        // the override before creating the new epoch store, to make sure that
3571        // the new epoch store has the same protocol config as the current one.
3572        // Since this is for testing only, we mostly like to keep the protocol config
3573        // the same across epochs.
3574        let _guard =
3575            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3576        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3577            self.get_backing_package_store().clone(),
3578            &self.config.expensive_safety_check_config,
3579            self.checkpoint_store
3580                .get_epoch_last_checkpoint(epoch_store.epoch())
3581                .unwrap()
3582                .map(|c| *c.sequence_number())
3583                .unwrap_or_default(),
3584        );
3585        let new_epoch = new_epoch_store.epoch();
3586        self.transaction_manager.reconfigure(new_epoch);
3587        self.epoch_store.store(new_epoch_store);
3588        epoch_store.epoch_terminated().await;
3589        *execution_lock = new_epoch;
3590    }
3591
3592    #[instrument(level = "error", skip_all)]
3593    fn check_system_consistency(
3594        &self,
3595        cur_epoch_store: &AuthorityPerEpochStore,
3596        state_hasher: Arc<GlobalStateHasher>,
3597        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3598        epoch_supply_change: i64,
3599    ) -> IotaResult<()> {
3600        info!(
3601            "Performing iota conservation consistency check for epoch {}",
3602            cur_epoch_store.epoch()
3603        );
3604
3605        if cfg!(debug_assertions) {
3606            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3607        }
3608
3609        self.get_reconfig_api()
3610            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3611
3612        // check for root state hash consistency with live object set
3613        if expensive_safety_check_config.enable_state_consistency_check() {
3614            info!(
3615                "Performing state consistency check for epoch {}",
3616                cur_epoch_store.epoch()
3617            );
3618            self.expensive_check_is_consistent_state(
3619                state_hasher,
3620                cur_epoch_store,
3621                cfg!(debug_assertions), // panic in debug mode only
3622            );
3623        }
3624
3625        if expensive_safety_check_config.enable_secondary_index_checks() {
3626            if let Some(indexes) = self.indexes.clone() {
3627                verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3628                    .expect("secondary indexes are inconsistent");
3629            }
3630        }
3631
3632        Ok(())
3633    }
3634
3635    fn expensive_check_is_consistent_state(
3636        &self,
3637        state_hasher: Arc<GlobalStateHasher>,
3638        cur_epoch_store: &AuthorityPerEpochStore,
3639        panic: bool,
3640    ) {
3641        let live_object_set_hash = state_hasher.digest_live_object_set();
3642
3643        let root_state_hash: ECMHLiveObjectSetDigest = self
3644            .get_global_state_hash_store()
3645            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3646            .expect("Retrieving root state hash cannot fail")
3647            .expect("Root state hash for epoch must exist")
3648            .1
3649            .digest()
3650            .into();
3651
3652        let is_inconsistent = root_state_hash != live_object_set_hash;
3653        if is_inconsistent {
3654            if panic {
3655                panic!(
3656                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3657                );
3658            } else {
3659                error!(
3660                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3661                    root_state_hash, live_object_set_hash
3662                );
3663            }
3664        } else {
3665            info!("State consistency check passed");
3666        }
3667
3668        if !panic {
3669            state_hasher.set_inconsistent_state(is_inconsistent);
3670        }
3671    }
3672
3673    pub fn current_epoch_for_testing(&self) -> EpochId {
3674        self.epoch_store_for_testing().epoch()
3675    }
3676
3677    #[instrument(level = "error", skip_all)]
3678    pub fn checkpoint_all_dbs(
3679        &self,
3680        checkpoint_path: &Path,
3681        cur_epoch_store: &AuthorityPerEpochStore,
3682        checkpoint_indexes: bool,
3683    ) -> IotaResult {
3684        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3685        let current_epoch = cur_epoch_store.epoch();
3686
3687        if checkpoint_path.exists() {
3688            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3689            return Ok(());
3690        }
3691
3692        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3693        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3694
3695        if checkpoint_path_tmp.exists() {
3696            fs::remove_dir_all(&checkpoint_path_tmp)
3697                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3698        }
3699
3700        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3701        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3702
3703        // NOTE: Do not change the order of invoking these checkpoint calls
3704        // We want to snapshot checkpoint db first to not race with state sync
3705        self.checkpoint_store
3706            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3707
3708        self.get_reconfig_api()
3709            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3710
3711        self.committee_store
3712            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3713
3714        if checkpoint_indexes {
3715            if let Some(indexes) = self.indexes.as_ref() {
3716                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3717            }
3718            if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3719                grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3720            }
3721        }
3722
3723        fs::rename(checkpoint_path_tmp, checkpoint_path)
3724            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3725        Ok(())
3726    }
3727
3728    /// Load the current epoch store. This can change during reconfiguration. To
3729    /// ensure that we never end up accessing different epoch stores in a
3730    /// single task, we need to make sure that this is called once per task.
3731    /// Each call needs to be carefully audited to ensure it is
3732    /// the case. This also means we should minimize the number of call-sites.
3733    /// Only call it when there is no way to obtain it from somewhere else.
3734    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3735        self.epoch_store.load()
3736    }
3737
3738    // Load the epoch store, should be used in tests only.
3739    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3740        self.load_epoch_store_one_call_per_task()
3741    }
3742
3743    pub fn clone_committee_for_testing(&self) -> Committee {
3744        Committee::clone(self.epoch_store_for_testing().committee())
3745    }
3746
3747    #[instrument(level = "trace", skip_all)]
3748    pub async fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3749        self.get_object_store()
3750            .try_get_object(object_id)
3751            .map_err(Into::into)
3752    }
3753
3754    /// Non-fallible version of `try_get_object`.
3755    pub async fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3756        self.try_get_object(object_id)
3757            .await
3758            .expect("storage access failed")
3759    }
3760
3761    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3762        Ok(self
3763            .try_get_object(&ObjectId::SYSTEM)
3764            .await?
3765            .expect("system package should always exist")
3766            .object_ref())
3767    }
3768
3769    // This function is only used for testing.
3770    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3771        self.get_object_cache_reader()
3772            .try_get_iota_system_state_object_unsafe()
3773    }
3774
3775    #[instrument(level = "trace", skip_all)]
3776    pub fn get_checkpoint_by_sequence_number(
3777        &self,
3778        sequence_number: CheckpointSequenceNumber,
3779    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3780        Ok(self
3781            .checkpoint_store
3782            .get_checkpoint_by_sequence_number(sequence_number)?)
3783    }
3784
3785    /// Wait for the given transactions to be included in a checkpoint.
3786    ///
3787    /// Returns a mapping from transaction digest to
3788    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
3789    /// On timeout, returns partial results for any transactions that were
3790    /// already checkpointed.
3791    pub async fn wait_for_checkpoint_inclusion(
3792        &self,
3793        digests: &[TransactionDigest],
3794        timeout: Duration,
3795    ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3796        let epoch_store = self.load_epoch_store_one_call_per_task();
3797
3798        // Local cache so multiple transactions in the same checkpoint only
3799        // trigger a single checkpoint summary lookup.
3800        let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3801
3802        let results = epoch_store
3803            .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3804                *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3805                    self.get_checkpoint_by_sequence_number(seq)
3806                        .ok()
3807                        .flatten()
3808                        .map(|c| c.timestamp_ms)
3809                        .unwrap_or(0)
3810                })
3811            })
3812            .await?;
3813
3814        Ok(digests
3815            .iter()
3816            .copied()
3817            .zip(results)
3818            .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3819            .collect())
3820    }
3821
3822    #[instrument(level = "trace", skip_all)]
3823    pub fn get_transaction_checkpoint_for_tests(
3824        &self,
3825        digest: &TransactionDigest,
3826        epoch_store: &AuthorityPerEpochStore,
3827    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3828        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3829        let Some(checkpoint) = checkpoint else {
3830            return Ok(None);
3831        };
3832        let checkpoint = self
3833            .checkpoint_store
3834            .get_checkpoint_by_sequence_number(checkpoint)?;
3835        Ok(checkpoint)
3836    }
3837
3838    #[instrument(level = "trace", skip_all)]
3839    pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3840        Ok(
3841            match self
3842                .get_object_cache_reader()
3843                .try_get_latest_object_or_tombstone(*object_id)?
3844            {
3845                Some((_, ObjectOrTombstone::Object(object))) => {
3846                    let layout = self.get_object_layout(&object)?;
3847                    ObjectRead::Exists(object.object_ref(), object, layout)
3848                }
3849                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3850                None => ObjectRead::NotExists(*object_id),
3851            },
3852        )
3853    }
3854
3855    /// Chain Identifier is the digest of the genesis checkpoint.
3856    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3857        self.chain_identifier
3858    }
3859
3860    #[instrument(level = "trace", skip_all)]
3861    pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3862    where
3863        T: DeserializeOwned,
3864    {
3865        let o = self.get_object_read(object_id)?.into_object()?;
3866        if let Some(move_object) = o.data.as_struct_opt() {
3867            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3868                IotaError::ObjectDeserialization {
3869                    error: format!("{e}"),
3870                }
3871            })?)
3872        } else {
3873            Err(IotaError::ObjectDeserialization {
3874                error: format!("Provided object : [{object_id}] is not a Move object."),
3875            })
3876        }
3877    }
3878
3879    /// This function aims to serve rpc reads on past objects and
3880    /// we don't expect it to be called for other purposes.
3881    /// Depending on the object pruning policies that will be enforced in the
3882    /// future there is no software-level guarantee/SLA to retrieve an object
3883    /// with an old version even if it exists/existed.
3884    #[instrument(level = "trace", skip_all)]
3885    pub fn get_past_object_read(
3886        &self,
3887        object_id: &ObjectId,
3888        version: SequenceNumber,
3889    ) -> IotaResult<PastObjectRead> {
3890        // Firstly we see if the object ever existed by getting its latest data
3891        let Some(obj_ref) = self
3892            .get_object_cache_reader()
3893            .try_get_latest_object_ref_or_tombstone(*object_id)?
3894        else {
3895            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3896        };
3897
3898        if version > obj_ref.version {
3899            return Ok(PastObjectRead::VersionTooHigh {
3900                object_id: *object_id,
3901                asked_version: version,
3902                latest_version: obj_ref.version,
3903            });
3904        }
3905
3906        if version < obj_ref.version {
3907            // Read past objects
3908            return Ok(match self.read_object_at_version(object_id, version)? {
3909                Some((object, layout)) => {
3910                    let obj_ref = object.object_ref();
3911                    PastObjectRead::VersionFound(obj_ref, object, layout)
3912                }
3913
3914                None => PastObjectRead::VersionNotFound(*object_id, version),
3915            });
3916        }
3917
3918        if !obj_ref.digest.is_object_alive() {
3919            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3920        }
3921
3922        match self.read_object_at_version(object_id, obj_ref.version)? {
3923            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3924            None => {
3925                error!(
3926                    "Object with in parent_entry is missing from object store, datastore is \
3927                     inconsistent",
3928                );
3929                Err(UserInputError::ObjectNotFound {
3930                    object_id: *object_id,
3931                    version: Some(obj_ref.version),
3932                }
3933                .into())
3934            }
3935        }
3936    }
3937
3938    #[instrument(level = "trace", skip_all)]
3939    fn read_object_at_version(
3940        &self,
3941        object_id: &ObjectId,
3942        version: SequenceNumber,
3943    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3944        let Some(object) = self
3945            .get_object_cache_reader()
3946            .try_get_object_by_key(object_id, version)?
3947        else {
3948            return Ok(None);
3949        };
3950
3951        let layout = self.get_object_layout(&object)?;
3952        Ok(Some((object, layout)))
3953    }
3954
3955    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3956        let layout = object
3957            .data
3958            .as_struct_opt()
3959            .map(|object| {
3960                into_struct_layout(
3961                    self.load_epoch_store_one_call_per_task()
3962                        .executor()
3963                        // TODO(cache) - must read through cache
3964                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3965                        .get_annotated_layout(object.struct_tag())?,
3966                )
3967            })
3968            .transpose()?;
3969        Ok(layout)
3970    }
3971
3972    fn get_owner_at_version(
3973        &self,
3974        object_id: &ObjectId,
3975        version: SequenceNumber,
3976    ) -> IotaResult<Owner> {
3977        self.get_object_store()
3978            .try_get_object_by_key(object_id, version)?
3979            .ok_or_else(|| {
3980                IotaError::from(UserInputError::ObjectNotFound {
3981                    object_id: *object_id,
3982                    version: Some(version),
3983                })
3984            })
3985            .map(|o| o.owner)
3986    }
3987
3988    #[instrument(level = "trace", skip_all)]
3989    pub fn get_owner_objects(
3990        &self,
3991        owner: IotaAddress,
3992        // If `Some`, the query will start from the next item after the specified cursor
3993        cursor: Option<ObjectId>,
3994        limit: usize,
3995        filter: Option<IotaObjectDataFilter>,
3996    ) -> IotaResult<Vec<ObjectInfo>> {
3997        if let Some(indexes) = &self.indexes {
3998            indexes.get_owner_objects(owner, cursor, limit, filter)
3999        } else {
4000            Err(IotaError::IndexStoreNotAvailable)
4001        }
4002    }
4003
4004    #[instrument(level = "trace", skip_all)]
4005    pub fn get_owned_coins_iterator_with_cursor(
4006        &self,
4007        owner: IotaAddress,
4008        // If `Some`, the query will start from the next item after the specified cursor
4009        cursor: (String, ObjectId),
4010        limit: usize,
4011        one_coin_type_only: bool,
4012    ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4013        if let Some(indexes) = &self.indexes {
4014            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4015        } else {
4016            Err(IotaError::IndexStoreNotAvailable)
4017        }
4018    }
4019
4020    #[instrument(level = "trace", skip_all)]
4021    pub fn get_owner_objects_iterator(
4022        &self,
4023        owner: IotaAddress,
4024        // If `Some`, the query will start from the next item after the specified cursor
4025        cursor: Option<ObjectId>,
4026        filter: Option<IotaObjectDataFilter>,
4027    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4028        let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4029        if let Some(indexes) = &self.indexes {
4030            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4031        } else {
4032            Err(IotaError::IndexStoreNotAvailable)
4033        }
4034    }
4035
4036    #[instrument(level = "trace", skip_all)]
4037    pub async fn get_move_objects<T>(
4038        &self,
4039        owner: IotaAddress,
4040        tag: StructTag,
4041    ) -> IotaResult<Vec<T>>
4042    where
4043        T: DeserializeOwned,
4044    {
4045        let object_ids = self
4046            .get_owner_objects_iterator(owner, None, None)?
4047            .filter(|o| match &o.type_ {
4048                ObjectType::Struct(s) => *s == tag,
4049                ObjectType::Package => false,
4050            })
4051            .map(|info| ObjectKey(info.object_id, info.version))
4052            .collect::<Vec<_>>();
4053        let mut move_objects = vec![];
4054
4055        let objects = self
4056            .get_object_store()
4057            .try_multi_get_objects_by_key(&object_ids)?;
4058
4059        for (o, id) in objects.into_iter().zip(object_ids) {
4060            let object = o.ok_or_else(|| {
4061                IotaError::from(UserInputError::ObjectNotFound {
4062                    object_id: id.0,
4063                    version: Some(id.1),
4064                })
4065            })?;
4066            let move_object = object.data.as_struct_opt().ok_or_else(|| {
4067                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4068            })?;
4069            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4070                IotaError::ObjectDeserialization {
4071                    error: format!("{e}"),
4072                }
4073            })?);
4074        }
4075        Ok(move_objects)
4076    }
4077
4078    #[instrument(level = "trace", skip_all)]
4079    pub fn get_dynamic_fields(
4080        &self,
4081        owner: ObjectId,
4082        // If `Some`, the query will start from the next item after the specified cursor
4083        cursor: Option<ObjectId>,
4084        limit: usize,
4085    ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4086        Ok(self
4087            .get_dynamic_fields_iterator(owner, cursor)?
4088            .take(limit)
4089            .collect::<Result<Vec<_>, _>>()?)
4090    }
4091
4092    fn get_dynamic_fields_iterator(
4093        &self,
4094        owner: ObjectId,
4095        // If `Some`, the query will start from the next item after the specified cursor
4096        cursor: Option<ObjectId>,
4097    ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4098    {
4099        if let Some(indexes) = &self.indexes {
4100            indexes.get_dynamic_fields_iterator(owner, cursor)
4101        } else {
4102            Err(IotaError::IndexStoreNotAvailable)
4103        }
4104    }
4105
4106    #[instrument(level = "trace", skip_all)]
4107    pub fn get_dynamic_field_object_id(
4108        &self,
4109        owner: ObjectId,
4110        name_type: TypeTag,
4111        name_bcs_bytes: &[u8],
4112    ) -> IotaResult<Option<ObjectId>> {
4113        if let Some(indexes) = &self.indexes {
4114            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4115        } else {
4116            Err(IotaError::IndexStoreNotAvailable)
4117        }
4118    }
4119
4120    #[instrument(level = "trace", skip_all)]
4121    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4122        Ok(self.get_indexes()?.next_sequence_number())
4123    }
4124
4125    #[instrument(level = "trace", skip_all)]
4126    pub async fn get_executed_transaction_and_effects(
4127        &self,
4128        digest: TransactionDigest,
4129        kv_store: Arc<TransactionKeyValueStore>,
4130    ) -> IotaResult<(Transaction, TransactionEffects)> {
4131        let transaction = kv_store.get_tx(digest).await?;
4132        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4133        Ok((transaction, effects))
4134    }
4135
4136    #[instrument(level = "trace", skip_all)]
4137    pub fn multi_get_checkpoint_by_sequence_number(
4138        &self,
4139        sequence_numbers: &[CheckpointSequenceNumber],
4140    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4141        Ok(self
4142            .checkpoint_store
4143            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4144    }
4145
4146    #[instrument(level = "trace", skip_all)]
4147    pub fn get_transaction_events(
4148        &self,
4149        digest: &TransactionDigest,
4150    ) -> IotaResult<TransactionEvents> {
4151        self.get_transaction_cache_reader()
4152            .try_get_events(digest)?
4153            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4154    }
4155
4156    pub fn get_transaction_input_objects(
4157        &self,
4158        effects: &TransactionEffects,
4159    ) -> anyhow::Result<Vec<Object>> {
4160        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4161            .map_err(Into::into)
4162    }
4163
4164    pub fn get_transaction_output_objects(
4165        &self,
4166        effects: &TransactionEffects,
4167    ) -> anyhow::Result<Vec<Object>> {
4168        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4169            .map_err(Into::into)
4170    }
4171
4172    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4173        match &self.indexes {
4174            Some(i) => Ok(i.clone()),
4175            None => Err(IotaError::UnsupportedFeature {
4176                error: "extended object indexing is not enabled on this server".into(),
4177            }),
4178        }
4179    }
4180
4181    pub async fn get_transactions_for_tests(
4182        self: &Arc<Self>,
4183        filter: Option<TransactionFilter>,
4184        cursor: Option<TransactionDigest>,
4185        limit: Option<usize>,
4186        reverse: bool,
4187    ) -> IotaResult<Vec<TransactionDigest>> {
4188        let metrics = KeyValueStoreMetrics::new_for_tests();
4189        let kv_store = Arc::new(TransactionKeyValueStore::new(
4190            "rocksdb",
4191            metrics,
4192            self.clone(),
4193        ));
4194        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4195            .await
4196    }
4197
4198    #[instrument(level = "trace", skip_all)]
4199    pub async fn get_transactions(
4200        &self,
4201        kv_store: &Arc<TransactionKeyValueStore>,
4202        filter: Option<TransactionFilter>,
4203        // If `Some`, the query will start from the next item after the specified cursor
4204        cursor: Option<TransactionDigest>,
4205        limit: Option<usize>,
4206        reverse: bool,
4207    ) -> IotaResult<Vec<TransactionDigest>> {
4208        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4209            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4210            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4211            if reverse {
4212                let iter = iter
4213                    .rev()
4214                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4215                    .skip(usize::from(cursor.is_some()));
4216                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4217            } else {
4218                let iter = iter
4219                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4220                    .skip(usize::from(cursor.is_some()));
4221                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4222            }
4223        }
4224        self.get_indexes()?
4225            .get_transactions(filter, cursor, limit, reverse)
4226    }
4227
4228    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4229        &self.checkpoint_store
4230    }
4231
4232    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4233        self.get_checkpoint_store()
4234            .get_highest_executed_checkpoint_seq_number()?
4235            .ok_or(IotaError::UserInput {
4236                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4237            })
4238    }
4239
4240    #[cfg(msim)]
4241    pub fn get_highest_pruned_checkpoint_for_testing(
4242        &self,
4243    ) -> IotaResult<CheckpointSequenceNumber> {
4244        self.database_for_testing()
4245            .perpetual_tables
4246            .get_highest_pruned_checkpoint()
4247            .map(|c| c.unwrap_or(0))
4248            .map_err(Into::into)
4249    }
4250
4251    #[instrument(level = "trace", skip_all)]
4252    pub fn get_checkpoint_summary_by_sequence_number(
4253        &self,
4254        sequence_number: CheckpointSequenceNumber,
4255    ) -> IotaResult<CheckpointSummary> {
4256        let verified_checkpoint = self
4257            .get_checkpoint_store()
4258            .get_checkpoint_by_sequence_number(sequence_number)?;
4259        match verified_checkpoint {
4260            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4261            None => Err(IotaError::UserInput {
4262                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4263            }),
4264        }
4265    }
4266
4267    #[instrument(level = "trace", skip_all)]
4268    pub fn get_checkpoint_summary_by_digest(
4269        &self,
4270        digest: CheckpointDigest,
4271    ) -> IotaResult<CheckpointSummary> {
4272        let verified_checkpoint = self
4273            .get_checkpoint_store()
4274            .get_checkpoint_by_digest(&digest)?;
4275        match verified_checkpoint {
4276            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4277            None => Err(IotaError::UserInput {
4278                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4279            }),
4280        }
4281    }
4282
4283    #[instrument(level = "trace", skip_all)]
4284    pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4285        if package_id.is_system_package() {
4286            return self.find_genesis_txn_digest();
4287        }
4288        Ok(self
4289            .get_object_read(&package_id)?
4290            .into_object()?
4291            .previous_transaction)
4292    }
4293
4294    #[instrument(level = "trace", skip_all)]
4295    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4296        let summary = self
4297            .get_verified_checkpoint_by_sequence_number(0)?
4298            .into_message();
4299        let content = self.get_checkpoint_contents(summary.content_digest)?;
4300        let genesis_transaction = content.enumerate_transactions(&summary).next();
4301        Ok(genesis_transaction
4302            .ok_or(IotaError::UserInput {
4303                error: UserInputError::GenesisTransactionNotFound,
4304            })?
4305            .1
4306            .transaction)
4307    }
4308
4309    #[instrument(level = "trace", skip_all)]
4310    pub fn get_verified_checkpoint_by_sequence_number(
4311        &self,
4312        sequence_number: CheckpointSequenceNumber,
4313    ) -> IotaResult<VerifiedCheckpoint> {
4314        let verified_checkpoint = self
4315            .get_checkpoint_store()
4316            .get_checkpoint_by_sequence_number(sequence_number)?;
4317        match verified_checkpoint {
4318            Some(verified_checkpoint) => Ok(verified_checkpoint),
4319            None => Err(IotaError::UserInput {
4320                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4321            }),
4322        }
4323    }
4324
4325    #[instrument(level = "trace", skip_all)]
4326    pub fn get_verified_checkpoint_summary_by_digest(
4327        &self,
4328        digest: CheckpointDigest,
4329    ) -> IotaResult<VerifiedCheckpoint> {
4330        let verified_checkpoint = self
4331            .get_checkpoint_store()
4332            .get_checkpoint_by_digest(&digest)?;
4333        match verified_checkpoint {
4334            Some(verified_checkpoint) => Ok(verified_checkpoint),
4335            None => Err(IotaError::UserInput {
4336                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4337            }),
4338        }
4339    }
4340
4341    #[instrument(level = "trace", skip_all)]
4342    pub fn get_checkpoint_contents(
4343        &self,
4344        digest: CheckpointContentsDigest,
4345    ) -> IotaResult<CheckpointContents> {
4346        self.get_checkpoint_store()
4347            .get_checkpoint_contents(&digest)?
4348            .ok_or(IotaError::UserInput {
4349                error: UserInputError::CheckpointContentsNotFound(digest),
4350            })
4351    }
4352
4353    #[instrument(level = "trace", skip_all)]
4354    pub fn get_checkpoint_contents_by_sequence_number(
4355        &self,
4356        sequence_number: CheckpointSequenceNumber,
4357    ) -> IotaResult<CheckpointContents> {
4358        let verified_checkpoint = self
4359            .get_checkpoint_store()
4360            .get_checkpoint_by_sequence_number(sequence_number)?;
4361        match verified_checkpoint {
4362            Some(verified_checkpoint) => {
4363                let content_digest = verified_checkpoint.into_inner().content_digest;
4364                self.get_checkpoint_contents(content_digest)
4365            }
4366            None => Err(IotaError::UserInput {
4367                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4368            }),
4369        }
4370    }
4371
4372    #[instrument(level = "trace", skip_all)]
4373    pub async fn query_events(
4374        &self,
4375        kv_store: &Arc<TransactionKeyValueStore>,
4376        query: EventFilter,
4377        // If `Some`, the query will start from the next item after the specified cursor
4378        cursor: Option<EventID>,
4379        limit: usize,
4380        descending: bool,
4381    ) -> IotaResult<Vec<IotaEvent>> {
4382        let index_store = self.get_indexes()?;
4383
4384        // Get the tx_num from tx_digest
4385        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4386            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4387                IotaError::TransactionNotFound {
4388                    digest: cursor.tx_digest,
4389                },
4390            )?;
4391            (tx_seq, cursor.event_seq as usize)
4392        } else if descending {
4393            (u64::MAX, usize::MAX)
4394        } else {
4395            (0, 0)
4396        };
4397
4398        let limit = limit + 1;
4399        let mut event_keys = match query {
4400            EventFilter::All(filters) => {
4401                if filters.is_empty() {
4402                    index_store.all_events(tx_num, event_num, limit, descending)?
4403                } else {
4404                    return Err(IotaError::UserInput {
4405                        error: UserInputError::Unsupported(
4406                            "This query type does not currently support filter combinations"
4407                                .to_string(),
4408                        ),
4409                    });
4410                }
4411            }
4412            EventFilter::Transaction(digest) => {
4413                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4414            }
4415            EventFilter::MoveModule { package, module } => {
4416                let module_id = ModuleId::new(
4417                    AccountAddress::new(package.into_bytes()),
4418                    move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4419                );
4420                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4421            }
4422            EventFilter::MoveEventType(struct_name) => index_store
4423                .events_by_move_event_struct_name(
4424                    &struct_name,
4425                    tx_num,
4426                    event_num,
4427                    limit,
4428                    descending,
4429                )?,
4430            EventFilter::Sender(sender) => {
4431                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4432            }
4433            EventFilter::TimeRange {
4434                start_time,
4435                end_time,
4436            } => index_store
4437                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4438            EventFilter::MoveEventModule { package, module } => index_store
4439                .events_by_move_event_module(
4440                    &ModuleId::new(
4441                        AccountAddress::new(package.into_bytes()),
4442                        move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4443                    ),
4444                    tx_num,
4445                    event_num,
4446                    limit,
4447                    descending,
4448                )?,
4449            // not using "_ =>" because we want to make sure we remember to add new variants here
4450            EventFilter::Package(_)
4451            | EventFilter::MoveEventField { .. }
4452            | EventFilter::Any(_)
4453            | EventFilter::And(_, _)
4454            | EventFilter::Or(_, _) => {
4455                return Err(IotaError::UserInput {
4456                    error: UserInputError::Unsupported(
4457                        "This query type is not supported by the full node.".to_string(),
4458                    ),
4459                });
4460            }
4461        };
4462
4463        // skip one event if exclusive cursor is provided,
4464        // otherwise truncate to the original limit.
4465        if cursor.is_some() {
4466            if !event_keys.is_empty() {
4467                event_keys.remove(0);
4468            }
4469        } else {
4470            event_keys.truncate(limit - 1);
4471        }
4472
4473        // get the unique set of digests from the event_keys
4474        let transaction_digests = event_keys
4475            .iter()
4476            .map(|(_, digest, _, _)| *digest)
4477            .collect::<HashSet<_>>()
4478            .into_iter()
4479            .collect::<Vec<_>>();
4480
4481        let events = kv_store
4482            .multi_get_events_by_tx_digests(&transaction_digests)
4483            .await?;
4484
4485        let events_map: HashMap<_, _> =
4486            transaction_digests.iter().zip(events.into_iter()).collect();
4487
4488        let stored_events = event_keys
4489            .into_iter()
4490            .map(|k| {
4491                (
4492                    k,
4493                    events_map
4494                        .get(&k.1)
4495                        .expect("fetched digest is missing")
4496                        .clone()
4497                        .and_then(|e| e.get(k.2).cloned()),
4498                )
4499            })
4500            .map(
4501                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4502                    event
4503                        .map(|e| (e, tx_digest, event_seq, timestamp))
4504                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4505                },
4506            )
4507            .collect::<Result<Vec<_>, _>>()?;
4508
4509        let epoch_store = self.load_epoch_store_one_call_per_task();
4510        let backing_store = self.get_backing_package_store().as_ref();
4511        let mut layout_resolver = epoch_store
4512            .executor()
4513            .type_layout_resolver(Box::new(backing_store));
4514        let mut events = vec![];
4515        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4516            events.push(IotaEvent::try_from(
4517                e.clone(),
4518                tx_digest,
4519                event_seq as u64,
4520                Some(timestamp),
4521                layout_resolver.get_annotated_layout(&e.type_)?,
4522            )?)
4523        }
4524        Ok(events)
4525    }
4526
4527    pub async fn insert_genesis_object(&self, object: Object) {
4528        self.get_reconfig_api()
4529            .try_insert_genesis_object(object)
4530            .expect("Cannot insert genesis object")
4531    }
4532
4533    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4534        futures::future::join_all(
4535            objects
4536                .iter()
4537                .map(|o| self.insert_genesis_object(o.clone())),
4538        )
4539        .await;
4540    }
4541
4542    /// Make a status response for a transaction
4543    #[instrument(level = "trace", skip_all)]
4544    pub fn get_transaction_status(
4545        &self,
4546        transaction_digest: &TransactionDigest,
4547        epoch_store: &Arc<AuthorityPerEpochStore>,
4548    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4549        // TODO: In the case of read path, we should not have to re-sign the effects.
4550        if let Some(effects) =
4551            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4552        {
4553            if let Some(transaction) = self
4554                .get_transaction_cache_reader()
4555                .try_get_transaction_block(transaction_digest)?
4556            {
4557                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4558                let events = if effects.events_digest().is_some() {
4559                    self.get_transaction_events(effects.transaction_digest())?
4560                } else {
4561                    TransactionEvents::default()
4562                };
4563                return Ok(Some((
4564                    (*transaction).clone().into_message(),
4565                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4566                )));
4567            } else {
4568                // The read of effects and read of transaction are not atomic. It's possible
4569                // that we reverted the transaction (during epoch change) in
4570                // between the above two reads, and we end up having effects but
4571                // not transaction. In this case, we just fall through.
4572                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4573            }
4574        }
4575        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4576            self.metrics.tx_already_processed.inc();
4577            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4578            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4579        } else {
4580            Ok(None)
4581        }
4582    }
4583
4584    /// Get the signed effects of the given transaction. If the effects was
4585    /// signed in a previous epoch, re-sign it so that the caller is able to
4586    /// form a cert of the effects in the current epoch.
4587    #[instrument(level = "trace", skip_all)]
4588    pub fn get_signed_effects_and_maybe_resign(
4589        &self,
4590        transaction_digest: &TransactionDigest,
4591        epoch_store: &Arc<AuthorityPerEpochStore>,
4592    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4593        let effects = self
4594            .get_transaction_cache_reader()
4595            .try_get_executed_effects(transaction_digest)?;
4596        match effects {
4597            Some(effects) => {
4598                // If the transaction was executed in previous epochs, the validator will
4599                // re-sign the effects with new current epoch so that a client is always able to
4600                // obtain an effects certificate at the current epoch.
4601                //
4602                // Why is this necessary? Consider the following case:
4603                // - assume there are 4 validators
4604                // - Quorum driver gets 2 signed effects before reconfig halt
4605                // - The tx makes it into final checkpoint.
4606                // - 2 validators go away and are replaced in the new epoch.
4607                // - The new epoch begins.
4608                // - The quorum driver cannot complete the partial effects cert from the
4609                //   previous epoch, because it may not be able to reach either of the 2 former
4610                //   validators.
4611                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4612                //   the new epoch, the QD can make a new effects cert and return it to the
4613                //   client.
4614                //
4615                // This is a considered a short-term workaround. Eventually, Quorum Driver
4616                // should be able to return either an effects certificate, -or-
4617                // a proof of inclusion in a checkpoint. In the case above, the
4618                // Quorum Driver would return a proof of inclusion in the final
4619                // checkpoint, and this code would no longer be necessary.
4620                if effects.epoch() != epoch_store.epoch() {
4621                    debug!(
4622                        tx_digest=?transaction_digest,
4623                        effects_epoch=?effects.epoch(),
4624                        epoch=?epoch_store.epoch(),
4625                        "Re-signing the effects with the current epoch"
4626                    );
4627                }
4628                Ok(Some(self.sign_effects(effects, epoch_store)?))
4629            }
4630            None => Ok(None),
4631        }
4632    }
4633
4634    #[instrument(level = "trace", skip_all)]
4635    pub(crate) fn sign_effects(
4636        &self,
4637        effects: TransactionEffects,
4638        epoch_store: &Arc<AuthorityPerEpochStore>,
4639    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4640        let tx_digest = *effects.transaction_digest();
4641        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4642            Some(sig) => {
4643                debug_assert!(sig.epoch == epoch_store.epoch());
4644                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4645            }
4646            _ => {
4647                let sig = AuthoritySignInfo::new(
4648                    epoch_store.epoch(),
4649                    &effects,
4650                    Intent::iota_app(IntentScope::TransactionEffects),
4651                    self.name,
4652                    &*self.secret,
4653                );
4654
4655                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4656
4657                epoch_store.insert_effects_digest_and_signature(
4658                    &tx_digest,
4659                    effects.digest(),
4660                    &sig,
4661                )?;
4662
4663                effects
4664            }
4665        };
4666
4667        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4668            signed_effects,
4669        ))
4670    }
4671
4672    // Returns coin objects for indexing for fullnode if indexing is enabled.
4673    #[instrument(level = "trace", skip_all)]
4674    fn fullnode_only_get_tx_coins_for_indexing(
4675        &self,
4676        effects: &TransactionEffects,
4677        inner_temporary_store: &InnerTemporaryStore,
4678        epoch_store: &Arc<AuthorityPerEpochStore>,
4679    ) -> Option<TxCoins> {
4680        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4681            return None;
4682        }
4683        let written_coin_objects = inner_temporary_store
4684            .written
4685            .iter()
4686            .filter_map(|(k, v)| {
4687                if v.is_coin() {
4688                    Some((*k, v.clone()))
4689                } else {
4690                    None
4691                }
4692            })
4693            .collect();
4694        let mut input_coin_objects = inner_temporary_store
4695            .input_objects
4696            .iter()
4697            .filter_map(|(k, v)| {
4698                if v.is_coin() {
4699                    Some((*k, v.clone()))
4700                } else {
4701                    None
4702                }
4703            })
4704            .collect::<ObjectMap>();
4705
4706        // Check for receiving objects that were actually used and modified during
4707        // execution. Their updated version will already showup in
4708        // "written_coins" but their input isn't included in the set of input
4709        // objects in a inner_temporary_store.
4710        for (object_id, version) in effects.modified_at_versions() {
4711            if inner_temporary_store
4712                .loaded_runtime_objects
4713                .contains_key(&object_id)
4714            {
4715                if let Some(object) = self
4716                    .get_object_store()
4717                    .get_object_by_key(&object_id, version)
4718                {
4719                    if object.is_coin() {
4720                        input_coin_objects.insert(object_id, object);
4721                    }
4722                }
4723            }
4724        }
4725
4726        Some((input_coin_objects, written_coin_objects))
4727    }
4728
4729    /// Get the TransactionEnvelope that currently locks the given object, if
4730    /// any. Since object locks are only valid for one epoch, we also need
4731    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4732    /// no lock records for the given object can be found.
4733    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4734    /// object record is at a different version.
4735    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4736    /// certain transaction. Returns None if the a lock record is
4737    /// initialized for the given ObjectRef but not yet locked by any
4738    /// transaction,     or cannot find the transaction in transaction
4739    /// table, because of data race etc.
4740    #[instrument(level = "trace", skip_all)]
4741    pub async fn get_transaction_lock(
4742        &self,
4743        object_ref: &ObjectRef,
4744        epoch_store: &AuthorityPerEpochStore,
4745    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4746        let lock_info = self
4747            .get_object_cache_reader()
4748            .try_get_lock(*object_ref, epoch_store)?;
4749        let lock_info = match lock_info {
4750            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4751                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4752                    provided_obj_ref: *object_ref,
4753                    current_version: locked_ref.version,
4754                }
4755                .into());
4756            }
4757            ObjectLockStatus::Initialized => {
4758                return Ok(None);
4759            }
4760            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4761        };
4762
4763        epoch_store.get_signed_transaction(&lock_info)
4764    }
4765
4766    pub async fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
4767        self.get_object_cache_reader().try_get_objects(objects)
4768    }
4769
4770    /// Non-fallible version of `try_get_objects`.
4771    pub async fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4772        self.try_get_objects(objects)
4773            .await
4774            .expect("storage access failed")
4775    }
4776
4777    pub async fn try_get_object_or_tombstone(
4778        &self,
4779        object_id: ObjectId,
4780    ) -> IotaResult<Option<ObjectRef>> {
4781        self.get_object_cache_reader()
4782            .try_get_latest_object_ref_or_tombstone(object_id)
4783    }
4784
4785    /// Non-fallible version of `try_get_object_or_tombstone`.
4786    pub async fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4787        self.try_get_object_or_tombstone(object_id)
4788            .await
4789            .expect("storage access failed")
4790    }
4791
4792    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4793    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4794    /// upgrade.
4795    ///
4796    /// This method can be used to dynamic adjust the amount of buffer. If set
4797    /// to 0, the upgrade will go through with only 2f+1 votes.
4798    ///
4799    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4800    /// should have the same value), or you risk halting the chain.
4801    pub fn set_override_protocol_upgrade_buffer_stake(
4802        &self,
4803        expected_epoch: EpochId,
4804        buffer_stake_bps: u64,
4805    ) -> IotaResult {
4806        let epoch_store = self.load_epoch_store_one_call_per_task();
4807        let actual_epoch = epoch_store.epoch();
4808        if actual_epoch != expected_epoch {
4809            return Err(IotaError::WrongEpoch {
4810                expected_epoch,
4811                actual_epoch,
4812            });
4813        }
4814
4815        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4816    }
4817
4818    pub fn clear_override_protocol_upgrade_buffer_stake(
4819        &self,
4820        expected_epoch: EpochId,
4821    ) -> IotaResult {
4822        let epoch_store = self.load_epoch_store_one_call_per_task();
4823        let actual_epoch = epoch_store.epoch();
4824        if actual_epoch != expected_epoch {
4825            return Err(IotaError::WrongEpoch {
4826                expected_epoch,
4827                actual_epoch,
4828            });
4829        }
4830
4831        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4832    }
4833
4834    /// Get the set of system packages that are compiled in to this build, if
4835    /// those packages are compatible with the current versions of those
4836    /// packages on-chain.
4837    pub async fn get_available_system_packages(
4838        &self,
4839        binary_config: &BinaryConfig,
4840    ) -> Vec<ObjectRef> {
4841        let mut results = vec![];
4842
4843        let system_packages = BuiltInFramework::iter_system_packages();
4844
4845        // Add extra framework packages during simtest
4846        #[cfg(msim)]
4847        let extra_packages = framework_injection::get_extra_packages(self.name);
4848        #[cfg(msim)]
4849        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4850
4851        for system_package in system_packages {
4852            let modules = system_package.modules().to_vec();
4853            // In simtests, we could override the current built-in framework packages.
4854            #[cfg(msim)]
4855            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4856                .unwrap_or(modules);
4857
4858            let Some(obj_ref) = iota_framework::compare_system_package(
4859                &self.get_object_store(),
4860                &system_package.id,
4861                &modules,
4862                system_package.dependencies.to_vec(),
4863                binary_config,
4864            )
4865            .await
4866            else {
4867                return vec![];
4868            };
4869            results.push(obj_ref);
4870        }
4871
4872        results
4873    }
4874
4875    /// Return the new versions, module bytes, and dependencies for the packages
4876    /// that have been committed to for a framework upgrade, in
4877    /// `system_packages`.  Loads the module contents from the binary, and
4878    /// performs the following checks:
4879    ///
4880    /// - Whether its contents matches what is on-chain already, in which case
4881    ///   no upgrade is required, and its contents are omitted from the output.
4882    /// - Whether the contents in the binary can form a package whose digest
4883    ///   matches the input, meaning the framework will be upgraded, and this
4884    ///   authority can satisfy that upgrade, in which case the contents are
4885    ///   included in the output.
4886    ///
4887    /// If the current version of the framework can't be loaded, the binary does
4888    /// not contain the bytes for that framework ID, or the resulting
4889    /// package fails the digest check, `None` is returned indicating that
4890    /// this authority cannot run the upgrade that the network voted on.
4891    async fn get_system_package_bytes(
4892        &self,
4893        system_packages: Vec<ObjectRef>,
4894        binary_config: &BinaryConfig,
4895    ) -> Option<Vec<SystemPackage>> {
4896        let ids: Vec<_> = system_packages
4897            .iter()
4898            .map(|object_ref| object_ref.object_id)
4899            .collect();
4900        let objects = self.get_objects(&ids).await;
4901
4902        let mut res = Vec::with_capacity(system_packages.len());
4903        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4904            let prev_transaction = match object {
4905                Some(cur_object) if cur_object.object_ref() == system_package_ref => {
4906                    // Skip this one because it doesn't need to be upgraded.
4907                    info!(
4908                        "Framework {} does not need updating",
4909                        system_package_ref.object_id
4910                    );
4911                    continue;
4912                }
4913
4914                Some(cur_object) => cur_object.previous_transaction,
4915                None => TransactionDigest::GENESIS_MARKER,
4916            };
4917
4918            #[cfg(msim)]
4919            let FrameworkSystemPackage {
4920                id: _,
4921                bytes,
4922                dependencies,
4923            } = framework_injection::get_override_system_package(
4924                &system_package_ref.object_id,
4925                self.name,
4926            )
4927            .unwrap_or_else(|| {
4928                BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
4929            });
4930
4931            #[cfg(not(msim))]
4932            let FrameworkSystemPackage {
4933                id: _,
4934                bytes,
4935                dependencies,
4936            } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
4937
4938            let modules: Vec<_> = bytes
4939                .iter()
4940                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4941                .collect();
4942
4943            let new_object = Object::new_system_package(
4944                &modules,
4945                system_package_ref.version,
4946                dependencies.clone(),
4947                prev_transaction,
4948            );
4949
4950            let new_ref = new_object.object_ref();
4951            if new_ref != system_package_ref {
4952                error!(
4953                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4954                );
4955                return None;
4956            }
4957
4958            res.push(SystemPackage {
4959                version: system_package_ref.version,
4960                modules: bytes,
4961                dependencies,
4962            });
4963        }
4964
4965        Some(res)
4966    }
4967
4968    /// Returns the new protocol version and system packages that the network
4969    /// has voted to upgrade to. If the proposed protocol version is not
4970    /// supported, None is returned.
4971    fn is_protocol_version_supported_v1(
4972        proposed_protocol_version: ProtocolVersion,
4973        committee: &Committee,
4974        capabilities: Vec<AuthorityCapabilitiesV1>,
4975        mut buffer_stake_bps: u64,
4976    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4977        if buffer_stake_bps > 10000 {
4978            warn!("clamping buffer_stake_bps to 10000");
4979            buffer_stake_bps = 10000;
4980        }
4981
4982        // For each validator, gather the protocol version and system packages that it
4983        // would like to upgrade to in the next epoch.
4984        let mut desired_upgrades: Vec<_> = capabilities
4985            .into_iter()
4986            .filter_map(|mut cap| {
4987                // A validator that lists no packages is voting against any change at all.
4988                if cap.available_system_packages.is_empty() {
4989                    return None;
4990                }
4991
4992                cap.available_system_packages.sort();
4993
4994                info!(
4995                    "validator {:?} supports {:?} with system packages: {:?}",
4996                    cap.authority.concise(),
4997                    cap.supported_protocol_versions,
4998                    cap.available_system_packages,
4999                );
5000
5001                // A validator that only supports the current protocol version is also voting
5002                // against any change, because framework upgrades always require a protocol
5003                // version bump.
5004                cap.supported_protocol_versions
5005                    .get_version_digest(proposed_protocol_version)
5006                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5007            })
5008            .collect();
5009
5010        // There can only be one set of votes that have a majority, find one if it
5011        // exists.
5012        desired_upgrades.sort();
5013        desired_upgrades
5014            .into_iter()
5015            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5016            .into_iter()
5017            .find_map(|((digest, packages), group)| {
5018                // should have been filtered out earlier.
5019                assert!(!packages.is_empty());
5020
5021                let mut stake_aggregator: StakeAggregator<(), true> =
5022                    StakeAggregator::new(Arc::new(committee.clone()));
5023
5024                for (_, _, authority) in group {
5025                    stake_aggregator.insert_generic(authority, ());
5026                }
5027
5028                let total_votes = stake_aggregator.total_votes();
5029                let quorum_threshold = committee.quorum_threshold();
5030                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5031
5032                info!(
5033                    protocol_config_digest = ?digest,
5034                    ?total_votes,
5035                    ?quorum_threshold,
5036                    ?buffer_stake_bps,
5037                    ?effective_threshold,
5038                    ?proposed_protocol_version,
5039                    ?packages,
5040                    "support for upgrade"
5041                );
5042
5043                let has_support = total_votes >= effective_threshold;
5044                has_support.then_some((proposed_protocol_version, digest, packages))
5045            })
5046    }
5047
5048    /// Selects the highest supported protocol version and system packages that
5049    /// the network has voted to upgrade to. If no upgrade is supported,
5050    /// returns the current protocol version and system packages.
5051    fn choose_protocol_version_and_system_packages_v1(
5052        current_protocol_version: ProtocolVersion,
5053        current_protocol_digest: Digest,
5054        committee: &Committee,
5055        capabilities: Vec<AuthorityCapabilitiesV1>,
5056        buffer_stake_bps: u64,
5057    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5058        let mut next_protocol_version = current_protocol_version;
5059        let mut system_packages = vec![];
5060        let mut protocol_version_digest = current_protocol_digest;
5061
5062        // Finds the highest supported protocol version and system packages by
5063        // incrementing the proposed protocol version by one until no further
5064        // upgrades are supported.
5065        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5066            next_protocol_version + 1,
5067            committee,
5068            capabilities.clone(),
5069            buffer_stake_bps,
5070        ) {
5071            next_protocol_version = version;
5072            protocol_version_digest = digest;
5073            system_packages = packages;
5074        }
5075
5076        (
5077            next_protocol_version,
5078            protocol_version_digest,
5079            system_packages,
5080        )
5081    }
5082
5083    /// Returns the indices of validators that support the given protocol
5084    /// version and digest. This includes both committee and non-committee
5085    /// validators based on their capabilities. Uses active validators
5086    /// instead of committee indices.
5087    fn get_validators_supporting_protocol_version(
5088        target_protocol_version: ProtocolVersion,
5089        target_digest: Digest,
5090        active_validators: &[AuthorityPublicKey],
5091        capabilities: &[AuthorityCapabilitiesV1],
5092    ) -> Vec<u64> {
5093        let mut eligible_validators = Vec::new();
5094
5095        for capability in capabilities {
5096            // Check if this validator supports the target protocol version and digest
5097            if let Some(digest) = capability
5098                .supported_protocol_versions
5099                .get_version_digest(target_protocol_version)
5100            {
5101                if digest == target_digest {
5102                    // Find the validator's index in the active validators list
5103                    if let Some(index) = active_validators
5104                        .iter()
5105                        .position(|name| AuthorityName::from(name) == capability.authority)
5106                    {
5107                        eligible_validators.push(index as u64);
5108                    }
5109                }
5110            }
5111        }
5112
5113        // Sort indices for deterministic behavior
5114        eligible_validators.sort();
5115        eligible_validators
5116    }
5117
5118    /// Calculates the sum of weights for eligible validators that are part of
5119    /// the committee. Takes the indices from
5120    /// get_validators_supporting_protocol_version and maps them back
5121    /// to committee members to get their weights.
5122    fn calculate_eligible_validators_weight(
5123        eligible_validator_indices: &[u64],
5124        active_validators: &[AuthorityPublicKey],
5125        committee: &Committee,
5126    ) -> u64 {
5127        let mut total_weight = 0u64;
5128
5129        for &index in eligible_validator_indices {
5130            let authority_pubkey = &active_validators[index as usize];
5131            // Check if this validator is in the committee and get their weight
5132            if let Some((_, weight)) = committee
5133                .members()
5134                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5135            {
5136                total_weight += weight;
5137            }
5138        }
5139
5140        total_weight
5141    }
5142
5143    /// Creates and execute the advance epoch transaction to effects without
5144    /// committing it to the database. The effects of the change epoch tx
5145    /// are only written to the database after a certified checkpoint has been
5146    /// formed and executed by CheckpointExecutor.
5147    ///
5148    /// When a framework upgraded has been decided on, but the validator does
5149    /// not have the new versions of the packages locally, the validator
5150    /// cannot form the ChangeEpochTx. In this case it returns Err,
5151    /// indicating that the checkpoint builder should give up trying to make the
5152    /// final checkpoint. As long as the network is able to create a certified
5153    /// checkpoint (which should be ensured by the capabilities vote), it
5154    /// will arrive via state sync and be executed by CheckpointExecutor.
5155    #[instrument(level = "error", skip_all)]
5156    pub async fn create_and_execute_advance_epoch_tx(
5157        &self,
5158        epoch_store: &Arc<AuthorityPerEpochStore>,
5159        gas_cost_summary: &GasCostSummary,
5160        checkpoint: CheckpointSequenceNumber,
5161        epoch_start_timestamp_ms: CheckpointTimestamp,
5162        scores: Vec<u64>,
5163    ) -> anyhow::Result<(
5164        IotaSystemState,
5165        Option<SystemEpochInfoEvent>,
5166        TransactionEffects,
5167    )> {
5168        let mut txns = Vec::new();
5169
5170        let next_epoch = epoch_store.epoch() + 1;
5171
5172        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5173        let authority_capabilities = epoch_store
5174            .get_capabilities_v1()
5175            .expect("read capabilities from db cannot fail");
5176        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
5177            Self::choose_protocol_version_and_system_packages_v1(
5178                epoch_store.protocol_version(),
5179                SupportedProtocolVersionsWithHashes::protocol_config_digest(
5180                    epoch_store.protocol_config(),
5181                ),
5182                epoch_store.committee(),
5183                authority_capabilities.clone(),
5184                buffer_stake_bps,
5185            );
5186
5187        // since system packages are created during the current epoch, they should abide
5188        // by the rules of the current epoch, including the current epoch's max
5189        // Move binary format version
5190        let config = epoch_store.protocol_config();
5191        let binary_config = to_binary_config(config);
5192        let Some(next_epoch_system_package_bytes) = self
5193            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5194            .await
5195        else {
5196            error!(
5197                "upgraded system packages {:?} are not locally available, cannot create \
5198                ChangeEpochTx. validator binary must be upgraded to the correct version!",
5199                next_epoch_system_packages
5200            );
5201            // the checkpoint builder will keep retrying forever when it hits this error.
5202            // Eventually, one of two things will happen:
5203            // - The operator will upgrade this binary to one that has the new packages
5204            //   locally, and this function will succeed.
5205            // - The final checkpoint will be certified by other validators, we will receive
5206            //   it via state sync, and execute it. This will upgrade the framework
5207            //   packages, reconfigure, and most likely shut down in the new epoch (this
5208            //   validator likely doesn't support the new protocol version, or else it
5209            //   should have had the packages.)
5210            bail!("missing system packages: cannot form ChangeEpochTx");
5211        };
5212
5213        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
5214        // ChangeEpochV2 requirements are met
5215        if config.select_committee_from_eligible_validators() {
5216            // Get the list of eligible validators that support the target protocol version
5217            let active_validators = epoch_store.epoch_start_state().get_active_validators();
5218
5219            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5220
5221            // Use validators supporting the target protocol version as eligible validators
5222            // in the next version if select_committee_supporting_next_epoch_version feature
5223            // flag is set to true.
5224            if config.select_committee_supporting_next_epoch_version() {
5225                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5226                    next_epoch_protocol_version,
5227                    next_epoch_protocol_digest,
5228                    &active_validators,
5229                    &authority_capabilities,
5230                );
5231
5232                // Calculate the total weight of eligible validators in the committee
5233                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5234                    &eligible_active_validators,
5235                    &active_validators,
5236                    epoch_store.committee(),
5237                );
5238
5239                // Safety check: ensure eligible validators have enough stake
5240                // Use the same effective threshold calculation that was used to decide the
5241                // protocol version
5242                let committee = epoch_store.committee();
5243                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5244
5245                if eligible_validators_weight < effective_threshold {
5246                    error!(
5247                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5248                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5249                    );
5250                    // Pass all active validator indices as eligible validators
5251                    // to perform selection among all of them.
5252                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5253                }
5254            }
5255
5256            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5257            // flag is enabled.
5258            if config.pass_validator_scores_to_advance_epoch() {
5259                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5260                    next_epoch,
5261                    next_epoch_protocol_version.as_u64(),
5262                    gas_cost_summary.storage_cost,
5263                    gas_cost_summary.computation_cost,
5264                    gas_cost_summary.computation_cost_burned,
5265                    gas_cost_summary.storage_rebate,
5266                    gas_cost_summary.non_refundable_storage_fee,
5267                    epoch_start_timestamp_ms,
5268                    next_epoch_system_package_bytes,
5269                    eligible_active_validators,
5270                    scores,
5271                    config.adjust_rewards_by_score(),
5272                ));
5273            } else {
5274                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5275                    next_epoch,
5276                    next_epoch_protocol_version.as_u64(),
5277                    gas_cost_summary.storage_cost,
5278                    gas_cost_summary.computation_cost,
5279                    gas_cost_summary.computation_cost_burned,
5280                    gas_cost_summary.storage_rebate,
5281                    gas_cost_summary.non_refundable_storage_fee,
5282                    epoch_start_timestamp_ms,
5283                    next_epoch_system_package_bytes,
5284                    eligible_active_validators,
5285                ));
5286            }
5287        } else if config.protocol_defined_base_fee()
5288            && config.max_committee_members_count_as_option().is_some()
5289        {
5290            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5291                next_epoch,
5292                next_epoch_protocol_version.as_u64(),
5293                gas_cost_summary.storage_cost,
5294                gas_cost_summary.computation_cost,
5295                gas_cost_summary.computation_cost_burned,
5296                gas_cost_summary.storage_rebate,
5297                gas_cost_summary.non_refundable_storage_fee,
5298                epoch_start_timestamp_ms,
5299                next_epoch_system_package_bytes,
5300            ));
5301        } else {
5302            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5303                next_epoch,
5304                next_epoch_protocol_version.as_u64(),
5305                gas_cost_summary.storage_cost,
5306                gas_cost_summary.computation_cost,
5307                gas_cost_summary.storage_rebate,
5308                gas_cost_summary.non_refundable_storage_fee,
5309                epoch_start_timestamp_ms,
5310                next_epoch_system_package_bytes,
5311            ));
5312        }
5313
5314        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5315
5316        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5317            tx.clone(),
5318            epoch_store.epoch(),
5319            checkpoint,
5320        );
5321
5322        let tx_digest = executable_tx.digest();
5323
5324        info!(
5325            ?next_epoch,
5326            ?next_epoch_protocol_version,
5327            ?next_epoch_system_packages,
5328            computation_cost=?gas_cost_summary.computation_cost,
5329            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5330            storage_cost=?gas_cost_summary.storage_cost,
5331            storage_rebate=?gas_cost_summary.storage_rebate,
5332            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5333            ?tx_digest,
5334            "Creating advance epoch transaction"
5335        );
5336
5337        fail_point_async!("change_epoch_tx_delay");
5338        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5339
5340        // The tx could have been executed by state sync already - if so simply return
5341        // an error. The checkpoint builder will shortly be terminated by
5342        // reconfiguration anyway.
5343        if self
5344            .get_transaction_cache_reader()
5345            .try_is_tx_already_executed(tx_digest)?
5346        {
5347            warn!("change epoch tx has already been executed via state sync");
5348            bail!("change epoch tx has already been executed via state sync",);
5349        }
5350
5351        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5352
5353        // We must manually assign the shared object versions to the transaction before
5354        // executing it. This is because we do not sequence end-of-epoch
5355        // transactions through consensus.
5356        epoch_store.assign_shared_object_versions_idempotent(
5357            self.get_object_cache_reader().as_ref(),
5358            std::slice::from_ref(&executable_tx),
5359        )?;
5360
5361        let (input_objects, _) =
5362            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5363
5364        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5365            &execution_guard,
5366            &executable_tx,
5367            input_objects,
5368            vec![],
5369            epoch_store,
5370        )?;
5371        let system_obj = get_iota_system_state(&temporary_store.written)
5372            .expect("change epoch tx must write to system object");
5373        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5374        let system_epoch_info_event = temporary_store
5375            .events
5376            .0
5377            .into_iter()
5378            .find(|event| event.is_system_epoch_info_event())
5379            .map(SystemEpochInfoEvent::from);
5380        // The system epoch info event can be `None` in case if the `advance_epoch`
5381        // Move function call failed and was executed in the safe mode.
5382        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5383
5384        // We must write tx and effects to the state sync tables so that state sync is
5385        // able to deliver to the transaction to CheckpointExecutor after it is
5386        // included in a certified checkpoint.
5387        self.get_state_sync_store()
5388            .try_insert_transaction_and_effects(&tx, &effects)
5389            .map_err(|err| {
5390                let err: anyhow::Error = err.into();
5391                err
5392            })?;
5393
5394        info!(
5395            "Effects summary of the change epoch transaction: {:?}",
5396            effects.summary_for_debug()
5397        );
5398        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5399        // The change epoch transaction cannot fail to execute.
5400        assert!(effects.status().is_success());
5401        Ok((system_obj, system_epoch_info_event, effects))
5402    }
5403
5404    /// This function is called at the very end of the epoch.
5405    /// This step is required before updating new epoch in the db and calling
5406    /// reopen_epoch_db.
5407    #[instrument(level = "error", skip_all)]
5408    async fn revert_uncommitted_epoch_transactions(
5409        &self,
5410        epoch_store: &AuthorityPerEpochStore,
5411    ) -> IotaResult {
5412        {
5413            let state = epoch_store.get_reconfig_state_write_lock_guard();
5414            if state.should_accept_user_certs() {
5415                // Need to change this so that consensus adapter do not accept certificates from
5416                // user. This can happen if our local validator did not initiate
5417                // epoch change locally, but 2f+1 nodes already concluded the
5418                // epoch.
5419                //
5420                // This lock is essentially a barrier for
5421                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5422                // after this block
5423                epoch_store.close_user_certs(state);
5424            }
5425            // lock is dropped here
5426        }
5427        let pending_certificates = epoch_store.pending_consensus_certificates();
5428        info!(
5429            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5430            pending_certificates.len(),
5431            pending_certificates,
5432        );
5433        for digest in pending_certificates {
5434            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5435                info!(
5436                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5437                    digest
5438                );
5439                continue;
5440            }
5441            info!("Reverting {:?} at the end of epoch", digest);
5442            epoch_store.revert_executed_transaction(&digest)?;
5443            self.get_reconfig_api().try_revert_state_update(&digest)?;
5444        }
5445        info!("All uncommitted local transactions reverted");
5446        Ok(())
5447    }
5448
5449    #[instrument(level = "error", skip_all)]
5450    async fn reopen_epoch_db(
5451        &self,
5452        cur_epoch_store: &AuthorityPerEpochStore,
5453        new_committee: Committee,
5454        epoch_start_configuration: EpochStartConfiguration,
5455        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5456        epoch_last_checkpoint: CheckpointSequenceNumber,
5457    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5458        let new_epoch = new_committee.epoch;
5459        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5460        assert_eq!(
5461            epoch_start_configuration.epoch_start_state().epoch(),
5462            new_committee.epoch
5463        );
5464        fail_point!("before-open-new-epoch-store");
5465        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5466            self.name,
5467            new_committee,
5468            epoch_start_configuration,
5469            self.get_backing_package_store().clone(),
5470            expensive_safety_check_config,
5471            epoch_last_checkpoint,
5472        )?;
5473        self.epoch_store.store(new_epoch_store.clone());
5474        Ok(new_epoch_store)
5475    }
5476
5477    /// Checks if `authenticator` unlocks a valid Move account and returns the
5478    /// account-related `AuthenticatorFunctionRef` object.
5479    fn check_move_account(
5480        &self,
5481        auth_account_object_id: ObjectId,
5482        auth_account_object_seq_number: Option<SequenceNumber>,
5483        auth_account_object_digest: Option<ObjectDigest>,
5484        account_object: ObjectReadResult,
5485        signer: &IotaAddress,
5486    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5487        let account_object = match account_object.object {
5488            ObjectReadResultKind::Object(object) => Ok(object),
5489            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5490                Err(UserInputError::AccountObjectDeleted {
5491                    account_id: account_object.id(),
5492                    account_version: version,
5493                    transaction_digest: digest,
5494                })
5495            }
5496            // It is impossible to check the account object because it is used in a canceled
5497            // transaction and is not loaded.
5498            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5499                Err(UserInputError::AccountObjectInCanceledTransaction {
5500                    account_id: account_object.id(),
5501                    account_version: version,
5502                })
5503            }
5504        }?;
5505
5506        let account_object_addr = IotaAddress::from(auth_account_object_id);
5507
5508        fp_ensure!(
5509            signer == &account_object_addr,
5510            UserInputError::IncorrectUserSignature {
5511                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5512            }
5513            .into()
5514        );
5515
5516        fp_ensure!(
5517            account_object.is_shared() || account_object.is_immutable(),
5518            UserInputError::AccountObjectNotSupported {
5519                object_id: auth_account_object_id
5520            }
5521            .into()
5522        );
5523
5524        let auth_account_object_seq_number =
5525            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5526                let account_object_version = account_object.version();
5527
5528                fp_ensure!(
5529                    account_object_version == auth_account_object_seq_number,
5530                    UserInputError::AccountObjectVersionMismatch {
5531                        object_id: auth_account_object_id,
5532                        expected_version: auth_account_object_seq_number,
5533                        actual_version: account_object_version,
5534                    }
5535                    .into()
5536                );
5537
5538                auth_account_object_seq_number
5539            } else {
5540                account_object.version()
5541            };
5542
5543        if let Some(auth_account_object_digest) = auth_account_object_digest {
5544            let expected_digest = account_object.digest();
5545            fp_ensure!(
5546                expected_digest == auth_account_object_digest,
5547                UserInputError::InvalidAccountObjectDigest {
5548                    object_id: auth_account_object_id,
5549                    expected_digest,
5550                    actual_digest: auth_account_object_digest,
5551                }
5552                .into()
5553            );
5554        }
5555
5556        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5557            auth_account_object_id,
5558            &AuthenticatorFunctionRefV1Key::tag().into(),
5559            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5560        )
5561        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5562            account_object_id: auth_account_object_id,
5563        })?;
5564
5565        let authenticator_function_ref_field = self
5566            .get_object_cache_reader()
5567            .try_find_object_lt_or_eq_version(
5568                authenticator_function_ref_field_id,
5569                auth_account_object_seq_number,
5570            )?;
5571
5572        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5573            let field_move_object = authenticator_function_ref_field_obj
5574                .data
5575                .as_struct_opt()
5576                .expect("dynamic field should never be a package object");
5577
5578            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5579                field_move_object.to_rust().map_err(|_| {
5580                    UserInputError::InvalidAuthenticatorFunctionRefField {
5581                        account_object_id: auth_account_object_id,
5582                    }
5583                })?;
5584
5585            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5586                field.value,
5587                authenticator_function_ref_field_obj.object_ref(),
5588                authenticator_function_ref_field_obj.owner,
5589                authenticator_function_ref_field_obj.storage_rebate,
5590                authenticator_function_ref_field_obj.previous_transaction,
5591            ))
5592        } else {
5593            Err(UserInputError::MoveAuthenticatorNotFound {
5594                authenticator_function_ref_id: authenticator_function_ref_field_id,
5595                account_object_id: auth_account_object_id,
5596                account_object_version: auth_account_object_seq_number,
5597            }
5598            .into())
5599        }
5600    }
5601
5602    #[allow(clippy::type_complexity)]
5603    fn read_objects_for_signing(
5604        &self,
5605        transaction: &VerifiedTransaction,
5606        epoch: u64,
5607    ) -> IotaResult<(
5608        InputObjects,
5609        ReceivingObjects,
5610        Vec<(InputObjects, ObjectReadResult)>,
5611    )> {
5612        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5613            Some(transaction.digest()),
5614            &transaction.collect_all_input_object_kind_for_reading()?,
5615            &transaction.data().transaction_data().receiving_objects(),
5616            epoch,
5617        )?;
5618
5619        transaction
5620            .split_input_objects_into_groups_for_reading(input_objects)
5621            .map(|(tx_input_objects, per_authenticator_inputs)| {
5622                (
5623                    tx_input_objects,
5624                    tx_receiving_objects,
5625                    per_authenticator_inputs,
5626                )
5627            })
5628    }
5629
5630    #[allow(clippy::type_complexity)]
5631    fn check_transaction_inputs_for_signing(
5632        &self,
5633        protocol_config: &ProtocolConfig,
5634        reference_gas_price: u64,
5635        tx_data: &TransactionData,
5636        tx_input_objects: InputObjects,
5637        tx_receiving_objects: &ReceivingObjects,
5638        move_authenticators: &Vec<&MoveAuthenticator>,
5639        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5640    ) -> IotaResult<(
5641        IotaGasStatus,
5642        CheckedInputObjects,
5643        Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5644    )> {
5645        let authenticator_gas_budget = if move_authenticators.is_empty() {
5646            0
5647        } else {
5648            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5649            // not a part of the transaction data.
5650            protocol_config.max_auth_gas()
5651        };
5652
5653        debug_assert_eq!(
5654            move_authenticators.len(),
5655            per_authenticator_inputs.len(),
5656            "Move authenticators amount must match the number of authenticator inputs"
5657        );
5658
5659        let per_authenticator_checked_inputs = move_authenticators
5660            .iter()
5661            .zip(per_authenticator_inputs)
5662            .map(
5663                |(move_authenticator, (authenticator_input_objects, account_object))| {
5664                    // Check basic `object_to_authenticate` preconditions and get its components.
5665                    let (
5666                        auth_account_object_id,
5667                        auth_account_object_seq_number,
5668                        auth_account_object_digest,
5669                    ) = move_authenticator.object_to_authenticate_components()?;
5670
5671                    let signer = move_authenticator.address()?;
5672
5673                    // Make sure the signer is a Move account.
5674                    let AuthenticatorFunctionRefForExecution {
5675                        authenticator_function_ref,
5676                        ..
5677                    } = self.check_move_account(
5678                        auth_account_object_id,
5679                        auth_account_object_seq_number,
5680                        auth_account_object_digest,
5681                        account_object,
5682                        &signer,
5683                    )?;
5684
5685                    // Check the MoveAuthenticator input objects.
5686                    let authenticator_checked_input_objects =
5687                        iota_transaction_checks::check_move_authenticator_input_for_signing(
5688                            authenticator_input_objects,
5689                        )?;
5690
5691                    Ok((
5692                        authenticator_checked_input_objects,
5693                        authenticator_function_ref,
5694                    ))
5695                },
5696            )
5697            .collect::<IotaResult<Vec<_>>>()?;
5698
5699        // Check the transaction inputs.
5700        let (gas_status, tx_checked_input_objects) =
5701            iota_transaction_checks::check_transaction_input(
5702                protocol_config,
5703                reference_gas_price,
5704                tx_data,
5705                tx_input_objects,
5706                tx_receiving_objects,
5707                &self.metrics.bytecode_verifier_metrics,
5708                &self.config.verifier_signing_config,
5709                authenticator_gas_budget,
5710            )?;
5711
5712        Ok((
5713            gas_status,
5714            tx_checked_input_objects,
5715            per_authenticator_checked_inputs,
5716        ))
5717    }
5718
5719    #[cfg(test)]
5720    pub(crate) fn iter_live_object_set_for_testing(
5721        &self,
5722    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5723        self.get_global_state_hash_store()
5724            .iter_cached_live_object_set_for_testing()
5725    }
5726
5727    #[cfg(test)]
5728    pub(crate) fn shutdown_execution_for_test(&self) {
5729        self.tx_execution_shutdown
5730            .lock()
5731            .take()
5732            .unwrap()
5733            .send(())
5734            .unwrap();
5735    }
5736
5737    /// NOTE: this function is only to be used for fuzzing and testing. Never
5738    /// use in prod
5739    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5740        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5741        self.get_object_cache_reader()
5742            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5743        self.get_reconfig_api()
5744            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5745    }
5746}
5747
5748pub struct RandomnessRoundReceiver {
5749    authority_state: Arc<AuthorityState>,
5750    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5751}
5752
5753impl RandomnessRoundReceiver {
5754    pub fn spawn(
5755        authority_state: Arc<AuthorityState>,
5756        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5757    ) -> JoinHandle<()> {
5758        let rrr = RandomnessRoundReceiver {
5759            authority_state,
5760            randomness_rx,
5761        };
5762        spawn_monitored_task!(rrr.run())
5763    }
5764
5765    async fn run(mut self) {
5766        info!("RandomnessRoundReceiver event loop started");
5767
5768        loop {
5769            tokio::select! {
5770                maybe_recv = self.randomness_rx.recv() => {
5771                    if let Some((epoch, round, bytes)) = maybe_recv {
5772                        self.handle_new_randomness(epoch, round, bytes);
5773                    } else {
5774                        break;
5775                    }
5776                },
5777            }
5778        }
5779
5780        info!("RandomnessRoundReceiver event loop ended");
5781    }
5782
5783    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5784    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5785        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5786        if epoch_store.epoch() != epoch {
5787            warn!(
5788                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5789                epoch_store.epoch()
5790            );
5791            return;
5792        }
5793        let transaction = VerifiedTransaction::new_randomness_state_update(
5794            epoch,
5795            round,
5796            bytes,
5797            epoch_store
5798                .epoch_start_config()
5799                .randomness_obj_initial_shared_version(),
5800        );
5801        debug!(
5802            "created randomness state update transaction with digest: {:?}",
5803            transaction.digest()
5804        );
5805        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5806        let digest = *transaction.digest();
5807
5808        // Randomness state updates contain the full bls signature for the random round,
5809        // which cannot necessarily be reconstructed again later. Therefore we must
5810        // immediately persist this transaction. If we crash before its outputs
5811        // are committed, this ensures we will be able to re-execute it.
5812        self.authority_state
5813            .get_cache_commit()
5814            .persist_transaction(&transaction);
5815
5816        // Send transaction to TransactionManager for execution.
5817        self.authority_state
5818            .transaction_manager()
5819            .enqueue(vec![transaction], &epoch_store);
5820
5821        let authority_state = self.authority_state.clone();
5822        spawn_monitored_task!(async move {
5823            // Wait for transaction execution in a separate task, to avoid deadlock in case
5824            // of out-of-order randomness generation. (Each
5825            // RandomnessStateUpdate depends on the output of the
5826            // RandomnessStateUpdate from the previous round.)
5827            //
5828            // We set a very long timeout so that in case this gets stuck for some reason,
5829            // the validator will eventually crash rather than continuing in a
5830            // zombie mode.
5831            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5832            let result = tokio::time::timeout(
5833                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5834                authority_state
5835                    .get_transaction_cache_reader()
5836                    .try_notify_read_executed_effects(&[digest]),
5837            )
5838            .await;
5839            let result = match result {
5840                Ok(result) => result,
5841                Err(_) => {
5842                    if cfg!(debug_assertions) {
5843                        // Crash on randomness update execution timeout in debug builds.
5844                        panic!(
5845                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5846                        );
5847                    }
5848                    warn!(
5849                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5850                    );
5851                    // Continue waiting as long as necessary in non-debug builds.
5852                    authority_state
5853                        .get_transaction_cache_reader()
5854                        .try_notify_read_executed_effects(&[digest])
5855                        .await
5856                }
5857            };
5858
5859            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5860            let effects = effects.pop().expect("should return effects");
5861            if *effects.status() != ExecutionStatus::Success {
5862                fatal!(
5863                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5864                );
5865            }
5866            debug!(
5867                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5868            );
5869        });
5870    }
5871}
5872
5873#[async_trait]
5874impl TransactionKeyValueStoreTrait for AuthorityState {
5875    async fn multi_get(
5876        &self,
5877        transaction_keys: &[TransactionDigest],
5878        effects_keys: &[TransactionDigest],
5879    ) -> IotaResult<KVStoreTransactionData> {
5880        let txns = if !transaction_keys.is_empty() {
5881            self.get_transaction_cache_reader()
5882                .try_multi_get_transaction_blocks(transaction_keys)?
5883                .into_iter()
5884                .map(|t| t.map(|t| (*t).clone().into_inner()))
5885                .collect()
5886        } else {
5887            vec![]
5888        };
5889
5890        let fx = if !effects_keys.is_empty() {
5891            self.get_transaction_cache_reader()
5892                .try_multi_get_executed_effects(effects_keys)?
5893        } else {
5894            vec![]
5895        };
5896
5897        Ok((txns, fx))
5898    }
5899
5900    async fn multi_get_checkpoints(
5901        &self,
5902        checkpoint_summaries: &[CheckpointSequenceNumber],
5903        checkpoint_contents: &[CheckpointSequenceNumber],
5904        checkpoint_summaries_by_digest: &[CheckpointDigest],
5905    ) -> IotaResult<(
5906        Vec<Option<CertifiedCheckpointSummary>>,
5907        Vec<Option<CheckpointContents>>,
5908        Vec<Option<CertifiedCheckpointSummary>>,
5909    )> {
5910        // TODO: use multi-get methods if it ever becomes important (unlikely)
5911        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5912        let store = self.get_checkpoint_store();
5913        for seq in checkpoint_summaries {
5914            let checkpoint = store
5915                .get_checkpoint_by_sequence_number(*seq)?
5916                .map(|c| c.into_inner());
5917
5918            summaries.push(checkpoint);
5919        }
5920
5921        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5922        for seq in checkpoint_contents {
5923            let checkpoint = store
5924                .get_checkpoint_by_sequence_number(*seq)?
5925                .and_then(|summary| {
5926                    store
5927                        .get_checkpoint_contents(&summary.content_digest)
5928                        .expect("db read cannot fail")
5929                });
5930            contents.push(checkpoint);
5931        }
5932
5933        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5934        for digest in checkpoint_summaries_by_digest {
5935            let checkpoint = store
5936                .get_checkpoint_by_digest(digest)?
5937                .map(|c| c.into_inner());
5938            summaries_by_digest.push(checkpoint);
5939        }
5940
5941        Ok((summaries, contents, summaries_by_digest))
5942    }
5943
5944    async fn get_transaction_perpetual_checkpoint(
5945        &self,
5946        digest: TransactionDigest,
5947    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5948        self.get_checkpoint_cache()
5949            .try_get_transaction_perpetual_checkpoint(&digest)
5950            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5951    }
5952
5953    async fn get_object(
5954        &self,
5955        object_id: ObjectId,
5956        version: VersionNumber,
5957    ) -> IotaResult<Option<Object>> {
5958        self.get_object_cache_reader()
5959            .try_get_object_by_key(&object_id, version)
5960    }
5961
5962    #[instrument(skip_all)]
5963    async fn multi_get_objects(
5964        &self,
5965        object_keys: &[ObjectKey],
5966    ) -> IotaResult<Vec<Option<Object>>> {
5967        Ok(self
5968            .get_object_cache_reader()
5969            .multi_get_objects_by_key(object_keys))
5970    }
5971
5972    async fn multi_get_transactions_perpetual_checkpoints(
5973        &self,
5974        digests: &[TransactionDigest],
5975    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5976        let res = self
5977            .get_checkpoint_cache()
5978            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5979
5980        Ok(res
5981            .into_iter()
5982            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5983            .collect())
5984    }
5985
5986    #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
5987    async fn multi_get_events_by_tx_digests(
5988        &self,
5989        digests: &[TransactionDigest],
5990    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5991        if digests.is_empty() {
5992            return Ok(vec![]);
5993        }
5994
5995        Ok(self
5996            .get_transaction_cache_reader()
5997            .multi_get_events(digests))
5998    }
5999}
6000
6001#[cfg(msim)]
6002pub mod framework_injection {
6003    use std::{
6004        cell::RefCell,
6005        collections::{BTreeMap, BTreeSet},
6006    };
6007
6008    use iota_framework::{BuiltInFramework, SystemPackage};
6009    use iota_sdk_types::ObjectId;
6010    use iota_types::base_types::AuthorityName;
6011    use move_binary_format::CompiledModule;
6012
6013    type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;
6014
6015    // Thread local cache because all simtests run in a single unique thread.
6016    thread_local! {
6017        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6018    }
6019
6020    type Framework = Vec<CompiledModule>;
6021
6022    pub type PackageUpgradeCallback =
6023        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6024
6025    enum PackageOverrideConfig {
6026        Global(Framework),
6027        PerValidator(PackageUpgradeCallback),
6028    }
6029
6030    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6031        modules
6032            .iter()
6033            .map(|m| {
6034                let mut buf = Vec::new();
6035                m.serialize_with_version(m.version, &mut buf).unwrap();
6036                buf
6037            })
6038            .collect()
6039    }
6040
6041    pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
6042        OVERRIDE.with(|bs| {
6043            bs.borrow_mut()
6044                .insert(package_id, PackageOverrideConfig::Global(modules))
6045        });
6046    }
6047
6048    pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
6049        OVERRIDE.with(|bs| {
6050            bs.borrow_mut()
6051                .insert(package_id, PackageOverrideConfig::PerValidator(func))
6052        });
6053    }
6054
6055    pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6056        OVERRIDE.with(|cfg| {
6057            cfg.borrow().get(package_id).and_then(|entry| match entry {
6058                PackageOverrideConfig::Global(framework) => {
6059                    Some(compiled_modules_to_bytes(framework))
6060                }
6061                PackageOverrideConfig::PerValidator(func) => {
6062                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
6063                }
6064            })
6065        })
6066    }
6067
6068    pub fn get_override_modules(
6069        package_id: &ObjectId,
6070        name: AuthorityName,
6071    ) -> Option<Vec<CompiledModule>> {
6072        OVERRIDE.with(|cfg| {
6073            cfg.borrow().get(package_id).and_then(|entry| match entry {
6074                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6075                PackageOverrideConfig::PerValidator(func) => func(name),
6076            })
6077        })
6078    }
6079
6080    pub fn get_override_system_package(
6081        package_id: &ObjectId,
6082        name: AuthorityName,
6083    ) -> Option<SystemPackage> {
6084        let bytes = get_override_bytes(package_id, name)?;
6085        let dependencies = if package_id.is_system_package() {
6086            BuiltInFramework::get_package_by_id(package_id)
6087                .dependencies
6088                .to_vec()
6089        } else {
6090            // Assume that entirely new injected packages depend on all existing system
6091            // packages.
6092            BuiltInFramework::all_package_ids()
6093        };
6094        Some(SystemPackage {
6095            id: *package_id,
6096            bytes,
6097            dependencies,
6098        })
6099    }
6100
6101    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6102        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
6103        let extra: Vec<ObjectId> = OVERRIDE.with(|cfg| {
6104            cfg.borrow()
6105                .keys()
6106                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6107                .collect()
6108        });
6109
6110        extra
6111            .into_iter()
6112            .map(|package| SystemPackage {
6113                id: package,
6114                bytes: get_override_bytes(&package, name).unwrap(),
6115                dependencies: BuiltInFramework::all_package_ids(),
6116            })
6117            .collect()
6118    }
6119}
6120
6121#[derive(Debug, Serialize, Deserialize, Clone)]
6122pub struct ObjDumpFormat {
6123    pub id: ObjectId,
6124    pub version: VersionNumber,
6125    pub digest: ObjectDigest,
6126    pub object: Object,
6127}
6128
6129impl ObjDumpFormat {
6130    fn new(object: Object) -> Self {
6131        let oref = object.object_ref();
6132        Self {
6133            id: oref.object_id,
6134            version: oref.version,
6135            digest: oref.digest,
6136            object,
6137        }
6138    }
6139}
6140
6141#[derive(Debug, Serialize, Deserialize, Clone)]
6142pub struct NodeStateDump {
6143    pub tx_digest: TransactionDigest,
6144    pub sender_signed_data: SenderSignedData,
6145    pub executed_epoch: u64,
6146    pub reference_gas_price: u64,
6147    pub protocol_version: u64,
6148    pub epoch_start_timestamp_ms: u64,
6149    pub computed_effects: TransactionEffects,
6150    pub expected_effects_digest: TransactionEffectsDigest,
6151    pub relevant_system_packages: Vec<ObjDumpFormat>,
6152    pub shared_objects: Vec<ObjDumpFormat>,
6153    pub loaded_child_objects: Vec<ObjDumpFormat>,
6154    pub modified_at_versions: Vec<ObjDumpFormat>,
6155    pub runtime_reads: Vec<ObjDumpFormat>,
6156    pub input_objects: Vec<ObjDumpFormat>,
6157}
6158
6159impl NodeStateDump {
6160    pub fn new(
6161        tx_digest: &TransactionDigest,
6162        effects: &TransactionEffects,
6163        expected_effects_digest: TransactionEffectsDigest,
6164        object_store: &dyn ObjectStore,
6165        epoch_store: &Arc<AuthorityPerEpochStore>,
6166        inner_temporary_store: &InnerTemporaryStore,
6167        certificate: &VerifiedExecutableTransaction,
6168    ) -> IotaResult<Self> {
6169        // Epoch info
6170        let executed_epoch = epoch_store.epoch();
6171        let reference_gas_price = epoch_store.reference_gas_price();
6172        let epoch_start_config = epoch_store.epoch_start_config();
6173        let protocol_version = epoch_store.protocol_version().as_u64();
6174        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6175
6176        // Record all system packages at this version
6177        let mut relevant_system_packages = Vec::new();
6178        for sys_package_id in BuiltInFramework::all_package_ids() {
6179            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6180                relevant_system_packages.push(ObjDumpFormat::new(w))
6181            }
6182        }
6183
6184        // Record all the shared objects
6185        let mut shared_objects = Vec::new();
6186        for kind in effects.input_shared_objects() {
6187            match kind {
6188                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6189                    if let Some(w) =
6190                        object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6191                    {
6192                        shared_objects.push(ObjDumpFormat::new(w))
6193                    }
6194                }
6195                InputSharedObject::ReadDeleted(..)
6196                | InputSharedObject::MutateDeleted(..)
6197                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6198                                                           * objects. */
6199            }
6200        }
6201
6202        // Record all loaded child objects
6203        // Child objects which are read but not mutated are not tracked anywhere else
6204        let mut loaded_child_objects = Vec::new();
6205        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6206            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6207                loaded_child_objects.push(ObjDumpFormat::new(w))
6208            }
6209        }
6210
6211        // Record all modified objects
6212        let mut modified_at_versions = Vec::new();
6213        for (id, ver) in effects.modified_at_versions() {
6214            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6215                modified_at_versions.push(ObjDumpFormat::new(w))
6216            }
6217        }
6218
6219        // Packages read at runtime, which were not previously loaded into the temoorary
6220        // store Some packages may be fetched at runtime and wont show up in
6221        // input objects
6222        let mut runtime_reads = Vec::new();
6223        for obj in inner_temporary_store
6224            .runtime_packages_loaded_from_db
6225            .values()
6226        {
6227            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6228        }
6229
6230        // All other input objects should already be in `inner_temporary_store.objects`
6231
6232        Ok(Self {
6233            tx_digest: *tx_digest,
6234            executed_epoch,
6235            reference_gas_price,
6236            epoch_start_timestamp_ms,
6237            protocol_version,
6238            relevant_system_packages,
6239            shared_objects,
6240            loaded_child_objects,
6241            modified_at_versions,
6242            runtime_reads,
6243            sender_signed_data: certificate.clone().into_message(),
6244            input_objects: inner_temporary_store
6245                .input_objects
6246                .values()
6247                .map(|o| ObjDumpFormat::new(o.clone()))
6248                .collect(),
6249            computed_effects: effects.clone(),
6250            expected_effects_digest,
6251        })
6252    }
6253
6254    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6255        let mut objects = Vec::new();
6256        objects.extend(self.relevant_system_packages.clone());
6257        objects.extend(self.shared_objects.clone());
6258        objects.extend(self.loaded_child_objects.clone());
6259        objects.extend(self.modified_at_versions.clone());
6260        objects.extend(self.runtime_reads.clone());
6261        objects.extend(self.input_objects.clone());
6262        objects
6263    }
6264
6265    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6266        let file_name = format!(
6267            "{}_{}_NODE_DUMP.json",
6268            self.tx_digest,
6269            AuthorityState::unixtime_now_ms()
6270        );
6271        let mut path = path.to_path_buf();
6272        path.push(&file_name);
6273        let mut file = File::create(path.clone())?;
6274        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6275        Ok(path)
6276    }
6277
6278    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6279        let file = File::open(path)?;
6280        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6281    }
6282}
6283
6284/// Returns the [`MoveAuthenticator`]s to execute during the pre-consensus
6285/// phase.
6286///
6287/// When `pre_consensus_sponsor_only_move_authentication` is enabled:
6288/// - For sponsored transactions: only the sponsor's [`MoveAuthenticator`] is
6289///   returned (empty if the sponsor does not use one).
6290/// - For non-sponsored transactions: all [`MoveAuthenticator`]s are returned
6291///   (currently only the sender's).
6292///
6293/// When the flag is not set, all [`MoveAuthenticator`]s are returned for
6294/// compatibility.
6295fn pre_consensus_move_authenticators<'a>(
6296    tx: &'a VerifiedTransaction,
6297    protocol_config: &ProtocolConfig,
6298) -> Vec<&'a MoveAuthenticator> {
6299    if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6300        if tx.transaction_data().is_sponsored_tx() {
6301            if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6302                vec![sponsor_move_authenticator]
6303            } else {
6304                vec![]
6305            }
6306        } else {
6307            tx.move_authenticators()
6308        }
6309    } else {
6310        tx.move_authenticators()
6311    }
6312}