Skip to main content

iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::TxLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48    Address, EndOfEpochTransactionKind, Event, ExecutionStatus, ObjectId, Owner, RandomnessRound,
49    StructTag, TransactionExpiration, TransactionKind, TypeTag,
50    crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
51    gas::GasCostSummary,
52};
53use iota_storage::{
54    key_value_store::{
55        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
56    },
57    key_value_store_metrics::KeyValueStoreMetrics,
58};
59#[cfg(msim)]
60use iota_types::committee::CommitteeTrait;
61use iota_types::{
62    account_abstraction::{
63        account::AuthenticatorFunctionRefV1Key,
64        authenticator_function::{
65            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
66            AuthenticatorFunctionRefV1, extract_auth_fun_refs,
67        },
68    },
69    auth_context::AuthContextData,
70    base_types::{
71        AuthorityName, ConciseableName, ObjectInfo, ObjectRef, ObjectType, SequenceNumber,
72        VersionNumber,
73    },
74    committee::{Committee, EpochId, ProtocolVersion},
75    crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, Signer},
76    deny_list_v1::check_coin_deny_list_v1,
77    digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
78    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
79    effects::{
80        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
81        TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
82    },
83    error::{ExecutionError, IotaError, IotaResult, UserInputError},
84    event::{EventID, SystemEpochInfoEvent},
85    executable_transaction::VerifiedExecutableTransaction,
86    execution_config_utils::to_binary_config,
87    fp_ensure,
88    gas::IotaGasStatus,
89    gas_coin::NANOS_PER_IOTA,
90    inner_temporary_store::{
91        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
92        WrittenObjects,
93    },
94    iota_sdk_types_conversions::type_tag_core_to_sdk,
95    iota_system_state::{
96        IotaSystemState, IotaSystemStateTrait,
97        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
98    },
99    layout_resolver::{LayoutResolver, into_struct_layout},
100    message_envelope::Message,
101    messages_checkpoint::{
102        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
103        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
104        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
105        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
106    },
107    messages_consensus::AuthorityCapabilitiesV1,
108    messages_grpc::{
109        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
110        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
111        TransactionStatus,
112    },
113    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
114    move_authenticator::MoveAuthenticator,
115    object::{
116        MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
117        bounded_visitor::BoundedVisitor,
118    },
119    storage::{
120        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
121    },
122    supported_protocol_versions::{
123        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
124    },
125    traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
126    transaction::*,
127    transaction_executor::{SimulateTransactionResult, VmChecks},
128};
129use itertools::Itertools;
130use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
131use move_core_types::{
132    account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
133};
134use parking_lot::Mutex;
135use prometheus::{
136    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
137    register_histogram_vec_with_registry, register_histogram_with_registry,
138    register_int_counter_vec_with_registry, register_int_counter_with_registry,
139    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
140};
141use serde::{Deserialize, Serialize, de::DeserializeOwned};
142use tap::TapFallible;
143use tokio::{
144    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
145    task::JoinHandle,
146};
147use tracing::{debug, error, info, instrument, trace, warn};
148use typed_store::TypedStoreError;
149
150use self::{
151    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
152};
153#[cfg(msim)]
154pub use crate::checkpoints::checkpoint_executor::utils::{
155    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
156};
157use crate::{
158    authority::{
159        authority_per_epoch_store::{AuthorityPerEpochStore, TxGuard},
160        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
161        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
162        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
163        authority_store_tables::AuthorityPrunerTables,
164        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
165    },
166    authority_client::NetworkAuthorityClient,
167    checkpoint_progress_tracker::CheckpointProgressTracker,
168    checkpoints::CheckpointStore,
169    congestion_tracker::CongestionTracker,
170    consensus_adapter::ConsensusAdapter,
171    epoch::committee_store::CommitteeStore,
172    execution_cache::{
173        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
174        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
175        TransactionCacheRead,
176    },
177    execution_driver::execution_process,
178    global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
179    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
180    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
181    metrics::{LatencyObserver, RateTracker},
182    module_cache_metrics::ResolverMetrics,
183    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
184    stake_aggregator::StakeAggregator,
185    subscription_handler::SubscriptionHandler,
186    traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
187    transaction_input_loader::TransactionInputLoader,
188    transaction_manager::TransactionManager,
189    transaction_outputs::TransactionOutputs,
190    validator_tx_finalizer::ValidatorTxFinalizer,
191    verify_indexes::verify_indexes,
192};
193
194#[cfg(test)]
195#[path = "unit_tests/authority_tests.rs"]
196pub mod authority_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/transaction_tests.rs"]
200pub mod transaction_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/batch_transaction_tests.rs"]
204mod batch_transaction_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/move_integration_tests.rs"]
208pub mod move_integration_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/gas_tests.rs"]
212mod gas_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/batch_verification_tests.rs"]
216mod batch_verification_tests;
217
218#[cfg(test)]
219#[path = "unit_tests/coin_deny_list_tests.rs"]
220mod coin_deny_list_tests;
221
222#[cfg(test)]
223#[path = "unit_tests/auth_unit_test_utils.rs"]
224pub mod auth_unit_test_utils;
225
226pub mod authority_test_utils;
227
228pub mod authority_per_epoch_store;
229pub mod authority_per_epoch_store_pruner;
230
231mod authority_store_migrations;
232pub mod authority_store_pruner;
233pub mod authority_store_tables;
234pub mod authority_store_types;
235pub mod epoch_start_configuration;
236pub mod shared_object_congestion_tracker;
237pub mod shared_object_version_manager;
238pub mod suggested_gas_price_calculator;
239pub mod test_authority_builder;
240pub mod transaction_deferral;
241
242pub(crate) mod authority_store;
243pub mod backpressure;
244pub(crate) mod dropped_tx_status_cache;
245
246/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
///
/// Groups the authority's transaction/certificate counters, latency
/// histograms, TransactionManager and execution-driver metrics, overload
/// indicators, post-processing counters and consensus-handler metrics. The
/// Prometheus handles are registered in `AuthorityMetrics::new` against the
/// registry supplied by the caller.
247pub struct AuthorityMetrics {
    // --- Transaction / certificate counters and per-transaction distributions ---
    /// Total number of transaction orders.
248    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
249    total_certs: IntCounter,
    /// Number of calls to handle_certificate.
250    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced (i.e. transactions finished).
251    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
252    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
253    sponsored_tx: IntCounter,
    /// Number of transaction orders that had already been processed previously.
254    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
255    num_input_objs: Histogram,
    /// Distribution of the number of shared input objects per transaction.
256    num_shared_objects: Histogram,
    /// Distribution of the size of transaction batches.
257    batch_size: Histogram,
258
    /// Latency of handling transactions.
259    authority_state_handle_transaction_latency: Histogram,
260
    // The two histograms below are the `tx_type`-labelled children of the single
    // `authority_state_execute_certificate_latency` histogram vec (latency of
    // executing certificates, including waiting for inputs).
261    execute_certificate_latency_single_writer: Histogram,
262    execute_certificate_latency_shared_object: Histogram,
263
    /// Latency of actual certificate executions.
264    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution (uses the low-latency buckets).
265    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
266    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
267    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
268    db_checkpoint_latency: Histogram,
269
    // --- TransactionManager metrics ---
    /// Certificates enqueued to TransactionManager, labelled by `result`.
270    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in TransactionManager.
271    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Number of certificates pending in TransactionManager, with at least one
    /// missing input object.
272    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Number of executing certificates, including queued and actually running ones.
273    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in TransactionManager.
274    pub(crate) transaction_manager_num_ready: IntGauge,
    /// Current size of the object-availability cache in TransactionManager.
275    pub(crate) transaction_manager_object_cache_size: IntGauge,
    /// Number of object-availability cache hits in TransactionManager.
276    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    /// Number of object-availability cache misses in TransactionManager.
277    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    /// Number of object-availability cache evictions in TransactionManager.
278    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    /// Current size of the package-availability cache in TransactionManager.
279    pub(crate) transaction_manager_package_cache_size: IntGauge,
    /// Number of package-availability cache hits in TransactionManager.
280    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    /// Number of package-availability cache misses in TransactionManager.
281    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    /// Number of package-availability cache evictions in TransactionManager.
282    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time a transaction spends waiting in the queue, in seconds.
283    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
284
    // --- Execution driver metrics ---
    /// Cumulative number of transactions executed by the execution driver.
285    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
286    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready for execution and it
    /// starting to execute.
287    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas divided by VM execution latency.
288    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas divided by certificate execution latency,
    /// including committing the certificate.
289    pub(crate) execution_gas_latency_ratio: Histogram,
290
    /// Total number of consensus transactions skipped.
291    pub(crate) skipped_consensus_txns: IntCounter,
    /// Total number of consensus transactions skipped because of a local cache hit.
292    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
293
    // --- Overload indicators ---
    /// Whether the authority is currently experiencing overload and is in
    /// load-shedding mode.
294    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while the authority is in load-shedding mode.
295    pub(crate) authority_load_shedding_percentage: IntGauge,
296
    /// Number of times each source indicated transaction overload, labelled by `source`.
297    pub(crate) transaction_overload_sources: IntCounterVec,
298
299    /// Post-processing metrics (this counter and the three that follow).
    /// Total number of events emitted in post processing.
300    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
301    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing in post processing.
302    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
303    post_processing_total_failures: IntCounter,
304
305    /// Consensus handler metrics (this field starts the group that follows).
    /// Number of transactions processed by the consensus handler, labelled by `class`.
306    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each type of transaction processed by the consensus handler,
    /// labelled by `class`.
307    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low-scoring authorities based on reputation scores from consensus.
308    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Scores from consensus for each authority, labelled by `authority`.
309    pub consensus_handler_scores: IntGaugeVec,
    /// Number of transactions deferred by the consensus handler.
310    pub consensus_handler_deferred_transactions: IntCounter,
    /// Number of transactions deferred by the consensus handler due to congestion.
311    pub consensus_handler_congested_transactions: IntCounter,
    /// Number of transactions cancelled by the consensus handler.
312    pub consensus_handler_cancelled_transactions: IntCounter,
313    pub consensus_handler_validation_dropped_transactions: IntCounter,
314    pub consensus_handler_max_object_costs: IntGaugeVec,
315    pub consensus_committed_subdags: IntCounterVec,
316    pub consensus_committed_messages: IntGaugeVec,
317    pub consensus_committed_user_transactions: IntGaugeVec,
318    pub consensus_handler_leader_round: IntGauge,
319    pub consensus_calculated_throughput: IntGauge,
320    pub consensus_calculated_throughput_profile: IntGauge,
321
    /// Per-authority validator scores published by the local Scoreboard after
    /// each consensus commit, labelled by `authority`.
322    pub validator_scoreboard_scores: IntGaugeVec,
    /// Cumulative count of invalid misbehavior reports received from each
    /// reporting authority in the current epoch, labelled by `authority`.
323    pub invalid_misbehavior_reports_by_authority: IntGaugeVec,
324
    // Shared handle to the limits metrics; NOTE(review): how it is populated is
    // not visible in this part of the file.
325    pub limits_metrics: Arc<LimitsMetrics>,
326
327    /// Bytecode verifier metrics for tracking timeouts.
328    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
329
330    /// Count of multisig signatures.
331    pub multisig_sig_count: IntCounter,
332
333    // Tracks the recent average transaction queueing delay, measured from when a
334    // transaction is ready for execution until it starts executing.
335    pub execution_queueing_latency: LatencyObserver,
336
337    // Tracks the rate at which transactions become ready for execution in transaction manager.
338    // The Mutex is needed because the tracker is updated in transaction manager and read
339    // in the overload_monitor. There should be low mutex contention because
340    // transaction manager is single threaded and the read rate in overload_monitor is
341    // low. In the case where transaction manager becomes multi-threaded, we can
342    // create one rate tracker per thread.
343    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
344
345    // Tracks the rate at which transactions start execution in execution driver.
346    // The Mutex is used for the same reason as for `txn_ready_rate_tracker`.
347    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
348}
349
350// Override default Prom buckets for positive numbers in 0-10M range.
// Each decade is split at 1, 2, 5 and 7; used by the count/size histograms
// (e.g. input objects per transaction, batch size, consensus transaction sizes).
351const POSITIVE_INT_BUCKETS: &[f64] = &[
352    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
353    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
354    7000000., 10000000.,
355];
356
// Buckets, in seconds, for the general latency histograms. Ranges from 0.5ms
// up to 90s, with one-second steps between 1s and 10s.
357const LATENCY_SEC_BUCKETS: &[f64] = &[
358    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
359    10., 20., 30., 60., 90.,
360];
361
362// Buckets, in seconds, for low latency samples. Starts from 10us and goes up to 100s.
363const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
364    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
365    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
366];
367
// Buckets for the gas-to-latency ratio histograms (`prepare_cert_gas_latency_ratio`
// and `execution_gas_latency_ratio`). Ranges from 10 up to 1M.
368const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
369    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
370    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
371];
372
373/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
/// Computed as one billion times `NANOS_PER_IOTA`.
374pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
375
376impl AuthorityMetrics {
377    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
378        let execute_certificate_latency = register_histogram_vec_with_registry!(
379            "authority_state_execute_certificate_latency",
380            "Latency of executing certificates, including waiting for inputs",
381            &["tx_type"],
382            LATENCY_SEC_BUCKETS.to_vec(),
383            registry,
384        )
385        .unwrap();
386
387        let execute_certificate_latency_single_writer =
388            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
389        let execute_certificate_latency_shared_object =
390            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
391
392        Self {
393            tx_orders: register_int_counter_with_registry!(
394                "total_transaction_orders",
395                "Total number of transaction orders",
396                registry,
397            )
398                .unwrap(),
399            total_certs: register_int_counter_with_registry!(
400                "total_transaction_certificates",
401                "Total number of transaction certificates handled",
402                registry,
403            )
404                .unwrap(),
405            total_cert_attempts: register_int_counter_with_registry!(
406                "total_handle_certificate_attempts",
407                "Number of calls to handle_certificate",
408                registry,
409            )
410                .unwrap(),
411            // total_effects == total transactions finished
412            total_effects: register_int_counter_with_registry!(
413                "total_transaction_effects",
414                "Total number of transaction effects produced",
415                registry,
416            )
417                .unwrap(),
418
419            shared_obj_tx: register_int_counter_with_registry!(
420                "num_shared_obj_tx",
421                "Number of transactions involving shared objects",
422                registry,
423            )
424                .unwrap(),
425
426            sponsored_tx: register_int_counter_with_registry!(
427                "num_sponsored_tx",
428                "Number of sponsored transactions",
429                registry,
430            )
431                .unwrap(),
432
433            tx_already_processed: register_int_counter_with_registry!(
434                "num_tx_already_processed",
435                "Number of transaction orders already processed previously",
436                registry,
437            )
438                .unwrap(),
439            num_input_objs: register_histogram_with_registry!(
440                "num_input_objects",
441                "Distribution of number of input TX objects per TX",
442                POSITIVE_INT_BUCKETS.to_vec(),
443                registry,
444            )
445                .unwrap(),
446            num_shared_objects: register_histogram_with_registry!(
447                "num_shared_objects",
448                "Number of shared input objects per TX",
449                POSITIVE_INT_BUCKETS.to_vec(),
450                registry,
451            )
452                .unwrap(),
453            batch_size: register_histogram_with_registry!(
454                "batch_size",
455                "Distribution of size of transaction batch",
456                POSITIVE_INT_BUCKETS.to_vec(),
457                registry,
458            )
459                .unwrap(),
460            authority_state_handle_transaction_latency: register_histogram_with_registry!(
461                "authority_state_handle_transaction_latency",
462                "Latency of handling transactions",
463                LATENCY_SEC_BUCKETS.to_vec(),
464                registry,
465            )
466                .unwrap(),
467            execute_certificate_latency_single_writer,
468            execute_certificate_latency_shared_object,
469            internal_execution_latency: register_histogram_with_registry!(
470                "authority_state_internal_execution_latency",
471                "Latency of actual certificate executions",
472                LATENCY_SEC_BUCKETS.to_vec(),
473                registry,
474            )
475                .unwrap(),
476            execution_load_input_objects_latency: register_histogram_with_registry!(
477                "authority_state_execution_load_input_objects_latency",
478                "Latency of loading input objects for execution",
479                LOW_LATENCY_SEC_BUCKETS.to_vec(),
480                registry,
481            )
482                .unwrap(),
483            prepare_certificate_latency: register_histogram_with_registry!(
484                "authority_state_prepare_certificate_latency",
485                "Latency of executing certificates, before committing the results",
486                LATENCY_SEC_BUCKETS.to_vec(),
487                registry,
488            )
489                .unwrap(),
490            commit_certificate_latency: register_histogram_with_registry!(
491                "authority_state_commit_certificate_latency",
492                "Latency of committing certificate execution results",
493                LATENCY_SEC_BUCKETS.to_vec(),
494                registry,
495            )
496                .unwrap(),
497            db_checkpoint_latency: register_histogram_with_registry!(
498                "db_checkpoint_latency",
499                "Latency of checkpointing dbs",
500                LATENCY_SEC_BUCKETS.to_vec(),
501                registry,
502            ).unwrap(),
503            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
504                "transaction_manager_num_enqueued_certificates",
505                "Current number of certificates enqueued to TransactionManager",
506                &["result"],
507                registry,
508            )
509                .unwrap(),
510            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
511                "transaction_manager_num_missing_objects",
512                "Current number of missing objects in TransactionManager",
513                registry,
514            )
515                .unwrap(),
516            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
517                "transaction_manager_num_pending_certificates",
518                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
519                registry,
520            )
521                .unwrap(),
522            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
523                "transaction_manager_num_executing_certificates",
524                "Number of executing certificates, including queued and actually running certificates",
525                registry,
526            )
527                .unwrap(),
528            transaction_manager_num_ready: register_int_gauge_with_registry!(
529                "transaction_manager_num_ready",
530                "Number of ready transactions in TransactionManager",
531                registry,
532            )
533                .unwrap(),
534            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
535                "transaction_manager_object_cache_size",
536                "Current size of object-availability cache in TransactionManager",
537                registry,
538            )
539                .unwrap(),
540            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
541                "transaction_manager_object_cache_hits",
542                "Number of object-availability cache hits in TransactionManager",
543                registry,
544            )
545                .unwrap(),
546            authority_overload_status: register_int_gauge_with_registry!(
547                "authority_overload_status",
548                "Whether authority is current experiencing overload and enters load shedding mode.",
549                registry)
550                .unwrap(),
551            authority_load_shedding_percentage: register_int_gauge_with_registry!(
552                "authority_load_shedding_percentage",
553                "The percentage of transactions is shed when the authority is in load shedding mode.",
554                registry)
555                .unwrap(),
556            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
557                "transaction_manager_object_cache_misses",
558                "Number of object-availability cache misses in TransactionManager",
559                registry,
560            )
561                .unwrap(),
562            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
563                "transaction_manager_object_cache_evictions",
564                "Number of object-availability cache evictions in TransactionManager",
565                registry,
566            )
567                .unwrap(),
568            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
569                "transaction_manager_package_cache_size",
570                "Current size of package-availability cache in TransactionManager",
571                registry,
572            )
573                .unwrap(),
574            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
575                "transaction_manager_package_cache_hits",
576                "Number of package-availability cache hits in TransactionManager",
577                registry,
578            )
579                .unwrap(),
580            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
581                "transaction_manager_package_cache_misses",
582                "Number of package-availability cache misses in TransactionManager",
583                registry,
584            )
585                .unwrap(),
586            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
587                "transaction_manager_package_cache_evictions",
588                "Number of package-availability cache evictions in TransactionManager",
589                registry,
590            )
591                .unwrap(),
592            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
593                "transaction_manager_transaction_queue_age_s",
594                "Time spent in waiting for transaction in the queue",
595                LATENCY_SEC_BUCKETS.to_vec(),
596                registry,
597            )
598                .unwrap(),
599            transaction_overload_sources: register_int_counter_vec_with_registry!(
600                "transaction_overload_sources",
601                "Number of times each source indicates transaction overload.",
602                &["source"],
603                registry)
604                .unwrap(),
605            execution_driver_executed_transactions: register_int_counter_with_registry!(
606                "execution_driver_executed_transactions",
607                "Cumulative number of transaction executed by execution driver",
608                registry,
609            )
610                .unwrap(),
611            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
612                "execution_driver_dispatch_queue",
613                "Number of transaction pending in execution driver dispatch queue",
614                registry,
615            )
616                .unwrap(),
617            execution_queueing_delay_s: register_histogram_with_registry!(
618                "execution_queueing_delay_s",
619                "Queueing delay between a transaction is ready for execution until it starts executing.",
620                LATENCY_SEC_BUCKETS.to_vec(),
621                registry
622            )
623                .unwrap(),
624            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
625                "prepare_cert_gas_latency_ratio",
626                "The ratio of computation gas divided by VM execution latency.",
627                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
628                registry
629            )
630                .unwrap(),
631            execution_gas_latency_ratio: register_histogram_with_registry!(
632                "execution_gas_latency_ratio",
633                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
634                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
635                registry
636            )
637                .unwrap(),
638            skipped_consensus_txns: register_int_counter_with_registry!(
639                "skipped_consensus_txns",
640                "Total number of consensus transactions skipped",
641                registry,
642            )
643                .unwrap(),
644            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
645                "skipped_consensus_txns_cache_hit",
646                "Total number of consensus transactions skipped because of local cache hit",
647                registry,
648            )
649                .unwrap(),
650            post_processing_total_events_emitted: register_int_counter_with_registry!(
651                "post_processing_total_events_emitted",
652                "Total number of events emitted in post processing",
653                registry,
654            )
655                .unwrap(),
656            post_processing_total_tx_indexed: register_int_counter_with_registry!(
657                "post_processing_total_tx_indexed",
658                "Total number of txes indexed in post processing",
659                registry,
660            )
661                .unwrap(),
662            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
663                "post_processing_total_tx_had_event_processed",
664                "Total number of txes finished event processing in post processing",
665                registry,
666            )
667                .unwrap(),
668            post_processing_total_failures: register_int_counter_with_registry!(
669                "post_processing_total_failures",
670                "Total number of failure in post processing",
671                registry,
672            )
673                .unwrap(),
674            consensus_handler_processed: register_int_counter_vec_with_registry!(
675                "consensus_handler_processed",
676                "Number of transactions processed by consensus handler",
677                &["class"],
678                registry
679            ).unwrap(),
680            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
681                "consensus_handler_transaction_sizes",
682                "Sizes of each type of transactions processed by consensus handler",
683                &["class"],
684                POSITIVE_INT_BUCKETS.to_vec(),
685                registry
686            ).unwrap(),
687            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
688                "consensus_handler_num_low_scoring_authorities",
689                "Number of low scoring authorities based on reputation scores from consensus",
690                registry
691            ).unwrap(),
692            consensus_handler_scores: register_int_gauge_vec_with_registry!(
693                "consensus_handler_scores",
694                "scores from consensus for each authority",
695                &["authority"],
696                registry,
697            ).unwrap(),
698            validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
699                "validator_scoreboard_scores",
700                "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
701                &["authority"],
702                registry,
703            ).unwrap(),
704            invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
705                "invalid_misbehavior_reports_by_authority",
706                "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
707                &["authority"],
708                registry,
709            ).unwrap(),
710            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
711                "consensus_handler_deferred_transactions",
712                "Number of transactions deferred by consensus handler",
713                registry,
714            ).unwrap(),
715            consensus_handler_congested_transactions: register_int_counter_with_registry!(
716                "consensus_handler_congested_transactions",
717                "Number of transactions deferred by consensus handler due to congestion",
718                registry,
719            ).unwrap(),
720            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
721                "consensus_handler_cancelled_transactions",
722                "Number of transactions cancelled by consensus handler",
723                registry,
724            ).unwrap(),
725            consensus_handler_validation_dropped_transactions: register_int_counter_with_registry!(
726                "consensus_handler_validation_dropped_transactions",
727                "Number of UserTransactionV1 transactions dropped by post-consensus validation",
728                registry,
729            ).unwrap(),
730            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
731                "consensus_handler_max_congestion_control_object_costs",
732                "Max object costs for congestion control in the current consensus commit",
733                &["commit_type"],
734                registry,
735            ).unwrap(),
736            consensus_committed_subdags: register_int_counter_vec_with_registry!(
737                "consensus_committed_subdags",
738                "Number of committed subdags, sliced by leader",
739                &["authority"],
740                registry,
741            ).unwrap(),
742            consensus_committed_messages: register_int_gauge_vec_with_registry!(
743                "consensus_committed_messages",
744                "Total number of committed consensus messages, sliced by author",
745                &["authority"],
746                registry,
747            ).unwrap(),
748            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
749                "consensus_committed_user_transactions",
750                "Number of committed user transactions, sliced by submitter",
751                &["authority"],
752                registry,
753            ).unwrap(),
754            consensus_handler_leader_round: register_int_gauge_with_registry!(
755                "consensus_handler_leader_round",
756                "The leader round of the current consensus output being processed in the consensus handler",
757                registry,
758            ).unwrap(),
759            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
760            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
761            multisig_sig_count: register_int_counter_with_registry!(
762                "multisig_sig_count",
763                "Count of multisig signatures",
764                registry,
765            )
766                .unwrap(),
767            consensus_calculated_throughput: register_int_gauge_with_registry!(
768                "consensus_calculated_throughput",
769                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
770                registry,
771            ).unwrap(),
772            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
773                "consensus_calculated_throughput_profile",
774                "The current active calculated throughput profile",
775                registry
776            ).unwrap(),
777            execution_queueing_latency: LatencyObserver::new(),
778            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
779            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
780        }
781    }
782
783    /// Reset metrics that contain `hostname` as one of the labels. This is
784    /// needed to avoid retaining metrics for long-gone committee members and
785    /// only exposing metrics for the committee in the current epoch.
786    pub fn reset_on_reconfigure(&self) {
787        self.consensus_committed_messages.reset();
788        self.consensus_handler_scores.reset();
789        self.validator_scoreboard_scores.reset();
790        self.invalid_misbehavior_reports_by_authority.reset();
791        self.consensus_committed_user_transactions.reset();
792    }
793}
794
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy private
///   keys).
/// - `Send` + `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with `Arc::pin(keypair)` where `keypair` is a
/// `KeyPair`.
801pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
802
/// Long-lived state of a single authority node (committee validator or
/// fullnode, see `AuthorityState::is_fullnode`).
///
/// Bundles the node identity and signing key, the storage handles, the
/// per-epoch store, execution bookkeeping, metrics and configuration.
pub struct AuthorityState {
    // Fixed-size, static identity of the authority.
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signing key of the authority; used to sign transactions in
    /// `handle_transaction_impl`.
    pub secret: StableSyncAuthoritySigner,

    /// The database: loader for the input objects of a transaction (used by
    /// `read_objects_for_execution`).
    input_loader: TransactionInputLoader,
    // NOTE(review): presumably the set of execution-cache trait objects that
    // the `get_*cache*` accessors hand out — confirm against their definitions.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    // NOTE(review): presumably the store of the currently active epoch,
    // swapped on reconfiguration — confirm.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes the current 'execution epoch'.
    /// Execution acquires the read lock, checks the transaction epoch and
    /// holds the lock until all writes are complete. Reconfiguration acquires
    /// the write lock, changes the epoch and reverts all transactions from the
    /// previous epoch that were executed but did not make it into a
    /// checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store; `None` when this node keeps no indexes.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional index store backing the gRPC API.
    // NOTE(review): purpose inferred from the name — confirm.
    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint storage; also serves `get_epoch_state_commitments`.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee storage, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending transactions and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics of this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): underscore-prefixed and never read in the visible code;
    // presumably held only to keep the pruner alive — confirm.
    _pruner: AuthorityStorePruner,
    authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,

    /// Configuration for taking db checkpoints of the different dbs.
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration (e.g. `authority_overload_config`,
    /// `transaction_deny_config`, `execution_cache_config`).
    pub config: NodeConfig,

    /// Current overload status of this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When set and this node is a committee validator, every transaction
    /// signed by `handle_transaction` is handed to it for tracking.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Traffic controller for IOTA core servers (json-rpc, validator service).
    pub traffic_controller: Option<Arc<TrafficController>>,
}
862
863/// The authority state encapsulates all state, drives execution, and ensures
864/// safety.
865///
866/// Note the authority operations can be accessed through a read ref (&) and do
867/// not require &mut. Internally a database is synchronized through a mutex
868/// lock.
869///
870/// Repeating valid commands should produce no changes and return no error.
871impl AuthorityState {
872    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
873        epoch_store.committee().authority_exists(&self.name)
874    }
875
876    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
877        epoch_store
878            .active_validators()
879            .iter()
880            .any(|a| AuthorityName::from(a) == self.name)
881    }
882
883    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
884        !self.is_committee_validator(epoch_store)
885    }
886
887    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
888        &self.committee_store
889    }
890
891    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
892        self.committee_store.clone()
893    }
894
895    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
896        &self.config.authority_overload_config
897    }
898
899    pub fn get_epoch_state_commitments(
900        &self,
901        epoch: EpochId,
902    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
903        self.checkpoint_store.get_epoch_state_commitments(epoch)
904    }
905
906    /// Runs deny list, input object validation, gas checks, coin deny list, and
907    /// MoveAuthenticator checks. Returns the owned object refs for optional
908    /// version validation. Does NOT acquire locks or sign the transaction.
909    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
910    pub(crate) async fn handle_transaction_validation_checks(
911        &self,
912        transaction: &VerifiedTransaction,
913        epoch_store: &Arc<AuthorityPerEpochStore>,
914    ) -> IotaResult<Vec<ObjectRef>> {
915        let protocol_config = epoch_store.protocol_config();
916        let reference_gas_price = epoch_store.reference_gas_price();
917
918        let epoch = epoch_store.epoch();
919
920        let tx_data = transaction.data().transaction_data();
921
922        // Note: the deny checks may do redundant package loads but:
923        // - they only load packages when there is an active package deny map
924        // - the loads are cached anyway
925        iota_transaction_checks::deny::check_transaction_for_validation(
926            tx_data,
927            transaction.tx_signatures(),
928            &transaction.input_objects()?,
929            &tx_data.receiving_objects(),
930            &self.config.transaction_deny_config,
931            self.get_backing_package_store().as_ref(),
932        )?;
933
934        // Load all transaction-related input objects including ones for every
935        // `MoveAuthenticator`. Loading all objects eagerly means that any invalid
936        // reference — missing object, wrong version, inaccessible object — causes a
937        // pre-consensus rejection.
938        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
939            self.read_objects_for_validation(transaction, epoch)?;
940
941        let move_authenticators = transaction.move_authenticators();
942
943        // Check the inputs for signing.
944        // If there are `MoveAuthenticator` signatures, their input objects and the
945        // account objects are also checked and must be provided.
946        // It is also checked if there is enough gas to execute the transaction and its
947        // authenticators.
948        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
949            .check_transaction_inputs_for_validation(
950                protocol_config,
951                reference_gas_price,
952                tx_data,
953                tx_input_objects,
954                &tx_receiving_objects,
955                &move_authenticators,
956                per_authenticator_inputs,
957            )?;
958
959        // Get the input objects for the authenticators, if there are
960        // `MoveAuthenticator`s.
961        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
962            .iter()
963            .map(|i| &i.0)
964            .collect();
965
966        // Check if any of the sender, the transaction input objects, the receiving
967        // objects and the authenticator input objects are in the coin deny
968        // list, which would prevent the transaction from being signed.
969        check_coin_deny_list_v1(
970            tx_data.sender(),
971            &tx_checked_input_objects,
972            &tx_receiving_objects,
973            &per_authenticator_checked_input_objects,
974            &self.get_object_store(),
975        )?;
976
977        let (kind, signer, gas_data) = tx_data.execution_parts();
978
979        let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
980            extract_auth_fun_refs(signer, gas_data.owner, |address| {
981                move_authenticators
982                    .iter()
983                    .zip(per_authenticator_checked_inputs.iter())
984                    .find(|(move_authenticator, _)| {
985                        move_authenticator.address().ok().as_ref() == Some(&address)
986                    })
987                    .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
988            });
989
990        // Filter the authenticators and their checked inputs down to those that must
991        // be executed pre-consensus. This is done *after* the deny-list check so
992        // that all MoveAuthenticator input objects are covered by that check regardless
993        // of deferral.
994        let pre_consensus_move_authenticators =
995            pre_consensus_move_authenticators(transaction, protocol_config);
996        let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
997            move_authenticators
998                .into_iter()
999                .zip(per_authenticator_checked_inputs)
1000                .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
1001                .unzip();
1002        let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
1003            .iter()
1004            .map(|i| &i.0)
1005            .collect();
1006
1007        // If there are `MoveAuthenticator` signatures, execute them and check if they
1008        // all succeed.
1009        if !move_authenticators.is_empty() {
1010            let aggregated_authenticator_input_objects =
1011                iota_transaction_checks::aggregate_authenticator_input_objects(
1012                    &per_authenticator_checked_input_objects,
1013                )?;
1014
1015            debug_assert_eq!(
1016                move_authenticators.len(),
1017                per_authenticator_checked_inputs.len(),
1018                "Move authenticators amount must match the number of checked authenticator inputs"
1019            );
1020
1021            let move_authenticators = move_authenticators
1022                .into_iter()
1023                .zip(per_authenticator_checked_inputs)
1024                .map(
1025                    |(
1026                        move_authenticator,
1027                        (authenticator_checked_input_objects, authenticator_function_ref),
1028                    )| {
1029                        (
1030                            move_authenticator.to_owned(),
1031                            authenticator_function_ref,
1032                            authenticator_checked_input_objects,
1033                        )
1034                    },
1035                )
1036                .collect();
1037
1038            // It is supposed that `MoveAuthenticator` availability is checked in
1039            // `SenderSignedData::validity_check`.
1040
1041            // Serialize the TransactionData for the auth context before decomposing.
1042            let tx_data_bytes =
1043                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1044
1045            let (sender_auth_digest, sponsor_auth_digest) =
1046                transaction.data().compute_auth_digests()?;
1047
1048            let auth_context_data = AuthContextData {
1049                transaction_data_bytes: tx_data_bytes,
1050                sender_auth_digest,
1051                sponsor_auth_digest,
1052                sender_authenticator_function_ref,
1053                sponsor_authenticator_function_ref,
1054            };
1055
1056            // Execute the Move authenticators.
1057            let validation_result = epoch_store.executor().authenticate_transaction(
1058                self.get_backing_store().as_ref(),
1059                protocol_config,
1060                self.metrics.limits_metrics.clone(),
1061                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1062                epoch_store
1063                    .epoch_start_config()
1064                    .epoch_data()
1065                    .epoch_start_timestamp(),
1066                gas_data,
1067                gas_status,
1068                move_authenticators,
1069                aggregated_authenticator_input_objects,
1070                kind,
1071                signer,
1072                transaction.digest().to_owned(),
1073                auth_context_data,
1074                &mut None,
1075            );
1076
1077            if let Err(validation_error) = validation_result {
1078                return Err(IotaError::MoveAuthenticatorExecutionFailure {
1079                    error: validation_error.to_string(),
1080                });
1081            }
1082        }
1083
1084        Ok(tx_checked_input_objects.inner().filter_owned_objects())
1085    }
1086
1087    /// This is a private method and should be kept that way. It doesn't check
1088    /// whether the provided transaction is a system transaction, and hence
1089    /// can only be called internally.
1090    async fn handle_transaction_impl(
1091        &self,
1092        transaction: VerifiedTransaction,
1093        epoch_store: &Arc<AuthorityPerEpochStore>,
1094    ) -> IotaResult<VerifiedSignedTransaction> {
1095        // Ensure that validator cannot reconfigure while we are signing the tx
1096        let _execution_lock = self.execution_lock_for_signing()?;
1097
1098        let owned_objects = self
1099            .handle_transaction_validation_checks(&transaction, epoch_store)
1100            .await?;
1101
1102        let epoch = epoch_store.epoch();
1103        let signed_transaction =
1104            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1105
1106        // Check and write locks, to signed transaction, into the database
1107        // The call to self.set_transaction_lock checks the lock is not conflicting,
1108        // and returns ConflictingTransaction error in case there is a lock on a
1109        // different existing transaction.
1110        self.get_cache_writer().try_acquire_transaction_locks(
1111            epoch_store,
1112            &owned_objects,
1113            signed_transaction.clone(),
1114        )?;
1115
1116        Ok(signed_transaction)
1117    }
1118
1119    /// Initiate a new transaction.
1120    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()
1121    ))]
1122    pub async fn handle_transaction(
1123        &self,
1124        epoch_store: &Arc<AuthorityPerEpochStore>,
1125        transaction: VerifiedTransaction,
1126    ) -> IotaResult<HandleTransactionResponse> {
1127        let tx_digest = *transaction.digest();
1128        debug!("handle_transaction");
1129
1130        // Ensure an idempotent answer.
1131        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1132            return Ok(HandleTransactionResponse { status });
1133        }
1134
1135        let _metrics_guard = self
1136            .metrics
1137            .authority_state_handle_transaction_latency
1138            .start_timer();
1139        self.metrics.tx_orders.inc();
1140
1141        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1142        match signed {
1143            Ok(s) => {
1144                if self.is_committee_validator(epoch_store) {
1145                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1146                        let tx = s.clone();
1147                        let validator_tx_finalizer = validator_tx_finalizer.clone();
1148                        let cache_reader = self.get_transaction_cache_reader().clone();
1149                        let epoch_store = epoch_store.clone();
1150                        spawn_monitored_task!(epoch_store.within_alive_epoch(
1151                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1152                        ));
1153                    }
1154                }
1155                Ok(HandleTransactionResponse {
1156                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1157                })
1158            }
1159            // It happens frequently that while we are checking the validity of the transaction, it
1160            // has just been executed.
1161            // In that case, we could still return Ok to avoid showing confusing errors.
1162            Err(err) => Ok(HandleTransactionResponse {
1163                status: self
1164                    .get_transaction_status(&tx_digest, epoch_store)?
1165                    .ok_or(err)?
1166                    .1,
1167            }),
1168        }
1169    }
1170
1171    pub fn check_system_overload_at_signing(&self) -> bool {
1172        self.config
1173            .authority_overload_config
1174            .check_system_overload_at_signing
1175    }
1176
1177    pub fn check_system_overload_at_execution(&self) -> bool {
1178        self.config
1179            .authority_overload_config
1180            .check_system_overload_at_execution
1181    }
1182
1183    pub(crate) fn check_system_overload(
1184        &self,
1185        consensus_adapter: &Arc<ConsensusAdapter>,
1186        tx_data: &SenderSignedData,
1187        do_authority_overload_check: bool,
1188    ) -> IotaResult {
1189        if do_authority_overload_check {
1190            self.check_authority_overload(tx_data).tap_err(|_| {
1191                self.update_overload_metrics("execution_queue");
1192            })?;
1193        }
1194        self.transaction_manager
1195            .check_execution_overload(self.overload_config(), tx_data)
1196            .tap_err(|_| {
1197                self.update_overload_metrics("execution_pending");
1198            })?;
1199        consensus_adapter.check_consensus_overload().tap_err(|_| {
1200            self.update_overload_metrics("consensus");
1201        })?;
1202
1203        let pending_tx_count = self
1204            .get_cache_commit()
1205            .approximate_pending_transaction_count();
1206        if pending_tx_count
1207            > self
1208                .config
1209                .execution_cache_config
1210                .writeback_cache
1211                .backpressure_threshold_for_rpc()
1212        {
1213            return Err(IotaError::ValidatorOverloadedRetryAfter {
1214                retry_after_secs: 10,
1215            });
1216        }
1217
1218        Ok(())
1219    }
1220
1221    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1222        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1223            return Ok(());
1224        }
1225
1226        let load_shedding_percentage = self
1227            .overload_info
1228            .load_shedding_percentage
1229            .load(Ordering::Relaxed);
1230        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1231    }
1232
1233    fn update_overload_metrics(&self, source: &str) {
1234        self.metrics
1235            .transaction_overload_sources
1236            .with_label_values(&[source])
1237            .inc();
1238    }
1239
1240    /// Wait for a certificate to be executed.
1241    /// For consensus transactions, it needs to be sequenced by the consensus.
1242    /// For owned object transactions, this function will enqueue the
1243    /// transaction for execution.
1244    #[instrument(level = "trace", skip_all)]
1245    pub async fn wait_for_certificate_execution(
1246        &self,
1247        certificate: &VerifiedCertificate,
1248        epoch_store: &Arc<AuthorityPerEpochStore>,
1249    ) -> IotaResult<TransactionEffects> {
1250        let _metrics_guard = if certificate.contains_shared_object() {
1251            self.metrics
1252                .execute_certificate_latency_shared_object
1253                .start_timer()
1254        } else {
1255            self.metrics
1256                .execute_certificate_latency_single_writer
1257                .start_timer()
1258        };
1259        trace!("wait_for_certificate_execution");
1260
1261        self.metrics.total_cert_attempts.inc();
1262
1263        if !certificate.contains_shared_object() {
1264            // Shared object transactions need to be sequenced by the consensus before
1265            // enqueueing for execution, done in
1266            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1267            // object transactions, they can be enqueued for execution immediately.
1268            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1269        }
1270
1271        // tx could be reverted when epoch ends, so we must be careful not to return a
1272        // result here after the epoch ends.
1273        epoch_store
1274            .within_alive_epoch(self.notify_read_effects(certificate))
1275            .await
1276            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1277            .and_then(|r| r)
1278    }
1279
1280    /// Internal logic to execute a transaction.
1281    ///
1282    /// Guarantees that
1283    /// - If input objects are available, return no permanent failure.
1284    /// - Execution and output commit are atomic. i.e. outputs are only written
1285    ///   to storage,
1286    /// on successful execution; crashed execution has no observable effect and
1287    /// can be retried.
1288    ///
1289    /// It is caller's responsibility to ensure input objects are available and
1290    /// locks are set. If this cannot be satisfied by the caller,
1291    /// `wait_for_certificate_execution()` should be called instead.
1292    ///
1293    /// Should only be called within iota-core.
1294    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1295    pub fn try_execute_immediately(
1296        &self,
1297        transaction: &VerifiedExecutableTransaction,
1298        expected_effects_digest: Option<TransactionEffectsDigest>,
1299        epoch_store: &Arc<AuthorityPerEpochStore>,
1300    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1301        let _scope = monitored_scope("Execution::try_execute_immediately");
1302        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1303
1304        let tx_digest = transaction.digest();
1305
1306        // Acquire a lock to prevent concurrent executions of the same transaction.
1307        let tx_guard = epoch_store.acquire_tx_guard(transaction)?;
1308
1309        // The transaction could have been processed by a concurrent attempt of the
1310        // same transaction, so check if the effects have already been written.
1311        if let Some(effects) = self
1312            .get_transaction_cache_reader()
1313            .try_get_executed_effects(tx_digest)?
1314        {
1315            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1316                assert_eq!(
1317                    effects.digest(),
1318                    expected_effects_digest_inner,
1319                    "Unexpected effects digest for transaction {tx_digest}"
1320                );
1321            }
1322            tx_guard.release();
1323            return Ok((effects, None));
1324        }
1325
1326        let (tx_input_objects, per_authenticator_inputs) =
1327            self.read_objects_for_execution(tx_guard.as_lock_guard(), transaction, epoch_store)?;
1328
1329        // If no expected_effects_digest was provided, try to get it from storage.
1330        // We could be re-executing a previously executed but uncommitted transaction,
1331        // perhaps after restarting with a new binary. In this situation, if
1332        // we have published an effects signature, we must be sure not to
1333        // equivocate.
1334        let expected_effects_digest =
1335            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1336
1337        self.process_transaction(
1338            tx_guard,
1339            transaction,
1340            tx_input_objects,
1341            per_authenticator_inputs,
1342            expected_effects_digest,
1343            epoch_store,
1344        )
1345        .tap_err(|e| info!(?tx_digest, "process_transaction failed: {e}"))
1346        .tap_ok(
1347            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_transaction succeeded"),
1348        )
1349    }
1350
1351    pub fn read_objects_for_execution(
1352        &self,
1353        tx_lock: &TxLockGuard,
1354        transaction: &VerifiedExecutableTransaction,
1355        epoch_store: &Arc<AuthorityPerEpochStore>,
1356    ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1357        let _scope = monitored_scope("Execution::load_input_objects");
1358        let _metrics_guard = self
1359            .metrics
1360            .execution_load_input_objects_latency
1361            .start_timer();
1362
1363        let input_objects = transaction.collect_all_input_object_kind_for_reading()?;
1364
1365        let input_objects = self.input_loader.read_objects_for_execution(
1366            epoch_store,
1367            &transaction.key(),
1368            tx_lock,
1369            &input_objects,
1370            epoch_store.epoch(),
1371        )?;
1372
1373        transaction.split_input_objects_into_groups_for_reading(input_objects)
1374    }
1375
1376    /// Test only wrapper for `try_execute_immediately()` above, useful for
1377    /// checking errors if the pre-conditions are not satisfied, and
1378    /// executing change epoch transactions.
1379    pub fn try_execute_for_test(
1380        &self,
1381        certificate: &VerifiedCertificate,
1382    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1383        let epoch_store = self.epoch_store_for_testing();
1384        let (effects, execution_error_opt) = self.try_execute_immediately(
1385            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1386            None,
1387            &epoch_store,
1388        )?;
1389        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1390        Ok((signed_effects, execution_error_opt))
1391    }
1392
1393    /// Non-fallible version of `try_execute_for_test()`.
1394    pub fn execute_for_test(
1395        &self,
1396        certificate: &VerifiedCertificate,
1397    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1398        self.try_execute_for_test(certificate)
1399            .expect("try_execute_for_test should not fail")
1400    }
1401
1402    pub async fn notify_read_effects(
1403        &self,
1404        certificate: &VerifiedCertificate,
1405    ) -> IotaResult<TransactionEffects> {
1406        self.get_transaction_cache_reader()
1407            .try_notify_read_executed_effects(&[*certificate.digest()])
1408            .await
1409            .map(|mut r| r.pop().expect("must return correct number of effects"))
1410    }
1411
1412    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1413        self.get_object_cache_reader()
1414            .try_check_owned_objects_are_live(owned_object_refs)
1415    }
1416
1417    /// This function captures the required state to debug a forked transaction.
1418    /// The dump is written to a file in dir `path`, with name prefixed by the
1419    /// transaction digest. NOTE: Since this info escapes the validator
1420    /// context, make sure not to leak any private info here
1421    pub(crate) fn debug_dump_transaction_state(
1422        &self,
1423        tx_digest: &TransactionDigest,
1424        effects: &TransactionEffects,
1425        expected_effects_digest: TransactionEffectsDigest,
1426        inner_temporary_store: &InnerTemporaryStore,
1427        transaction: &VerifiedExecutableTransaction,
1428        debug_dump_config: &StateDebugDumpConfig,
1429    ) -> IotaResult<PathBuf> {
1430        // Fall back to the OS temp directory if no dump directory is configured.
1431        // This is safe: dump files are named by transaction digest, so no collisions.
1432        let dump_dir = debug_dump_config
1433            .dump_file_directory
1434            .as_ref()
1435            .cloned()
1436            .unwrap_or(std::env::temp_dir());
1437        let epoch_store = self.load_epoch_store_one_call_per_task();
1438
1439        NodeStateDump::new(
1440            tx_digest,
1441            effects,
1442            expected_effects_digest,
1443            self.get_object_store().as_ref(),
1444            &epoch_store,
1445            inner_temporary_store,
1446            transaction,
1447        )?
1448        .write_to_file(&dump_dir)
1449        .map_err(|e| IotaError::FileIO(e.to_string()))
1450    }
1451
1452    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = ?transaction.data().transaction_data().gas_owner().to_string()))]
1453    pub(crate) fn process_transaction(
1454        &self,
1455        tx_guard: TxGuard,
1456        transaction: &VerifiedExecutableTransaction,
1457        tx_input_objects: InputObjects,
1458        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1459        expected_effects_digest: Option<TransactionEffectsDigest>,
1460        epoch_store: &Arc<AuthorityPerEpochStore>,
1461    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1462        let process_transaction_start_time = tokio::time::Instant::now();
1463        let digest = *transaction.digest();
1464
1465        let _scope = monitored_scope("Execution::process_certificate");
1466
1467        fail_point_if!("correlated-crash-process-transaction", || {
1468            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1469                iota_simulator::task::kill_current_node(None);
1470            }
1471        });
1472
1473        let execution_guard = self.execution_lock_for_executable_transaction(transaction);
1474        // Any caller that verifies the signatures on the transaction will have already
1475        // checked the epoch. But paths that don't verify sigs (e.g. execution
1476        // from checkpoint, reading from db) present the possibility of an epoch
1477        // mismatch. If this transaction is not finalized in previous epoch, then it's
1478        // invalid.
1479        let execution_guard = match execution_guard {
1480            Ok(execution_guard) => execution_guard,
1481            Err(err) => {
1482                tx_guard.release();
1483                return Err(err);
1484            }
1485        };
1486        // Since we obtain a reference to the epoch store before taking the execution
1487        // lock, it's possible that reconfiguration has happened and they no
1488        // longer match.
1489        if *execution_guard != epoch_store.epoch() {
1490            tx_guard.release();
1491            info!("The epoch of the execution_guard doesn't match the epoch store");
1492            return Err(IotaError::WrongEpoch {
1493                expected_epoch: epoch_store.epoch(),
1494                actual_epoch: *execution_guard,
1495            });
1496        }
1497
1498        // Errors originating from `execute_transaction` may be transient (failure to
1499        // read locks) or non-transient (transaction input is invalid, move vm
1500        // errors). However, all errors from this function occur before we have
1501        // written anything to the db, so we commit the tx guard and rely on the
1502        // client to retry the tx (if it was transient).
1503        let (inner_temporary_store, effects, execution_error_opt) = match self.execute_transaction(
1504            &execution_guard,
1505            transaction,
1506            tx_input_objects,
1507            per_authenticator_inputs,
1508            epoch_store,
1509        ) {
1510            Err(e) => {
1511                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1512                tx_guard.release();
1513                return Err(e);
1514            }
1515            Ok(res) => res,
1516        };
1517
1518        if let Some(expected_effects_digest) = expected_effects_digest {
1519            if effects.digest() != expected_effects_digest {
1520                // We dont want to mask the original error, so we log it and continue.
1521                match self.debug_dump_transaction_state(
1522                    &digest,
1523                    &effects,
1524                    expected_effects_digest,
1525                    &inner_temporary_store,
1526                    transaction,
1527                    &self.config.state_debug_dump_config,
1528                ) {
1529                    Ok(out_path) => {
1530                        info!(
1531                            "Dumped node state for transaction {} to {}",
1532                            digest,
1533                            out_path.as_path().display().to_string()
1534                        );
1535                    }
1536                    Err(e) => {
1537                        error!("Error dumping state for transaction {}: {e}", digest);
1538                    }
1539                }
1540                error!(
1541                    tx_digest = ?digest,
1542                    ?expected_effects_digest,
1543                    actual_effects = ?effects,
1544                    "fork detected!"
1545                );
1546                panic!(
1547                    "Transaction {} is expected to have effects digest {}, but got {}!",
1548                    digest,
1549                    expected_effects_digest,
1550                    effects.digest(),
1551                );
1552            }
1553        }
1554
1555        fail_point!("crash");
1556
1557        self.commit_transaction(
1558            transaction,
1559            inner_temporary_store,
1560            &effects,
1561            tx_guard,
1562            execution_guard,
1563            epoch_store,
1564        )?;
1565
1566        let elapsed = process_transaction_start_time.elapsed().as_micros() as f64;
1567        if elapsed > 0.0 {
1568            self.metrics
1569                .execution_gas_latency_ratio
1570                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1571        };
1572        Ok((effects, execution_error_opt))
1573    }
1574
1575    pub async fn reconfigure_traffic_control(
1576        &self,
1577        params: TrafficControlReconfigParams,
1578    ) -> Result<TrafficControlReconfigParams, IotaError> {
1579        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1580            traffic_controller.admin_reconfigure(params).await
1581        } else {
1582            Err(IotaError::InvalidAdminRequest(
1583                "Traffic controller is not configured on this node".to_string(),
1584            ))
1585        }
1586    }
1587
1588    #[instrument(level = "trace", skip_all)]
1589    fn commit_transaction(
1590        &self,
1591        transaction: &VerifiedExecutableTransaction,
1592        inner_temporary_store: InnerTemporaryStore,
1593        effects: &TransactionEffects,
1594        tx_guard: TxGuard,
1595        _execution_guard: ExecutionLockReadGuard<'_>,
1596        epoch_store: &Arc<AuthorityPerEpochStore>,
1597    ) -> IotaResult {
1598        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1599            monitored_scope("Execution::commit_certificate");
1600        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1601
1602        let tx_key = transaction.key();
1603        let tx_digest = transaction.digest();
1604        let input_object_count = inner_temporary_store.input_objects.len();
1605        let shared_object_count = effects.input_shared_objects().len();
1606
1607        let output_keys = inner_temporary_store.get_output_keys(effects);
1608
1609        // index transaction
1610        let _ = self
1611            .post_process_one_tx(transaction, effects, &inner_temporary_store, epoch_store)
1612            .tap_err(|e| {
1613                self.metrics.post_processing_total_failures.inc();
1614                error!(?tx_digest, "tx post processing failed: {e}");
1615            });
1616
1617        // The insertion to epoch_store is not atomic with the insertion to the
1618        // perpetual store. This is OK because we insert to the epoch store
1619        // first. And during lookups we always look up in the perpetual store first.
1620        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1621
1622        // Allow testing what happens if we crash here.
1623        fail_point!("crash");
1624
1625        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1626            transaction.clone().into_unsigned(),
1627            effects.clone(),
1628            inner_temporary_store,
1629        );
1630        self.get_cache_writer()
1631            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1632
1633        if transaction.transaction_data().is_end_of_epoch_tx() {
1634            // At the end of epoch, since system packages may have been upgraded, force
1635            // reload them in the cache.
1636            self.get_object_cache_reader()
1637                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1638        }
1639
1640        // `commit_transaction()` finished, the tx is fully committed to the store.
1641        tx_guard.commit_tx();
1642
1643        // Notifies transaction manager about transaction and output objects committed.
1644        // This provides necessary information to transaction manager to start executing
1645        // additional ready transactions.
1646        self.transaction_manager
1647            .notify_commit(tx_digest, output_keys, epoch_store);
1648
1649        self.update_metrics(transaction, input_object_count, shared_object_count);
1650
1651        Ok(())
1652    }
1653
1654    fn update_metrics(
1655        &self,
1656        transaction: &VerifiedExecutableTransaction,
1657        input_object_count: usize,
1658        shared_object_count: usize,
1659    ) {
1660        // count signature by scheme, for multisig
1661        if transaction.has_upgraded_multisig() {
1662            self.metrics.multisig_sig_count.inc();
1663        }
1664
1665        self.metrics.total_effects.inc();
1666        self.metrics.total_certs.inc();
1667
1668        if shared_object_count > 0 {
1669            self.metrics.shared_obj_tx.inc();
1670        }
1671
1672        if transaction.is_sponsored_tx() {
1673            self.metrics.sponsored_tx.inc();
1674        }
1675
1676        self.metrics
1677            .num_input_objs
1678            .observe(input_object_count as f64);
1679        self.metrics
1680            .num_shared_objects
1681            .observe(shared_object_count as f64);
1682        self.metrics.batch_size.observe(
1683            transaction
1684                .data()
1685                .intent_message()
1686                .value
1687                .kind()
1688                .num_commands() as f64,
1689        );
1690    }
1691
1692    /// `execute_transaction()` validates the transaction input, and executes
1693    /// the transaction, returning effects, output objects, events, etc.
1694    ///
1695    /// It reads state from the db (both owned and shared locks), but it has no
1696    /// side effects.
1697    ///
1698    /// It can be generally understood that a failure of `execute_transaction`
1699    /// indicates a non-transient error, e.g. the transaction input is
1700    /// somehow invalid, the correct locks are not held, etc. However, this
1701    /// is not entirely true, as a transient db read error may also cause
1702    /// this function to fail.
1703    #[instrument(level = "trace", skip_all)]
1704    fn execute_transaction(
1705        &self,
1706        _execution_guard: &ExecutionLockReadGuard<'_>,
1707        transaction: &VerifiedExecutableTransaction,
1708        tx_input_objects: InputObjects,
1709        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1710        epoch_store: &Arc<AuthorityPerEpochStore>,
1711    ) -> IotaResult<(
1712        InnerTemporaryStore,
1713        TransactionEffects,
1714        Option<ExecutionError>,
1715    )> {
1716        let _scope = monitored_scope("Execution::execute_certificate");
1717        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1718        let prepare_transaction_start_time = tokio::time::Instant::now();
1719
1720        let protocol_config = epoch_store.protocol_config();
1721
1722        let reference_gas_price = epoch_store.reference_gas_price();
1723
1724        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1725        let epoch_start_timestamp = epoch_store
1726            .epoch_start_config()
1727            .epoch_data()
1728            .epoch_start_timestamp();
1729
1730        let backing_store = self.get_backing_store().as_ref();
1731
1732        let tx_digest = *transaction.digest();
1733
1734        // TODO: We need to move this to a more appropriate place to avoid redundant
1735        // checks.
1736        let tx_data = transaction.data().transaction_data();
1737        tx_data.validity_check(protocol_config)?;
1738
1739        let (kind, signer, gas_data) = tx_data.execution_parts();
1740
1741        let move_authenticators = transaction.move_authenticators();
1742
1743        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1744        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
1745            .is_empty()
1746        {
1747            // No Move authentication required, proceed to execute the transaction directly.
1748
1749            // The cost of partially re-auditing a transaction before execution is
1750            // tolerated.
1751            let (tx_gas_status, tx_checked_input_objects) =
1752                iota_transaction_checks::check_certificate_input(
1753                    transaction,
1754                    tx_input_objects,
1755                    protocol_config,
1756                    reference_gas_price,
1757                )?;
1758
1759            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1760            self.check_owned_locks(&owned_object_refs)?;
1761            epoch_store.executor().execute_transaction_to_effects(
1762                backing_store,
1763                protocol_config,
1764                self.metrics.limits_metrics.clone(),
1765                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1766                // cyclic dependency w/ iota-adapter
1767                self.config
1768                    .expensive_safety_check_config
1769                    .enable_deep_per_tx_iota_conservation_check(),
1770                self.config.certificate_deny_config.certificate_deny_set(),
1771                &epoch_id,
1772                epoch_start_timestamp,
1773                tx_checked_input_objects,
1774                gas_data,
1775                tx_gas_status,
1776                kind,
1777                signer,
1778                tx_digest,
1779                &mut None,
1780            )
1781        } else {
1782            // One or more `MoveAuthenticator` signatures present — authenticate each and
1783            // then execute the transaction.
1784            // It is supposed that `MoveAuthenticator` availability is checked in
1785            // `SenderSignedData::validity_check`.
1786
1787            debug_assert_eq!(
1788                move_authenticators.len(),
1789                per_authenticator_inputs.len(),
1790                "Move authenticators amount must match the number of authenticator inputs"
1791            );
1792
1793            let per_authenticator_inputs = move_authenticators
1794                .iter()
1795                .zip(per_authenticator_inputs)
1796                .map(
1797                    |(move_authenticator, (authenticator_input_objects, account_object))| {
1798                        // Check basic `object_to_authenticate` preconditions and get its
1799                        // components.
1800                        let (
1801                            auth_account_object_id,
1802                            auth_account_object_seq_number,
1803                            auth_account_object_digest,
1804                        ) = move_authenticator.object_to_authenticate_components()?;
1805
1806                        let signer = move_authenticator.address()?;
1807
1808                        let authenticator_function_ref_for_execution = self.check_move_account(
1809                            auth_account_object_id,
1810                            auth_account_object_seq_number,
1811                            auth_account_object_digest,
1812                            account_object,
1813                            &signer,
1814                        )?;
1815
1816                        Ok((
1817                            authenticator_input_objects,
1818                            authenticator_function_ref_for_execution,
1819                        ))
1820                    },
1821                )
1822                .collect::<IotaResult<Vec<_>>>()?;
1823
1824            let per_authenticator_input_objects = per_authenticator_inputs
1825                .iter()
1826                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
1827                .collect::<Vec<_>>();
1828
1829            // Serialize the TransactionData for the auth context.
1830            let tx_data_bytes =
1831                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");
1832
1833            let (sender_auth_digest, sponsor_auth_digest) =
1834                transaction.data().compute_auth_digests()?;
1835
1836            // Check the `MoveAuthenticator` input objects.
1837            // The `MoveAuthenticator` receiving objects are checked on the signing step.
1838            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
1839            // not a part of the transaction data.
1840            let authenticator_gas_budget = protocol_config.max_auth_gas();
1841            let (
1842                gas_status,
1843                per_authenticator_checked_input_objects,
1844                authenticator_and_tx_checked_input_objects,
1845            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1846                transaction,
1847                tx_input_objects,
1848                per_authenticator_input_objects,
1849                authenticator_gas_budget,
1850                protocol_config,
1851                reference_gas_price,
1852            )?;
1853
1854            debug_assert_eq!(
1855                move_authenticators.len(),
1856                per_authenticator_checked_input_objects.len(),
1857                "Move authenticators amount must match the number of checked authenticator inputs"
1858            );
1859
1860            let move_authenticators = move_authenticators
1861                .into_iter()
1862                .zip(per_authenticator_inputs)
1863                .zip(per_authenticator_checked_input_objects)
1864                .map(
1865                    |(
1866                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
1867                        authenticator_checked_input_objects,
1868                    )| {
1869                        (
1870                            move_authenticator.to_owned(),
1871                            authenticator_function_ref_for_execution,
1872                            authenticator_checked_input_objects,
1873                        )
1874                    },
1875                )
1876                .collect::<Vec<_>>();
1877
1878            let owned_object_refs = authenticator_and_tx_checked_input_objects
1879                .inner()
1880                .filter_owned_objects();
1881            self.check_owned_locks(&owned_object_refs)?;
1882
1883            let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
1884                extract_auth_fun_refs(signer, gas_data.owner, |address| {
1885                    move_authenticators
1886                        .iter()
1887                        .find(|t| t.0.address().ok().as_ref() == Some(&address))
1888                        .map(|t| t.1.authenticator_function_ref.clone())
1889                });
1890
1891            let auth_context_data = AuthContextData {
1892                transaction_data_bytes: tx_data_bytes,
1893                sender_auth_digest,
1894                sponsor_auth_digest,
1895                sender_authenticator_function_ref,
1896                sponsor_authenticator_function_ref,
1897            };
1898
1899            epoch_store
1900                .executor()
1901                .authenticate_then_execute_transaction_to_effects(
1902                    backing_store,
1903                    protocol_config,
1904                    self.metrics.limits_metrics.clone(),
1905                    self.config
1906                        .expensive_safety_check_config
1907                        .enable_deep_per_tx_iota_conservation_check(),
1908                    self.config.certificate_deny_config.certificate_deny_set(),
1909                    &epoch_id,
1910                    epoch_start_timestamp,
1911                    gas_data,
1912                    gas_status,
1913                    move_authenticators,
1914                    authenticator_and_tx_checked_input_objects,
1915                    kind,
1916                    signer,
1917                    tx_digest,
1918                    auth_context_data,
1919                    &mut None,
1920                )
1921        };
1922
1923        fail_point_if!("cp_execution_nondeterminism", || {
1924            #[cfg(msim)]
1925            self.create_fail_state(transaction, epoch_store, &mut effects);
1926        });
1927
1928        let elapsed = prepare_transaction_start_time.elapsed().as_micros() as f64;
1929        if elapsed > 0.0 {
1930            self.metrics
1931                .prepare_cert_gas_latency_ratio
1932                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1933        }
1934
1935        Ok((inner_temp_store, effects, execution_error_opt.err()))
1936    }
1937
1938    pub fn prepare_transaction_for_benchmark(
1939        &self,
1940        transaction: &VerifiedExecutableTransaction,
1941        input_objects: InputObjects,
1942        epoch_store: &Arc<AuthorityPerEpochStore>,
1943    ) -> IotaResult<(
1944        InnerTemporaryStore,
1945        TransactionEffects,
1946        Option<ExecutionError>,
1947    )> {
1948        let lock = RwLock::new(epoch_store.epoch());
1949        let execution_guard = lock.try_read().unwrap();
1950
1951        self.execute_transaction(
1952            &execution_guard,
1953            transaction,
1954            input_objects,
1955            vec![],
1956            epoch_store,
1957        )
1958    }
1959
1960    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1961    /// `VmChecks::Enabled` instead.
1962    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1963    #[allow(clippy::type_complexity)]
1964    pub fn dry_exec_transaction(
1965        &self,
1966        transaction: TransactionData,
1967        transaction_digest: TransactionDigest,
1968    ) -> IotaResult<(
1969        DryRunTransactionBlockResponse,
1970        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1971        TransactionEffects,
1972        Option<ObjectId>,
1973    )> {
1974        let epoch_store = self.load_epoch_store_one_call_per_task();
1975        if !self.is_fullnode(&epoch_store) {
1976            return Err(IotaError::UnsupportedFeature {
1977                error: "dry-exec is only supported on fullnodes".to_string(),
1978            });
1979        }
1980
1981        if transaction.kind().is_system() {
1982            return Err(IotaError::UnsupportedFeature {
1983                error: "dry-exec does not support system transactions".to_string(),
1984            });
1985        }
1986
1987        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1988    }
1989
1990    #[allow(clippy::type_complexity)]
1991    pub fn dry_exec_transaction_for_benchmark(
1992        &self,
1993        transaction: TransactionData,
1994        transaction_digest: TransactionDigest,
1995    ) -> IotaResult<(
1996        DryRunTransactionBlockResponse,
1997        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1998        TransactionEffects,
1999        Option<ObjectId>,
2000    )> {
2001        let epoch_store = self.load_epoch_store_one_call_per_task();
2002        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2003    }
2004
2005    #[instrument(level = "trace", skip_all)]
2006    #[allow(clippy::type_complexity)]
2007    fn dry_exec_transaction_impl(
2008        &self,
2009        epoch_store: &AuthorityPerEpochStore,
2010        transaction: TransactionData,
2011        transaction_digest: TransactionDigest,
2012    ) -> IotaResult<(
2013        DryRunTransactionBlockResponse,
2014        BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
2015        TransactionEffects,
2016        Option<ObjectId>,
2017    )> {
2018        // Cheap validity checks for a transaction, including input size limits.
2019        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2020
2021        let input_object_kinds = transaction.input_objects()?;
2022        let receiving_object_refs = transaction.receiving_objects();
2023
2024        iota_transaction_checks::deny::check_transaction_for_validation(
2025            &transaction,
2026            &[],
2027            &input_object_kinds,
2028            &receiving_object_refs,
2029            &self.config.transaction_deny_config,
2030            self.get_backing_package_store().as_ref(),
2031        )?;
2032
2033        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2034            // We don't want to cache this transaction since it's a dry run.
2035            None,
2036            &input_object_kinds,
2037            &receiving_object_refs,
2038            epoch_store.epoch(),
2039        )?;
2040
2041        // make a gas object if one was not provided
2042        let mut transaction = transaction;
2043        let reference_gas_price = epoch_store.reference_gas_price();
2044        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2045            let sender = transaction.gas_owner();
2046            let gas_object_id = ObjectId::random();
2047            let gas_object = Object::new_move(
2048                MoveObject::new_gas_coin(
2049                    OBJECT_START_VERSION,
2050                    gas_object_id,
2051                    SIMULATION_GAS_COIN_VALUE,
2052                ),
2053                Owner::Address(sender),
2054                TransactionDigest::GENESIS_MARKER,
2055            );
2056            let gas_object_ref = gas_object.object_ref();
2057            // Add gas object to transaction gas payment
2058            transaction.gas_data_mut().objects = vec![gas_object_ref];
2059            (
2060                iota_transaction_checks::check_transaction_input_with_given_gas(
2061                    epoch_store.protocol_config(),
2062                    reference_gas_price,
2063                    &transaction,
2064                    input_objects,
2065                    receiving_objects,
2066                    gas_object,
2067                    &self.metrics.bytecode_verifier_metrics,
2068                    &self.config.verifier_signing_config,
2069                )?,
2070                Some(gas_object_id),
2071            )
2072        } else {
2073            // `MoveAuthenticator`s are not supported in dry runs, so we set the
2074            // `authenticator_gas_budget` to 0.
2075            let authenticator_gas_budget = 0;
2076
2077            (
2078                iota_transaction_checks::check_transaction_input(
2079                    epoch_store.protocol_config(),
2080                    reference_gas_price,
2081                    &transaction,
2082                    input_objects,
2083                    &receiving_objects,
2084                    &self.metrics.bytecode_verifier_metrics,
2085                    &self.config.verifier_signing_config,
2086                    authenticator_gas_budget,
2087                )?,
2088                None,
2089            )
2090        };
2091
2092        let protocol_config = epoch_store.protocol_config();
2093        let (kind, signer, gas_data) = transaction.execution_parts();
2094
2095        let silent = true;
2096        let executor = iota_execution::executor(protocol_config, silent, None)
2097            .expect("Creating an executor should not fail here");
2098
2099        let expensive_checks = false;
2100        let (inner_temp_store, _, effects, execution_error) = executor
2101            .execute_transaction_to_effects(
2102                self.get_backing_store().as_ref(),
2103                protocol_config,
2104                self.metrics.limits_metrics.clone(),
2105                expensive_checks,
2106                self.config.certificate_deny_config.certificate_deny_set(),
2107                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2108                epoch_store
2109                    .epoch_start_config()
2110                    .epoch_data()
2111                    .epoch_start_timestamp(),
2112                checked_input_objects,
2113                gas_data,
2114                gas_status,
2115                kind,
2116                signer,
2117                transaction_digest,
2118                &mut None,
2119            );
2120        let tx_digest = *effects.transaction_digest();
2121
2122        let module_cache =
2123            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2124
2125        let mut layout_resolver =
2126            epoch_store
2127                .executor()
2128                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2129                    &inner_temp_store,
2130                    self.get_backing_package_store(),
2131                )));
2132        // Returning empty vector here because we recalculate changes in the rpc layer.
2133        let object_changes = Vec::new();
2134
2135        // Returning empty vector here because we recalculate changes in the rpc layer.
2136        let balance_changes = Vec::new();
2137
2138        let written_with_kind = effects
2139            .created()
2140            .into_iter()
2141            .map(|(oref, _)| (oref, WriteKind::Create))
2142            .chain(
2143                effects
2144                    .unwrapped()
2145                    .into_iter()
2146                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2147            )
2148            .chain(
2149                effects
2150                    .mutated()
2151                    .into_iter()
2152                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2153            )
2154            .map(|(oref, kind)| {
2155                let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2156                // TODO: Avoid clones.
2157                (oref.object_id, (oref, obj.clone(), kind))
2158            })
2159            .collect();
2160
2161        let execution_error_source = execution_error
2162            .as_ref()
2163            .err()
2164            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2165
2166        Ok((
2167            DryRunTransactionBlockResponse {
2168                // to avoid cloning `transaction`, fields are populated in this order
2169                suggested_gas_price: self
2170                    .congestion_tracker
2171                    .get_prediction_suggested_gas_price(&transaction),
2172                input: IotaTransactionBlockData::try_from_with_module_cache(
2173                    transaction,
2174                    &module_cache,
2175                    tx_digest,
2176                )
2177                .map_err(|e| IotaError::TransactionSerialization {
2178                    error: format!(
2179                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
2180                    ),
2181                })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
2182                effects: effects.clone().try_into()?,
2183                events: IotaTransactionBlockEvents::try_from(
2184                    inner_temp_store.events.clone(),
2185                    tx_digest,
2186                    None,
2187                    layout_resolver.as_mut(),
2188                )?,
2189                object_changes,
2190                balance_changes,
2191                execution_error_source,
2192            },
2193            written_with_kind,
2194            effects,
2195            mock_gas,
2196        ))
2197    }
2198
2199    pub fn simulate_transaction(
2200        &self,
2201        mut transaction: TransactionData,
2202        checks: VmChecks,
2203    ) -> IotaResult<SimulateTransactionResult> {
2204        if transaction.kind().is_system() {
2205            return Err(IotaError::UnsupportedFeature {
2206                error: "simulate does not support system transactions".to_string(),
2207            });
2208        }
2209
2210        let epoch_store = self.load_epoch_store_one_call_per_task();
2211        if !self.is_fullnode(&epoch_store) {
2212            return Err(IotaError::UnsupportedFeature {
2213                error: "simulate is only supported on fullnodes".to_string(),
2214            });
2215        }
2216
2217        // Cheap validity checks for a transaction, including input size limits.
2218        // This does not check if gas objects are missing since we may create a
2219        // mock gas object. It checks for other transaction input validity.
2220        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2221
2222        let input_object_kinds = transaction.input_objects()?;
2223        let receiving_object_refs = transaction.receiving_objects();
2224
2225        // Since we need to simulate a validator signing the transaction, the first step
2226        // is to check if some transaction elements are denied.
2227        iota_transaction_checks::deny::check_transaction_for_validation(
2228            &transaction,
2229            &[],
2230            &input_object_kinds,
2231            &receiving_object_refs,
2232            &self.config.transaction_deny_config,
2233            self.get_backing_package_store().as_ref(),
2234        )?;
2235
2236        // Load input and receiving objects
2237        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2238            // We don't want to cache this transaction since it's a simulation.
2239            None,
2240            &input_object_kinds,
2241            &receiving_object_refs,
2242            epoch_store.epoch(),
2243        )?;
2244
2245        // Create a mock gas object if one was not provided
2246        let mock_gas_id = if transaction.gas().is_empty() {
2247            let mock_gas_object = Object::new_move(
2248                MoveObject::new_gas_coin(
2249                    OBJECT_START_VERSION,
2250                    ObjectId::MAX,
2251                    SIMULATION_GAS_COIN_VALUE,
2252                ),
2253                Owner::Address(transaction.gas_data().owner),
2254                TransactionDigest::GENESIS_MARKER,
2255            );
2256            let mock_gas_object_ref = mock_gas_object.object_ref();
2257            transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2258            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2259            Some(mock_gas_object.id())
2260        } else {
2261            None
2262        };
2263
2264        let protocol_config = epoch_store.protocol_config();
2265
2266        // `MoveAuthenticator`s are not supported in simulation, so we set the
2267        // `authenticator_gas_budget` to 0.
2268        let authenticator_gas_budget = 0;
2269
2270        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
2271        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
2272        let (gas_status, checked_input_objects) = if checks.enabled() {
2273            iota_transaction_checks::check_transaction_input(
2274                protocol_config,
2275                epoch_store.reference_gas_price(),
2276                &transaction,
2277                input_objects,
2278                &receiving_objects,
2279                &self.metrics.bytecode_verifier_metrics,
2280                &self.config.verifier_signing_config,
2281                authenticator_gas_budget,
2282            )?
2283        } else {
2284            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2285                protocol_config,
2286                transaction.kind(),
2287                input_objects,
2288                receiving_objects,
2289            )?;
2290            let gas_status = IotaGasStatus::new(
2291                transaction.gas_budget(),
2292                transaction.gas_price(),
2293                epoch_store.reference_gas_price(),
2294                protocol_config,
2295            )?;
2296
2297            (gas_status, checked_input_objects)
2298        };
2299
2300        // Create a new executor for the simulation
2301        let executor = iota_execution::executor(
2302            protocol_config,
2303            true, // silent
2304            None,
2305        )
2306        .expect("Creating an executor should not fail here");
2307
2308        // Execute the simulation
2309        let (kind, signer, gas_data) = transaction.execution_parts();
2310        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2311            self.get_backing_store().as_ref(),
2312            protocol_config,
2313            self.metrics.limits_metrics.clone(),
2314            false, // expensive_checks
2315            self.config.certificate_deny_config.certificate_deny_set(),
2316            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2317            epoch_store
2318                .epoch_start_config()
2319                .epoch_data()
2320                .epoch_start_timestamp(),
2321            checked_input_objects,
2322            gas_data,
2323            gas_status,
2324            kind,
2325            signer,
2326            transaction.digest(),
2327            checks.disabled(),
2328        );
2329
2330        // In the case of a dev inspect, the execution_result could be filled with some
2331        // values. Else, execution_result is empty in the case of a dry run.
2332        Ok(SimulateTransactionResult {
2333            input_objects: inner_temp_store.input_objects,
2334            output_objects: inner_temp_store.written,
2335            events: effects.events_digest().map(|_| inner_temp_store.events),
2336            effects,
2337            execution_result,
2338            suggested_gas_price: self
2339                .congestion_tracker
2340                .get_prediction_suggested_gas_price(&transaction),
2341            mock_gas_id,
2342        })
2343    }
2344
2345    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
2346    /// `VmChecks::DISABLED` instead.
2347    /// The object ID for gas can be any
2348    /// object ID, even for an uncreated object
2349    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2350    pub async fn dev_inspect_transaction_block(
2351        &self,
2352        sender: Address,
2353        transaction_kind: TransactionKind,
2354        gas_price: Option<u64>,
2355        gas_budget: Option<u64>,
2356        gas_sponsor: Option<Address>,
2357        gas_objects: Option<Vec<ObjectRef>>,
2358        show_raw_txn_data_and_effects: Option<bool>,
2359        skip_checks: Option<bool>,
2360    ) -> IotaResult<DevInspectResults> {
2361        let epoch_store = self.load_epoch_store_one_call_per_task();
2362
2363        if !self.is_fullnode(&epoch_store) {
2364            return Err(IotaError::UnsupportedFeature {
2365                error: "dev-inspect is only supported on fullnodes".to_string(),
2366            });
2367        }
2368
2369        if transaction_kind.is_system() {
2370            return Err(IotaError::UnsupportedFeature {
2371                error: "system transactions are not supported".to_string(),
2372            });
2373        }
2374
2375        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2376        let skip_checks = skip_checks.unwrap_or(true);
2377        let reference_gas_price = epoch_store.reference_gas_price();
2378        let protocol_config = epoch_store.protocol_config();
2379        let max_tx_gas = protocol_config.max_tx_gas();
2380
2381        let price = gas_price.unwrap_or(reference_gas_price);
2382        let budget = gas_budget.unwrap_or(max_tx_gas);
2383        let owner = gas_sponsor.unwrap_or(sender);
2384        // Payment might be empty here, but it's fine we'll have to deal with it later
2385        // after reading all the input objects.
2386        let payment = gas_objects.unwrap_or_default();
2387        let mut transaction = TransactionData::V1(TransactionDataV1 {
2388            kind: transaction_kind.clone(),
2389            sender,
2390            gas_payment: GasData {
2391                objects: payment,
2392                owner,
2393                price,
2394                budget,
2395            },
2396            expiration: TransactionExpiration::None,
2397        });
2398
2399        let raw_txn_data = if show_raw_txn_data_and_effects {
2400            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2401                error: "Failed to serialize transaction during dev inspect".to_string(),
2402            })?
2403        } else {
2404            vec![]
2405        };
2406
2407        transaction.validity_check_no_gas_check(protocol_config)?;
2408
2409        let input_object_kinds = transaction.input_objects()?;
2410        let receiving_object_refs = transaction.receiving_objects();
2411
2412        iota_transaction_checks::deny::check_transaction_for_validation(
2413            &transaction,
2414            &[],
2415            &input_object_kinds,
2416            &receiving_object_refs,
2417            &self.config.transaction_deny_config,
2418            self.get_backing_package_store().as_ref(),
2419        )?;
2420
2421        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2422            // We don't want to cache this transaction since it's a dev inspect.
2423            None,
2424            &input_object_kinds,
2425            &receiving_object_refs,
2426            epoch_store.epoch(),
2427        )?;
2428
2429        let (gas_status, checked_input_objects) = if skip_checks {
2430            // If we are skipping checks, then we call the check_dev_inspect_input function
2431            // which will perform only lightweight checks on the transaction
2432            // input. And if the gas field is empty, that means we will
2433            // use the dummy gas object so we need to add it to the input objects vector.
2434            if transaction.gas().is_empty() {
2435                // Create and use a dummy gas object if there is no gas object provided.
2436                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2437                    SIMULATION_GAS_COIN_VALUE,
2438                    transaction.gas_owner(),
2439                );
2440                let gas_object_ref = dummy_gas_object.object_ref();
2441                transaction.gas_data_mut().objects = vec![gas_object_ref];
2442                input_objects.push(ObjectReadResult::new(
2443                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2444                    dummy_gas_object.into(),
2445                ));
2446            }
2447            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2448                protocol_config,
2449                &transaction_kind,
2450                input_objects,
2451                receiving_objects,
2452            )?;
2453            let gas_status = IotaGasStatus::new(
2454                max_tx_gas,
2455                transaction.gas_price(),
2456                reference_gas_price,
2457                protocol_config,
2458            )?;
2459
2460            (gas_status, checked_input_objects)
2461        } else {
2462            // If we are not skipping checks, then we call the check_transaction_input
2463            // function and its dummy gas variant which will perform full
2464            // fledged checks just like a real transaction execution.
2465            if transaction.gas().is_empty() {
2466                // Create and use a dummy gas object if there is no gas object provided.
2467                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2468                    SIMULATION_GAS_COIN_VALUE,
2469                    transaction.gas_owner(),
2470                );
2471                let gas_object_ref = dummy_gas_object.object_ref();
2472                transaction.gas_data_mut().objects = vec![gas_object_ref];
2473                iota_transaction_checks::check_transaction_input_with_given_gas(
2474                    epoch_store.protocol_config(),
2475                    reference_gas_price,
2476                    &transaction,
2477                    input_objects,
2478                    receiving_objects,
2479                    dummy_gas_object,
2480                    &self.metrics.bytecode_verifier_metrics,
2481                    &self.config.verifier_signing_config,
2482                )?
2483            } else {
2484                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
2485                // `authenticator_gas_budget` to 0.
2486                let authenticator_gas_budget = 0;
2487
2488                iota_transaction_checks::check_transaction_input(
2489                    epoch_store.protocol_config(),
2490                    reference_gas_price,
2491                    &transaction,
2492                    input_objects,
2493                    &receiving_objects,
2494                    &self.metrics.bytecode_verifier_metrics,
2495                    &self.config.verifier_signing_config,
2496                    authenticator_gas_budget,
2497                )?
2498            }
2499        };
2500
2501        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2502            .expect("Creating an executor should not fail here");
2503        let gas_data = transaction.gas_data().clone();
2504        let intent_msg = IntentMessage::new(
2505            Intent {
2506                version: IntentVersion::V0,
2507                scope: IntentScope::TransactionData,
2508                app_id: IntentAppId::Iota,
2509            },
2510            transaction,
2511        );
2512        let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2513        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2514            self.get_backing_store().as_ref(),
2515            protocol_config,
2516            self.metrics.limits_metrics.clone(),
2517            // expensive checks
2518            false,
2519            self.config.certificate_deny_config.certificate_deny_set(),
2520            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2521            epoch_store
2522                .epoch_start_config()
2523                .epoch_data()
2524                .epoch_start_timestamp(),
2525            checked_input_objects,
2526            gas_data,
2527            gas_status,
2528            transaction_kind,
2529            sender,
2530            transaction_digest,
2531            skip_checks,
2532        );
2533
2534        let raw_effects = if show_raw_txn_data_and_effects {
2535            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2536                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2537            })?
2538        } else {
2539            vec![]
2540        };
2541
2542        let mut layout_resolver =
2543            epoch_store
2544                .executor()
2545                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2546                    &inner_temp_store,
2547                    self.get_backing_package_store(),
2548                )));
2549
2550        DevInspectResults::new(
2551            effects,
2552            inner_temp_store.events.clone(),
2553            execution_result,
2554            raw_txn_data,
2555            raw_effects,
2556            layout_resolver.as_mut(),
2557        )
2558    }
2559
2560    // Only used for testing because of how epoch store is loaded.
2561    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2562        let epoch_store = self.epoch_store_for_testing();
2563        Ok(epoch_store.reference_gas_price())
2564    }
2565
2566    #[instrument(level = "trace", skip_all)]
2567    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2568        self.get_transaction_cache_reader()
2569            .try_is_tx_already_executed(digest)
2570    }
2571
2572    /// Non-fallible version of `try_is_tx_already_executed`.
2573    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2574        self.try_is_tx_already_executed(digest)
2575            .expect("storage access failed")
2576    }
2577
2578    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2579    #[instrument(level = "debug", skip_all, err)]
2580    fn index_tx(
2581        &self,
2582        indexes: &IndexStore,
2583        digest: &TransactionDigest,
2584        // TODO: index_tx really just need the transaction data here.
2585        transaction: &VerifiedExecutableTransaction,
2586        effects: &TransactionEffects,
2587        events: &TransactionEvents,
2588        timestamp_ms: u64,
2589        tx_coins: Option<TxCoins>,
2590        written: &WrittenObjects,
2591        inner_temporary_store: &InnerTemporaryStore,
2592        epoch_store: &Arc<AuthorityPerEpochStore>,
2593    ) -> IotaResult<u64> {
2594        let changes = self
2595            .process_object_index(effects, written, inner_temporary_store, epoch_store)
2596            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2597
2598        indexes.index_tx(
2599            transaction.data().intent_message().value.sender(),
2600            transaction
2601                .data()
2602                .intent_message()
2603                .value
2604                .input_objects()?
2605                .iter()
2606                .map(|o| o.object_id()),
2607            effects
2608                .all_changed_objects()
2609                .into_iter()
2610                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2611            transaction
2612                .data()
2613                .intent_message()
2614                .value
2615                .move_calls()
2616                .into_iter()
2617                .map(|(package, module, function)| {
2618                    (*package, module.to_owned(), function.to_owned())
2619                }),
2620            events,
2621            changes,
2622            digest,
2623            timestamp_ms,
2624            tx_coins,
2625        )
2626    }
2627
2628    #[cfg(msim)]
2629    fn create_fail_state(
2630        &self,
2631        transaction: &VerifiedExecutableTransaction,
2632        epoch_store: &Arc<AuthorityPerEpochStore>,
2633        effects: &mut TransactionEffects,
2634    ) {
2635        use std::cell::RefCell;
2636
2637        use iota_types::effects::TransactionEffectsAPIForTesting;
2638        thread_local! {
2639            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2640        }
2641        if !transaction.data().intent_message().value.is_system_tx() {
2642            let committee = epoch_store.committee();
2643            let cur_stake = (**committee).weight(&self.name);
2644            if cur_stake > 0 {
2645                FAIL_STATE.with_borrow_mut(|fail_state| {
2646                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2647                    if fail_state.0 < committee.validity_threshold() {
2648                        fail_state.0 += cur_stake;
2649                        fail_state.1.insert(self.name);
2650                    }
2651
2652                    if fail_state.1.contains(&self.name) {
2653                        info!("cp_exec failing tx");
2654                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2655                    }
2656                });
2657            }
2658        }
2659    }
2660
2661    fn process_object_index(
2662        &self,
2663        effects: &TransactionEffects,
2664        written: &WrittenObjects,
2665        inner_temporary_store: &InnerTemporaryStore,
2666        epoch_store: &Arc<AuthorityPerEpochStore>,
2667    ) -> IotaResult<ObjectIndexChanges> {
2668        let mut layout_resolver =
2669            epoch_store
2670                .executor()
2671                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2672                    inner_temporary_store,
2673                    self.get_backing_package_store(),
2674                )));
2675
2676        let modified_at_version = effects
2677            .modified_at_versions()
2678            .into_iter()
2679            .collect::<HashMap<_, _>>();
2680
2681        let tx_digest = effects.transaction_digest();
2682        let mut deleted_owners = vec![];
2683        let mut deleted_dynamic_fields = vec![];
2684        for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2685            let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2686            // When we process the index, the latest object hasn't been written yet so
2687            // the old object must be present.
2688            match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2689                |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}", object_ref.object_id)
2690            ) {
2691                Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2692                Owner::Object(object_id) => {
2693                    deleted_dynamic_fields.push((object_id, object_ref.object_id))
2694                }
2695                _ => {}
2696            }
2697        }
2698
2699        let mut new_owners = vec![];
2700        let mut new_dynamic_fields = vec![];
2701
2702        for (oref, owner, kind) in effects.all_changed_objects() {
2703            let id = &oref.object_id;
2704            // For mutated objects, retrieve old owner and delete old index if there is a
2705            // owner change.
2706            if let WriteKind::Mutate = kind {
2707                let Some(old_version) = modified_at_version.get(id) else {
2708                    panic!(
2709                        "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2710                    );
2711                };
2712                // When we process the index, the latest object hasn't been written yet so
2713                // the old object must be present.
2714                let Some(old_object) = self
2715                    .get_object_store()
2716                    .try_get_object_by_key(id, *old_version)?
2717                else {
2718                    panic!(
2719                        "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2720                    );
2721                };
2722                if old_object.owner != owner {
2723                    match old_object.owner {
2724                        Owner::Address(addr) => {
2725                            deleted_owners.push((addr, *id));
2726                        }
2727                        Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2728                        _ => {}
2729                    }
2730                }
2731            }
2732
2733            match owner {
2734                Owner::Address(addr) => {
2735                    // TODO: We can remove the object fetching after we added ObjectType to
2736                    // TransactionEffects
2737                    let new_object = written.get(id).unwrap_or_else(
2738                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2739                    );
2740                    assert_eq!(
2741                        new_object.version(),
2742                        oref.version,
2743                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2744                        tx_digest,
2745                        id,
2746                        new_object.version(),
2747                        oref.version
2748                    );
2749
2750                    let type_ = new_object
2751                        .type_()
2752                        .map(|type_| ObjectType::Struct(type_.clone()))
2753                        .unwrap_or(ObjectType::Package);
2754
2755                    new_owners.push((
2756                        (addr, *id),
2757                        ObjectInfo {
2758                            object_id: *id,
2759                            version: oref.version,
2760                            digest: oref.digest,
2761                            type_,
2762                            owner,
2763                            previous_transaction: *effects.transaction_digest(),
2764                        },
2765                    ));
2766                }
2767                Owner::Object(owner) => {
2768                    let new_object = written.get(id).unwrap_or_else(
2769                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2770                    );
2771                    assert_eq!(
2772                        new_object.version(),
2773                        oref.version,
2774                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2775                        tx_digest,
2776                        id,
2777                        new_object.version(),
2778                        oref.version
2779                    );
2780
2781                    let Some(df_info) = self
2782                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2783                        .unwrap_or_else(|e| {
2784                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2785                            None
2786                        }
2787                        )
2788                    else {
2789                        // Skip indexing for non dynamic field objects.
2790                        continue;
2791                    };
2792                    new_dynamic_fields.push(((owner, *id), df_info))
2793                }
2794                _ => {}
2795            }
2796        }
2797
2798        Ok(ObjectIndexChanges {
2799            deleted_owners,
2800            deleted_dynamic_fields,
2801            new_owners,
2802            new_dynamic_fields,
2803        })
2804    }
2805
2806    fn try_create_dynamic_field_info(
2807        &self,
2808        o: &Object,
2809        written: &WrittenObjects,
2810        resolver: &mut dyn LayoutResolver,
2811    ) -> IotaResult<Option<DynamicFieldInfo>> {
2812        // Skip if not a move object
2813        let Some(move_object) = o.data.as_struct_opt().cloned() else {
2814            return Ok(None);
2815        };
2816
2817        // We only index dynamic field objects
2818        if !move_object.struct_tag().is_dynamic_field() {
2819            return Ok(None);
2820        }
2821
2822        let layout = match resolver.get_annotated_layout(move_object.struct_tag()) {
2823            Ok(annotated_layout) => annotated_layout.into_layout(),
2824            Err(e) => {
2825                error!(
2826                    "unable to load layout for type `{:?}`: {e}",
2827                    move_object.struct_tag()
2828                );
2829                return Ok(None);
2830            }
2831        };
2832
2833        let field =
2834            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2835                IotaError::ObjectDeserialization {
2836                    error: e.to_string(),
2837                }
2838            })?;
2839
2840        let type_ = field.kind;
2841        let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2842        let bcs_name = field.name_bytes.to_owned();
2843
2844        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2845            .map_err(|e| {
2846                warn!("{e}");
2847                IotaError::ObjectDeserialization {
2848                    error: e.to_string(),
2849                }
2850            })?;
2851
2852        let name = DynamicFieldName {
2853            type_: name_type,
2854            value: IotaMoveValue::from(name_value).to_json_value(),
2855        };
2856
2857        let value_metadata = field.value_metadata().map_err(|e| {
2858            warn!("{e}");
2859            IotaError::ObjectDeserialization {
2860                error: e.to_string(),
2861            }
2862        })?;
2863
2864        Ok(Some(match value_metadata {
2865            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2866                name,
2867                bcs_name,
2868                type_,
2869                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2870                object_id: o.id(),
2871                version: o.version(),
2872                digest: o.digest(),
2873            },
2874
2875            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2876                // Find the actual object from storage using the object id obtained from the
2877                // wrapper.
2878
2879                // Try to find the object in the written objects first.
2880                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2881                    (
2882                        object.version(),
2883                        object.digest(),
2884                        object.data.object_type().unwrap().clone(),
2885                    )
2886                } else {
2887                    // If not found, try to find it in the database.
2888                    let object = self
2889                        .get_object_store()
2890                        .try_get_object_by_key(&object_id, o.version())?
2891                        .ok_or_else(|| UserInputError::ObjectNotFound {
2892                            object_id,
2893                            version: Some(o.version()),
2894                        })?;
2895                    let version = object.version();
2896                    let digest = object.digest();
2897                    let object_type = object.data.object_type().unwrap().clone();
2898                    (version, digest, object_type)
2899                };
2900
2901                DynamicFieldInfo {
2902                    name,
2903                    bcs_name,
2904                    type_,
2905                    object_type: object_type.to_string(),
2906                    object_id,
2907                    version,
2908                    digest,
2909                }
2910            }
2911        }))
2912    }
2913
2914    #[instrument(level = "trace", skip_all, err)]
2915    fn post_process_one_tx(
2916        &self,
2917        transaction: &VerifiedExecutableTransaction,
2918        effects: &TransactionEffects,
2919        inner_temporary_store: &InnerTemporaryStore,
2920        epoch_store: &Arc<AuthorityPerEpochStore>,
2921    ) -> IotaResult {
2922        if self.indexes.is_none() {
2923            return Ok(());
2924        }
2925
2926        let _scope = monitored_scope("Execution::post_process_one_tx");
2927
2928        let tx_digest = transaction.digest();
2929        let timestamp_ms = Self::unixtime_now_ms();
2930        let events = &inner_temporary_store.events;
2931        let written = &inner_temporary_store.written;
2932        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2933            effects,
2934            inner_temporary_store,
2935            epoch_store,
2936        );
2937
2938        // Index tx
2939        if let Some(indexes) = &self.indexes {
2940            let _ = self
2941                .index_tx(
2942                    indexes.as_ref(),
2943                    tx_digest,
2944                    transaction,
2945                    effects,
2946                    events,
2947                    timestamp_ms,
2948                    tx_coins,
2949                    written,
2950                    inner_temporary_store,
2951                    epoch_store,
2952                )
2953                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2954                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2955                .expect("Indexing tx should not fail");
2956
2957            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2958            let events = self.make_transaction_block_events(
2959                events.clone(),
2960                *tx_digest,
2961                timestamp_ms,
2962                epoch_store,
2963                inner_temporary_store,
2964            )?;
2965            // Emit events
2966            self.subscription_handler
2967                .process_tx(transaction.data().transaction_data(), &effects, &events)
2968                .tap_ok(|_| {
2969                    self.metrics
2970                        .post_processing_total_tx_had_event_processed
2971                        .inc()
2972                })
2973                .tap_err(|e| {
2974                    warn!(
2975                        ?tx_digest,
2976                        "Post processing - Couldn't process events for tx: {}", e
2977                    )
2978                })?;
2979
2980            self.metrics
2981                .post_processing_total_events_emitted
2982                .inc_by(events.data.len() as u64);
2983        };
2984        Ok(())
2985    }
2986
2987    fn make_transaction_block_events(
2988        &self,
2989        transaction_events: TransactionEvents,
2990        digest: TransactionDigest,
2991        timestamp_ms: u64,
2992        epoch_store: &Arc<AuthorityPerEpochStore>,
2993        inner_temporary_store: &InnerTemporaryStore,
2994    ) -> IotaResult<IotaTransactionBlockEvents> {
2995        let mut layout_resolver =
2996            epoch_store
2997                .executor()
2998                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2999                    inner_temporary_store,
3000                    self.get_backing_package_store(),
3001                )));
3002        IotaTransactionBlockEvents::try_from(
3003            transaction_events,
3004            digest,
3005            Some(timestamp_ms),
3006            layout_resolver.as_mut(),
3007        )
3008    }
3009
3010    pub fn unixtime_now_ms() -> u64 {
3011        let now = SystemTime::now()
3012            .duration_since(UNIX_EPOCH)
3013            .expect("Time went backwards")
3014            .as_millis();
3015        u64::try_from(now).expect("Travelling in time machine")
3016    }
3017
3018    #[instrument(level = "trace", skip_all)]
3019    pub async fn handle_transaction_info_request(
3020        &self,
3021        request: TransactionInfoRequest,
3022    ) -> IotaResult<TransactionInfoResponse> {
3023        let epoch_store = self.load_epoch_store_one_call_per_task();
3024        let (transaction, status) = self
3025            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3026            .ok_or(IotaError::TransactionNotFound {
3027                digest: request.transaction_digest,
3028            })?;
3029        Ok(TransactionInfoResponse {
3030            transaction,
3031            status,
3032        })
3033    }
3034
3035    #[instrument(level = "trace", skip_all)]
3036    pub async fn handle_object_info_request(
3037        &self,
3038        request: ObjectInfoRequest,
3039    ) -> IotaResult<ObjectInfoResponse> {
3040        let epoch_store = self.load_epoch_store_one_call_per_task();
3041
3042        let requested_object_seq = match request.request_kind {
3043            ObjectInfoRequestKind::LatestObjectInfo => {
3044                self.try_get_object_or_tombstone(request.object_id)
3045                    .await?
3046                    .ok_or_else(|| {
3047                        IotaError::from(UserInputError::ObjectNotFound {
3048                            object_id: request.object_id,
3049                            version: None,
3050                        })
3051                    })?
3052                    .version
3053            }
3054            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3055        };
3056
3057        let object = self
3058            .get_object_store()
3059            .try_get_object_by_key(&request.object_id, requested_object_seq)?
3060            .ok_or_else(|| {
3061                IotaError::from(UserInputError::ObjectNotFound {
3062                    object_id: request.object_id,
3063                    version: Some(requested_object_seq),
3064                })
3065            })?;
3066
3067        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3068            (request.generate_layout, object.data.as_struct_opt())
3069        {
3070            Some(into_struct_layout(
3071                epoch_store
3072                    .executor()
3073                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3074                    .get_annotated_layout(move_obj.struct_tag())?,
3075            )?)
3076        } else {
3077            None
3078        };
3079
3080        let lock = if !object.is_address_owned() {
3081            // Only address owned objects have locks.
3082            None
3083        } else {
3084            self.get_transaction_lock(&object.object_ref(), &epoch_store)
3085                .await?
3086                .map(|s| s.into_inner())
3087        };
3088
3089        Ok(ObjectInfoResponse {
3090            object,
3091            layout,
3092            lock_for_debugging: lock,
3093        })
3094    }
3095
3096    #[instrument(level = "trace", skip_all)]
3097    pub fn handle_checkpoint_request(
3098        &self,
3099        request: &CheckpointRequest,
3100    ) -> IotaResult<CheckpointResponse> {
3101        let summary = if request.certified {
3102            let summary = match request.sequence_number {
3103                Some(seq) => self
3104                    .checkpoint_store
3105                    .get_checkpoint_by_sequence_number(seq)?,
3106                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3107            }
3108            .map(|v| v.into_inner());
3109            summary.map(CheckpointSummaryResponse::Certified)
3110        } else {
3111            let summary = match request.sequence_number {
3112                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3113                None => self
3114                    .checkpoint_store
3115                    .get_latest_locally_computed_checkpoint()?,
3116            };
3117            summary.map(CheckpointSummaryResponse::Pending)
3118        };
3119        let contents = match &summary {
3120            Some(s) => self
3121                .checkpoint_store
3122                .get_checkpoint_contents(&s.content_digest())?,
3123            None => None,
3124        };
3125        Ok(CheckpointResponse {
3126            checkpoint: summary,
3127            contents,
3128        })
3129    }
3130
3131    fn check_protocol_version(
3132        supported_protocol_versions: SupportedProtocolVersions,
3133        current_version: ProtocolVersion,
3134    ) {
3135        info!("current protocol version is now {:?}", current_version);
3136        info!("supported versions are: {:?}", supported_protocol_versions);
3137        if !supported_protocol_versions.is_version_supported(current_version) {
3138            let msg = format!(
3139                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3140            );
3141
3142            error!("{}", msg);
3143            eprintln!("{msg}");
3144
3145            #[cfg(not(msim))]
3146            std::process::exit(1);
3147
3148            #[cfg(msim)]
3149            iota_simulator::task::shutdown_current_node();
3150        }
3151    }
3152
    /// Constructs the `AuthorityState` and starts its background tasks.
    ///
    /// In order, this:
    /// 1. checks that the protocol version of `epoch_store` is supported
    ///    (see `check_protocol_version`, which shuts the node down
    ///    otherwise);
    /// 2. creates the metrics, the transaction manager fed by an unbounded
    ///    channel of ready transactions, the per-epoch store pruner, the
    ///    authority store pruner, the input loader and, when `policy_config`
    ///    is provided, the traffic controller;
    /// 3. assembles the state and spawns the `execution_process` task (which
    ///    only holds a weak reference to the state) and the `migrate_events`
    ///    store migration task;
    /// 4. indexes the genesis objects via `create_owner_index_if_empty`.
    ///
    /// # Panics
    ///
    /// Panics if indexing the genesis objects fails.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    #[expect(clippy::too_many_arguments)]
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
        policy_config: Option<PolicyConfig>,
        firewall_config: Option<RemoteFirewallConfig>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // The transaction manager sends ready transactions over this channel;
        // the receiving end is handed to the execution task spawned below.
        let (tx_ready_transactions, rx_ready_transactions) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_transactions,
            metrics.clone(),
        ));
        // The sender is stored in the state; the receiver goes to the
        // execution task.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            config
                .authority_store_pruning_config
                .num_latest_epoch_dbs_to_retain,
        )
        .await;
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            grpc_indexes_store.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
            checkpoint_progress_tracker.clone(),
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let traffic_controller_metrics =
            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
        // The traffic controller is only created when a policy is configured.
        let traffic_controller = if let Some(policy_config) = policy_config {
            Some(Arc::new(
                TrafficController::init(
                    policy_config,
                    traffic_controller_metrics,
                    firewall_config.clone(),
                )
                .await,
            ))
        } else {
            None
        };
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            grpc_indexes_store,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            authority_per_epoch_pruner,
            checkpoint_progress_tracker,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
            traffic_controller,
        });

        // Start a task to execute ready transactions.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_transactions,
            rx_execution_shutdown,
        ));
        spawn_monitored_task!(authority_store_migrations::migrate_events(store));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
3274
3275    pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3276        &self.authority_per_epoch_pruner
3277    }
3278
3279    // TODO: Consolidate our traits to reduce the number of methods here.
3280    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3281        &self.execution_cache_trait_pointers.object_cache_reader
3282    }
3283
3284    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3285        &self.execution_cache_trait_pointers.transaction_cache_reader
3286    }
3287
3288    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3289        &self.execution_cache_trait_pointers.cache_writer
3290    }
3291
3292    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3293        &self.execution_cache_trait_pointers.backing_store
3294    }
3295
3296    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3297        &self.execution_cache_trait_pointers.backing_package_store
3298    }
3299
3300    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3301        &self.execution_cache_trait_pointers.object_store
3302    }
3303
3304    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3305        &self.execution_cache_trait_pointers.reconfig_api
3306    }
3307
3308    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3309        &self.execution_cache_trait_pointers.global_state_hash_store
3310    }
3311
3312    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3313        &self.execution_cache_trait_pointers.checkpoint_cache
3314    }
3315
3316    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3317        &self.execution_cache_trait_pointers.state_sync_store
3318    }
3319
3320    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3321        &self.execution_cache_trait_pointers.cache_commit
3322    }
3323
3324    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3325        self.execution_cache_trait_pointers
3326            .testing_api
3327            .database_for_testing()
3328    }
3329
3330    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3331        &self,
3332        config: NodeConfig,
3333        metrics: Arc<AuthorityStorePruningMetrics>,
3334    ) -> anyhow::Result<()> {
3335        let archive_readers =
3336            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3337        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3338            &self.database_for_testing().perpetual_tables,
3339            &self.checkpoint_store,
3340            self.grpc_indexes_store.as_deref(),
3341            None,
3342            config.authority_store_pruning_config,
3343            metrics,
3344            archive_readers,
3345            EPOCH_DURATION_MS_FOR_TESTING,
3346            self.checkpoint_progress_tracker.as_ref(),
3347        )
3348        .await
3349    }
3350
3351    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3352        &self.transaction_manager
3353    }
3354
3355    /// Adds transactions / certificates to transaction manager for ordered
3356    /// execution.
3357    pub fn enqueue_transactions_for_execution(
3358        &self,
3359        transactions: Vec<VerifiedExecutableTransaction>,
3360        epoch_store: &Arc<AuthorityPerEpochStore>,
3361    ) {
3362        self.transaction_manager.enqueue(transactions, epoch_store)
3363    }
3364
3365    /// Adds certificates to transaction manager for ordered execution.
3366    pub fn enqueue_certificates_for_execution(
3367        &self,
3368        certs: Vec<VerifiedCertificate>,
3369        epoch_store: &Arc<AuthorityPerEpochStore>,
3370    ) {
3371        self.transaction_manager
3372            .enqueue_certificates(certs, epoch_store)
3373    }
3374
3375    pub fn enqueue_with_expected_effects_digest(
3376        &self,
3377        transactions: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3378        epoch_store: &AuthorityPerEpochStore,
3379    ) {
3380        self.transaction_manager
3381            .enqueue_with_expected_effects_digest(transactions, epoch_store)
3382    }
3383
3384    fn create_owner_index_if_empty(
3385        &self,
3386        genesis_objects: &[Object],
3387        epoch_store: &Arc<AuthorityPerEpochStore>,
3388    ) -> IotaResult {
3389        let Some(index_store) = &self.indexes else {
3390            return Ok(());
3391        };
3392        if !index_store.is_empty() {
3393            return Ok(());
3394        }
3395
3396        let mut new_owners = vec![];
3397        let mut new_dynamic_fields = vec![];
3398        let mut layout_resolver = epoch_store
3399            .executor()
3400            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3401        for o in genesis_objects.iter() {
3402            match o.owner {
3403                Owner::Address(addr) => {
3404                    new_owners.push(((addr, o.id()), ObjectInfo::new(&o.object_ref(), o)))
3405                }
3406                Owner::Object(object_id) => {
3407                    let id = o.id();
3408                    let info = match self.try_create_dynamic_field_info(
3409                        o,
3410                        &BTreeMap::new(),
3411                        layout_resolver.as_mut(),
3412                    ) {
3413                        Ok(Some(info)) => info,
3414                        Ok(None) => continue,
3415                        Err(IotaError::UserInput {
3416                            error:
3417                                UserInputError::ObjectNotFound {
3418                                    object_id: not_found_id,
3419                                    version,
3420                                },
3421                        }) => {
3422                            warn!(
3423                                ?not_found_id,
3424                                ?version,
3425                                object_owner=?object_id,
3426                                field=?id,
3427                                "Skipping dynamic field: referenced genesis object not found"
3428                            );
3429                            continue;
3430                        }
3431                        Err(e) => return Err(e),
3432                    };
3433                    new_dynamic_fields.push(((object_id, id), info));
3434                }
3435                _ => {}
3436            }
3437        }
3438
3439        index_store.insert_genesis_objects(ObjectIndexChanges {
3440            deleted_owners: vec![],
3441            deleted_dynamic_fields: vec![],
3442            new_owners,
3443            new_dynamic_fields,
3444        })
3445    }
3446
3447    /// Attempts to acquire execution lock for an executable transaction.
3448    /// Returns the lock if the transaction is matching current executed epoch
3449    /// Returns None otherwise
3450    pub fn execution_lock_for_executable_transaction(
3451        &self,
3452        transaction: &VerifiedExecutableTransaction,
3453    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3454        let lock = self
3455            .execution_lock
3456            .try_read()
3457            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3458        if *lock == transaction.auth_sig().epoch() {
3459            Ok(lock)
3460        } else {
3461            Err(IotaError::WrongEpoch {
3462                expected_epoch: *lock,
3463                actual_epoch: transaction.auth_sig().epoch(),
3464            })
3465        }
3466    }
3467
3468    /// Acquires the execution lock for the duration of a transaction signing
3469    /// request. This prevents reconfiguration from starting until we are
3470    /// finished handling the signing request. Otherwise, in-memory lock
3471    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3472    /// while we are attempting to acquire locks for the transaction.
3473    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3474        self.execution_lock
3475            .try_read()
3476            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3477    }
3478
3479    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3480        self.execution_lock.write().await
3481    }
3482
    /// Moves this authority from the epoch of `cur_epoch_store` to the epoch
    /// of `new_committee` and returns the epoch store of the new epoch.
    ///
    /// The steps run in this order:
    /// 1. check that the protocol version of the new epoch is supported (see
    ///    `check_protocol_version`, which shuts the node down otherwise),
    ///    reset the metrics and record the new committee;
    /// 2. take the execution lock for writing, then terminate the
    ///    epoch-specific tasks of the current epoch;
    /// 3. sanity-check the checkpoint and shared version assignment state;
    /// 4. revert uncommitted epoch transactions, clear the end-of-epoch cache
    ///    state, check system consistency and persist the new epoch start
    ///    configuration;
    /// 5. optionally checkpoint the databases, as configured in
    ///    `db_checkpoint_config`;
    /// 6. reopen the epoch database for the new epoch, reconfigure the
    ///    transaction manager and set the epoch guarded by the execution lock.
    ///
    /// # Errors
    ///
    /// Propagates the error of any failing step above.
    ///
    /// # Panics
    ///
    /// Panics if `epoch_last_checkpoint` is smaller than the highest locally
    /// built checkpoint, or if the reopened epoch store is not at the epoch of
    /// `new_committee`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
            || self.is_fullnode(cur_epoch_store)
        {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Due to (otherwise harmless) race conditions between CheckpointExecutor and
            // ConsensusHandler, we actually can't guarantee that all
            // shared_version_assignments have been removed. However,
            // typically at most 2 or 3 are left over. We leave this check here in order to
            // catch complete failure of cleanup which would cause a memory
            // leak.
            if num_shared_version_assignments > 10 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            state_hasher,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Database checkpoints are written only when a checkpoint path is
        // configured and end-of-epoch checkpoints are enabled.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_transaction
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3596
3597    /// Advance the epoch store to the next epoch for testing only.
3598    /// This only manually sets all the places where we have the epoch number.
3599    /// It doesn't properly reconfigure the node, hence should be only used for
3600    /// testing.
3601    pub async fn reconfigure_for_testing(&self) {
3602        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3603        let epoch_store = self.epoch_store_for_testing().clone();
3604        let protocol_config = epoch_store.protocol_config().clone();
3605        // The current protocol config used in the epoch store may have been overridden
3606        // and diverged from the protocol config definitions. That override may
3607        // have now been dropped when the initial guard was dropped. We reapply
3608        // the override before creating the new epoch store, to make sure that
3609        // the new epoch store has the same protocol config as the current one.
3610        // Since this is for testing only, we mostly like to keep the protocol config
3611        // the same across epochs.
3612        let _guard =
3613            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3614        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3615            self.get_backing_package_store().clone(),
3616            &self.config.expensive_safety_check_config,
3617            self.checkpoint_store
3618                .get_epoch_last_checkpoint(epoch_store.epoch())
3619                .unwrap()
3620                .map(|c| *c.sequence_number())
3621                .unwrap_or_default(),
3622        );
3623        let new_epoch = new_epoch_store.epoch();
3624        self.transaction_manager.reconfigure(new_epoch);
3625        self.epoch_store.store(new_epoch_store);
3626        epoch_store.epoch_terminated().await;
3627        *execution_lock = new_epoch;
3628    }
3629
3630    #[instrument(level = "error", skip_all)]
3631    fn check_system_consistency(
3632        &self,
3633        cur_epoch_store: &AuthorityPerEpochStore,
3634        state_hasher: Arc<GlobalStateHasher>,
3635        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3636        epoch_supply_change: i64,
3637    ) -> IotaResult<()> {
3638        info!(
3639            "Performing iota conservation consistency check for epoch {}",
3640            cur_epoch_store.epoch()
3641        );
3642
3643        if cfg!(debug_assertions) {
3644            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3645        }
3646
3647        self.get_reconfig_api()
3648            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3649
3650        // check for root state hash consistency with live object set
3651        if expensive_safety_check_config.enable_state_consistency_check() {
3652            info!(
3653                "Performing state consistency check for epoch {}",
3654                cur_epoch_store.epoch()
3655            );
3656            self.expensive_check_is_consistent_state(
3657                state_hasher,
3658                cur_epoch_store,
3659                cfg!(debug_assertions), // panic in debug mode only
3660            );
3661        }
3662
3663        if expensive_safety_check_config.enable_secondary_index_checks() {
3664            if let Some(indexes) = self.indexes.clone() {
3665                verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3666                    .expect("secondary indexes are inconsistent");
3667            }
3668        }
3669
3670        Ok(())
3671    }
3672
3673    fn expensive_check_is_consistent_state(
3674        &self,
3675        state_hasher: Arc<GlobalStateHasher>,
3676        cur_epoch_store: &AuthorityPerEpochStore,
3677        panic: bool,
3678    ) {
3679        let live_object_set_hash = state_hasher.digest_live_object_set();
3680
3681        let root_state_hash: ECMHLiveObjectSetDigest = self
3682            .get_global_state_hash_store()
3683            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3684            .expect("Retrieving root state hash cannot fail")
3685            .expect("Root state hash for epoch must exist")
3686            .1
3687            .digest()
3688            .into();
3689
3690        let is_inconsistent = root_state_hash != live_object_set_hash;
3691        if is_inconsistent {
3692            if panic {
3693                panic!(
3694                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3695                );
3696            } else {
3697                error!(
3698                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3699                    root_state_hash, live_object_set_hash
3700                );
3701            }
3702        } else {
3703            info!("State consistency check passed");
3704        }
3705
3706        if !panic {
3707            state_hasher.set_inconsistent_state(is_inconsistent);
3708        }
3709    }
3710
3711    pub fn current_epoch_for_testing(&self) -> EpochId {
3712        self.epoch_store_for_testing().epoch()
3713    }
3714
3715    #[instrument(level = "error", skip_all)]
3716    pub fn checkpoint_all_dbs(
3717        &self,
3718        checkpoint_path: &Path,
3719        cur_epoch_store: &AuthorityPerEpochStore,
3720        checkpoint_indexes: bool,
3721    ) -> IotaResult {
3722        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3723        let current_epoch = cur_epoch_store.epoch();
3724
3725        if checkpoint_path.exists() {
3726            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3727            return Ok(());
3728        }
3729
3730        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3731        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3732
3733        if checkpoint_path_tmp.exists() {
3734            fs::remove_dir_all(&checkpoint_path_tmp)
3735                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3736        }
3737
3738        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3739        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3740
3741        // NOTE: Do not change the order of invoking these checkpoint calls
3742        // We want to snapshot checkpoint db first to not race with state sync
3743        self.checkpoint_store
3744            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3745
3746        self.get_reconfig_api()
3747            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3748
3749        self.committee_store
3750            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3751
3752        if checkpoint_indexes {
3753            if let Some(indexes) = self.indexes.as_ref() {
3754                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3755            }
3756            if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3757                grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3758            }
3759        }
3760
3761        fs::rename(checkpoint_path_tmp, checkpoint_path)
3762            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3763        Ok(())
3764    }
3765
3766    /// Load the current epoch store. This can change during reconfiguration. To
3767    /// ensure that we never end up accessing different epoch stores in a
3768    /// single task, we need to make sure that this is called once per task.
3769    /// Each call needs to be carefully audited to ensure it is
3770    /// the case. This also means we should minimize the number of call-sites.
3771    /// Only call it when there is no way to obtain it from somewhere else.
3772    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3773        self.epoch_store.load()
3774    }
3775
3776    // Load the epoch store, should be used in tests only.
3777    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3778        self.load_epoch_store_one_call_per_task()
3779    }
3780
3781    pub fn clone_committee_for_testing(&self) -> Committee {
3782        Committee::clone(self.epoch_store_for_testing().committee())
3783    }
3784
3785    #[instrument(level = "trace", skip_all)]
3786    pub async fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3787        self.get_object_store()
3788            .try_get_object(object_id)
3789            .map_err(Into::into)
3790    }
3791
3792    /// Non-fallible version of `try_get_object`.
3793    pub async fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3794        self.try_get_object(object_id)
3795            .await
3796            .expect("storage access failed")
3797    }
3798
3799    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3800        Ok(self
3801            .try_get_object(&ObjectId::SYSTEM)
3802            .await?
3803            .expect("system package should always exist")
3804            .object_ref())
3805    }
3806
3807    // This function is only used for testing.
3808    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3809        self.get_object_cache_reader()
3810            .try_get_iota_system_state_object_unsafe()
3811    }
3812
3813    #[instrument(level = "trace", skip_all)]
3814    pub fn get_checkpoint_by_sequence_number(
3815        &self,
3816        sequence_number: CheckpointSequenceNumber,
3817    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3818        Ok(self
3819            .checkpoint_store
3820            .get_checkpoint_by_sequence_number(sequence_number)?)
3821    }
3822
3823    /// Wait for the given transactions to be included in a checkpoint.
3824    ///
3825    /// Returns a mapping from transaction digest to
3826    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
3827    /// On timeout, returns partial results for any transactions that were
3828    /// already checkpointed.
3829    pub async fn wait_for_checkpoint_inclusion(
3830        &self,
3831        digests: &[TransactionDigest],
3832        timeout: Duration,
3833    ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3834        let epoch_store = self.load_epoch_store_one_call_per_task();
3835
3836        // Local cache so multiple transactions in the same checkpoint only
3837        // trigger a single checkpoint summary lookup.
3838        let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3839
3840        let results = epoch_store
3841            .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3842                *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3843                    self.get_checkpoint_by_sequence_number(seq)
3844                        .ok()
3845                        .flatten()
3846                        .map(|c| c.timestamp_ms)
3847                        .unwrap_or(0)
3848                })
3849            })
3850            .await?;
3851
3852        Ok(digests
3853            .iter()
3854            .copied()
3855            .zip(results)
3856            .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3857            .collect())
3858    }
3859
3860    #[instrument(level = "trace", skip_all)]
3861    pub fn get_transaction_checkpoint_for_tests(
3862        &self,
3863        digest: &TransactionDigest,
3864        epoch_store: &AuthorityPerEpochStore,
3865    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3866        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3867        let Some(checkpoint) = checkpoint else {
3868            return Ok(None);
3869        };
3870        let checkpoint = self
3871            .checkpoint_store
3872            .get_checkpoint_by_sequence_number(checkpoint)?;
3873        Ok(checkpoint)
3874    }
3875
3876    #[instrument(level = "trace", skip_all)]
3877    pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3878        Ok(
3879            match self
3880                .get_object_cache_reader()
3881                .try_get_latest_object_or_tombstone(*object_id)?
3882            {
3883                Some((_, ObjectOrTombstone::Object(object))) => {
3884                    let layout = self.get_object_layout(&object)?;
3885                    ObjectRead::Exists(object.object_ref(), object, layout)
3886                }
3887                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3888                None => ObjectRead::NotExists(*object_id),
3889            },
3890        )
3891    }
3892
3893    /// Chain Identifier is the digest of the genesis checkpoint.
3894    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3895        self.chain_identifier
3896    }
3897
3898    #[instrument(level = "trace", skip_all)]
3899    pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3900    where
3901        T: DeserializeOwned,
3902    {
3903        let o = self.get_object_read(object_id)?.into_object()?;
3904        if let Some(move_object) = o.data.as_struct_opt() {
3905            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3906                IotaError::ObjectDeserialization {
3907                    error: format!("{e}"),
3908                }
3909            })?)
3910        } else {
3911            Err(IotaError::ObjectDeserialization {
3912                error: format!("Provided object : [{object_id}] is not a Move object."),
3913            })
3914        }
3915    }
3916
3917    /// This function aims to serve rpc reads on past objects and
3918    /// we don't expect it to be called for other purposes.
3919    /// Depending on the object pruning policies that will be enforced in the
3920    /// future there is no software-level guarantee/SLA to retrieve an object
3921    /// with an old version even if it exists/existed.
3922    #[instrument(level = "trace", skip_all)]
3923    pub fn get_past_object_read(
3924        &self,
3925        object_id: &ObjectId,
3926        version: SequenceNumber,
3927    ) -> IotaResult<PastObjectRead> {
3928        // Firstly we see if the object ever existed by getting its latest data
3929        let Some(obj_ref) = self
3930            .get_object_cache_reader()
3931            .try_get_latest_object_ref_or_tombstone(*object_id)?
3932        else {
3933            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3934        };
3935
3936        if version > obj_ref.version {
3937            return Ok(PastObjectRead::VersionTooHigh {
3938                object_id: *object_id,
3939                asked_version: version,
3940                latest_version: obj_ref.version,
3941            });
3942        }
3943
3944        if version < obj_ref.version {
3945            // Read past objects
3946            return Ok(match self.read_object_at_version(object_id, version)? {
3947                Some((object, layout)) => {
3948                    let obj_ref = object.object_ref();
3949                    PastObjectRead::VersionFound(obj_ref, object, layout)
3950                }
3951
3952                None => PastObjectRead::VersionNotFound(*object_id, version),
3953            });
3954        }
3955
3956        if !obj_ref.digest.is_object_alive() {
3957            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3958        }
3959
3960        match self.read_object_at_version(object_id, obj_ref.version)? {
3961            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3962            None => {
3963                error!(
3964                    "Object with in parent_entry is missing from object store, datastore is \
3965                     inconsistent",
3966                );
3967                Err(UserInputError::ObjectNotFound {
3968                    object_id: *object_id,
3969                    version: Some(obj_ref.version),
3970                }
3971                .into())
3972            }
3973        }
3974    }
3975
3976    #[instrument(level = "trace", skip_all)]
3977    fn read_object_at_version(
3978        &self,
3979        object_id: &ObjectId,
3980        version: SequenceNumber,
3981    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3982        let Some(object) = self
3983            .get_object_cache_reader()
3984            .try_get_object_by_key(object_id, version)?
3985        else {
3986            return Ok(None);
3987        };
3988
3989        let layout = self.get_object_layout(&object)?;
3990        Ok(Some((object, layout)))
3991    }
3992
3993    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3994        let layout = object
3995            .data
3996            .as_struct_opt()
3997            .map(|object| {
3998                into_struct_layout(
3999                    self.load_epoch_store_one_call_per_task()
4000                        .executor()
4001                        // TODO(cache) - must read through cache
4002                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4003                        .get_annotated_layout(object.struct_tag())?,
4004                )
4005            })
4006            .transpose()?;
4007        Ok(layout)
4008    }
4009
4010    fn get_owner_at_version(
4011        &self,
4012        object_id: &ObjectId,
4013        version: SequenceNumber,
4014    ) -> IotaResult<Owner> {
4015        self.get_object_store()
4016            .try_get_object_by_key(object_id, version)?
4017            .ok_or_else(|| {
4018                IotaError::from(UserInputError::ObjectNotFound {
4019                    object_id: *object_id,
4020                    version: Some(version),
4021                })
4022            })
4023            .map(|o| o.owner)
4024    }
4025
4026    #[instrument(level = "trace", skip_all)]
4027    pub fn get_owner_objects(
4028        &self,
4029        owner: Address,
4030        // If `Some`, the query will start from the next item after the specified cursor
4031        cursor: Option<ObjectId>,
4032        limit: usize,
4033        filter: Option<IotaObjectDataFilter>,
4034    ) -> IotaResult<Vec<ObjectInfo>> {
4035        if let Some(indexes) = &self.indexes {
4036            indexes.get_owner_objects(owner, cursor, limit, filter)
4037        } else {
4038            Err(IotaError::IndexStoreNotAvailable)
4039        }
4040    }
4041
4042    #[instrument(level = "trace", skip_all)]
4043    pub fn get_owned_coins_iterator_with_cursor(
4044        &self,
4045        owner: Address,
4046        // If `Some`, the query will start from the next item after the specified cursor
4047        cursor: (String, ObjectId),
4048        limit: usize,
4049        one_coin_type_only: bool,
4050    ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4051        if let Some(indexes) = &self.indexes {
4052            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4053        } else {
4054            Err(IotaError::IndexStoreNotAvailable)
4055        }
4056    }
4057
4058    #[instrument(level = "trace", skip_all)]
4059    pub fn get_owner_objects_iterator(
4060        &self,
4061        owner: Address,
4062        // If `Some`, the query will start from the next item after the specified cursor
4063        cursor: Option<ObjectId>,
4064        filter: Option<IotaObjectDataFilter>,
4065    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4066        let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4067        if let Some(indexes) = &self.indexes {
4068            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4069        } else {
4070            Err(IotaError::IndexStoreNotAvailable)
4071        }
4072    }
4073
4074    #[instrument(level = "trace", skip_all)]
4075    pub async fn get_move_objects<T>(&self, owner: Address, tag: StructTag) -> IotaResult<Vec<T>>
4076    where
4077        T: DeserializeOwned,
4078    {
4079        let object_ids = self
4080            .get_owner_objects_iterator(owner, None, None)?
4081            .filter(|o| match &o.type_ {
4082                ObjectType::Struct(s) => *s == tag,
4083                ObjectType::Package => false,
4084            })
4085            .map(|info| ObjectKey(info.object_id, info.version))
4086            .collect::<Vec<_>>();
4087        let mut move_objects = vec![];
4088
4089        let objects = self
4090            .get_object_store()
4091            .try_multi_get_objects_by_key(&object_ids)?;
4092
4093        for (o, id) in objects.into_iter().zip(object_ids) {
4094            let object = o.ok_or_else(|| {
4095                IotaError::from(UserInputError::ObjectNotFound {
4096                    object_id: id.0,
4097                    version: Some(id.1),
4098                })
4099            })?;
4100            let move_object = object.data.as_struct_opt().ok_or_else(|| {
4101                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4102            })?;
4103            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4104                IotaError::ObjectDeserialization {
4105                    error: format!("{e}"),
4106                }
4107            })?);
4108        }
4109        Ok(move_objects)
4110    }
4111
4112    #[instrument(level = "trace", skip_all)]
4113    pub fn get_dynamic_fields(
4114        &self,
4115        owner: ObjectId,
4116        // If `Some`, the query will start from the next item after the specified cursor
4117        cursor: Option<ObjectId>,
4118        limit: usize,
4119    ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4120        Ok(self
4121            .get_dynamic_fields_iterator(owner, cursor)?
4122            .take(limit)
4123            .collect::<Result<Vec<_>, _>>()?)
4124    }
4125
4126    fn get_dynamic_fields_iterator(
4127        &self,
4128        owner: ObjectId,
4129        // If `Some`, the query will start from the next item after the specified cursor
4130        cursor: Option<ObjectId>,
4131    ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4132    {
4133        if let Some(indexes) = &self.indexes {
4134            indexes.get_dynamic_fields_iterator(owner, cursor)
4135        } else {
4136            Err(IotaError::IndexStoreNotAvailable)
4137        }
4138    }
4139
4140    #[instrument(level = "trace", skip_all)]
4141    pub fn get_dynamic_field_object_id(
4142        &self,
4143        owner: ObjectId,
4144        name_type: TypeTag,
4145        name_bcs_bytes: &[u8],
4146    ) -> IotaResult<Option<ObjectId>> {
4147        if let Some(indexes) = &self.indexes {
4148            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4149        } else {
4150            Err(IotaError::IndexStoreNotAvailable)
4151        }
4152    }
4153
4154    #[instrument(level = "trace", skip_all)]
4155    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4156        Ok(self.get_indexes()?.next_sequence_number())
4157    }
4158
4159    #[instrument(level = "trace", skip_all)]
4160    pub async fn get_executed_transaction_and_effects(
4161        &self,
4162        digest: TransactionDigest,
4163        kv_store: Arc<TransactionKeyValueStore>,
4164    ) -> IotaResult<(Transaction, TransactionEffects)> {
4165        let transaction = kv_store.get_tx(digest).await?;
4166        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4167        Ok((transaction, effects))
4168    }
4169
4170    #[instrument(level = "trace", skip_all)]
4171    pub fn multi_get_checkpoint_by_sequence_number(
4172        &self,
4173        sequence_numbers: &[CheckpointSequenceNumber],
4174    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4175        Ok(self
4176            .checkpoint_store
4177            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4178    }
4179
4180    #[instrument(level = "trace", skip_all)]
4181    pub fn get_transaction_events(
4182        &self,
4183        digest: &TransactionDigest,
4184    ) -> IotaResult<TransactionEvents> {
4185        self.get_transaction_cache_reader()
4186            .try_get_events(digest)?
4187            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4188    }
4189
4190    pub fn get_transaction_input_objects(
4191        &self,
4192        effects: &TransactionEffects,
4193    ) -> anyhow::Result<Vec<Object>> {
4194        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4195            .map_err(Into::into)
4196    }
4197
4198    pub fn get_transaction_output_objects(
4199        &self,
4200        effects: &TransactionEffects,
4201    ) -> anyhow::Result<Vec<Object>> {
4202        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4203            .map_err(Into::into)
4204    }
4205
4206    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4207        match &self.indexes {
4208            Some(i) => Ok(i.clone()),
4209            None => Err(IotaError::UnsupportedFeature {
4210                error: "extended object indexing is not enabled on this server".into(),
4211            }),
4212        }
4213    }
4214
4215    pub async fn get_transactions_for_tests(
4216        self: &Arc<Self>,
4217        filter: Option<TransactionFilter>,
4218        cursor: Option<TransactionDigest>,
4219        limit: Option<usize>,
4220        reverse: bool,
4221    ) -> IotaResult<Vec<TransactionDigest>> {
4222        let metrics = KeyValueStoreMetrics::new_for_tests();
4223        let kv_store = Arc::new(TransactionKeyValueStore::new(
4224            "rocksdb",
4225            metrics,
4226            self.clone(),
4227        ));
4228        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4229            .await
4230    }
4231
4232    #[instrument(level = "trace", skip_all)]
4233    pub async fn get_transactions(
4234        &self,
4235        kv_store: &Arc<TransactionKeyValueStore>,
4236        filter: Option<TransactionFilter>,
4237        // If `Some`, the query will start from the next item after the specified cursor
4238        cursor: Option<TransactionDigest>,
4239        limit: Option<usize>,
4240        reverse: bool,
4241    ) -> IotaResult<Vec<TransactionDigest>> {
4242        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4243            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4244            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4245            if reverse {
4246                let iter = iter
4247                    .rev()
4248                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4249                    .skip(usize::from(cursor.is_some()));
4250                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4251            } else {
4252                let iter = iter
4253                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4254                    .skip(usize::from(cursor.is_some()));
4255                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4256            }
4257        }
4258        self.get_indexes()?
4259            .get_transactions(filter, cursor, limit, reverse)
4260    }
4261
4262    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4263        &self.checkpoint_store
4264    }
4265
4266    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4267        self.get_checkpoint_store()
4268            .get_highest_executed_checkpoint_seq_number()?
4269            .ok_or(IotaError::UserInput {
4270                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4271            })
4272    }
4273
4274    #[cfg(msim)]
4275    pub fn get_highest_pruned_checkpoint_for_testing(
4276        &self,
4277    ) -> IotaResult<CheckpointSequenceNumber> {
4278        self.database_for_testing()
4279            .perpetual_tables
4280            .get_highest_pruned_checkpoint()
4281            .map(|c| c.unwrap_or(0))
4282            .map_err(Into::into)
4283    }
4284
4285    #[instrument(level = "trace", skip_all)]
4286    pub fn get_checkpoint_summary_by_sequence_number(
4287        &self,
4288        sequence_number: CheckpointSequenceNumber,
4289    ) -> IotaResult<CheckpointSummary> {
4290        let verified_checkpoint = self
4291            .get_checkpoint_store()
4292            .get_checkpoint_by_sequence_number(sequence_number)?;
4293        match verified_checkpoint {
4294            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4295            None => Err(IotaError::UserInput {
4296                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4297            }),
4298        }
4299    }
4300
4301    #[instrument(level = "trace", skip_all)]
4302    pub fn get_checkpoint_summary_by_digest(
4303        &self,
4304        digest: CheckpointDigest,
4305    ) -> IotaResult<CheckpointSummary> {
4306        let verified_checkpoint = self
4307            .get_checkpoint_store()
4308            .get_checkpoint_by_digest(&digest)?;
4309        match verified_checkpoint {
4310            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4311            None => Err(IotaError::UserInput {
4312                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4313            }),
4314        }
4315    }
4316
4317    #[instrument(level = "trace", skip_all)]
4318    pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4319        if package_id.is_system_package() {
4320            return self.find_genesis_txn_digest();
4321        }
4322        Ok(self
4323            .get_object_read(&package_id)?
4324            .into_object()?
4325            .previous_transaction)
4326    }
4327
4328    #[instrument(level = "trace", skip_all)]
4329    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4330        let summary = self
4331            .get_verified_checkpoint_by_sequence_number(0)?
4332            .into_message();
4333        let content = self.get_checkpoint_contents(summary.content_digest)?;
4334        let genesis_transaction = content.enumerate_transactions(&summary).next();
4335        Ok(genesis_transaction
4336            .ok_or(IotaError::UserInput {
4337                error: UserInputError::GenesisTransactionNotFound,
4338            })?
4339            .1
4340            .transaction)
4341    }
4342
4343    #[instrument(level = "trace", skip_all)]
4344    pub fn get_verified_checkpoint_by_sequence_number(
4345        &self,
4346        sequence_number: CheckpointSequenceNumber,
4347    ) -> IotaResult<VerifiedCheckpoint> {
4348        let verified_checkpoint = self
4349            .get_checkpoint_store()
4350            .get_checkpoint_by_sequence_number(sequence_number)?;
4351        match verified_checkpoint {
4352            Some(verified_checkpoint) => Ok(verified_checkpoint),
4353            None => Err(IotaError::UserInput {
4354                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4355            }),
4356        }
4357    }
4358
4359    #[instrument(level = "trace", skip_all)]
4360    pub fn get_verified_checkpoint_summary_by_digest(
4361        &self,
4362        digest: CheckpointDigest,
4363    ) -> IotaResult<VerifiedCheckpoint> {
4364        let verified_checkpoint = self
4365            .get_checkpoint_store()
4366            .get_checkpoint_by_digest(&digest)?;
4367        match verified_checkpoint {
4368            Some(verified_checkpoint) => Ok(verified_checkpoint),
4369            None => Err(IotaError::UserInput {
4370                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4371            }),
4372        }
4373    }
4374
4375    #[instrument(level = "trace", skip_all)]
4376    pub fn get_checkpoint_contents(
4377        &self,
4378        digest: CheckpointContentsDigest,
4379    ) -> IotaResult<CheckpointContents> {
4380        self.get_checkpoint_store()
4381            .get_checkpoint_contents(&digest)?
4382            .ok_or(IotaError::UserInput {
4383                error: UserInputError::CheckpointContentsNotFound(digest),
4384            })
4385    }
4386
4387    #[instrument(level = "trace", skip_all)]
4388    pub fn get_checkpoint_contents_by_sequence_number(
4389        &self,
4390        sequence_number: CheckpointSequenceNumber,
4391    ) -> IotaResult<CheckpointContents> {
4392        let verified_checkpoint = self
4393            .get_checkpoint_store()
4394            .get_checkpoint_by_sequence_number(sequence_number)?;
4395        match verified_checkpoint {
4396            Some(verified_checkpoint) => {
4397                let content_digest = verified_checkpoint.into_inner().content_digest;
4398                self.get_checkpoint_contents(content_digest)
4399            }
4400            None => Err(IotaError::UserInput {
4401                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4402            }),
4403        }
4404    }
4405
4406    #[instrument(level = "trace", skip_all)]
4407    pub async fn query_events(
4408        &self,
4409        kv_store: &Arc<TransactionKeyValueStore>,
4410        query: EventFilter,
4411        // If `Some`, the query will start from the next item after the specified cursor
4412        cursor: Option<EventID>,
4413        limit: usize,
4414        descending: bool,
4415    ) -> IotaResult<Vec<IotaEvent>> {
4416        let index_store = self.get_indexes()?;
4417
4418        // Get the tx_num from tx_digest
4419        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4420            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4421                IotaError::TransactionNotFound {
4422                    digest: cursor.tx_digest,
4423                },
4424            )?;
4425            (tx_seq, cursor.event_seq as usize)
4426        } else if descending {
4427            (u64::MAX, usize::MAX)
4428        } else {
4429            (0, 0)
4430        };
4431
4432        let limit = limit + 1;
4433        let mut event_keys = match query {
4434            EventFilter::All(filters) => {
4435                if filters.is_empty() {
4436                    index_store.all_events(tx_num, event_num, limit, descending)?
4437                } else {
4438                    return Err(IotaError::UserInput {
4439                        error: UserInputError::Unsupported(
4440                            "This query type does not currently support filter combinations"
4441                                .to_string(),
4442                        ),
4443                    });
4444                }
4445            }
4446            EventFilter::Transaction(digest) => {
4447                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4448            }
4449            EventFilter::MoveModule { package, module } => {
4450                let module_id = ModuleId::new(
4451                    AccountAddress::new(package.into_bytes()),
4452                    move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4453                );
4454                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4455            }
4456            EventFilter::MoveEventType(struct_name) => index_store
4457                .events_by_move_event_struct_name(
4458                    &struct_name,
4459                    tx_num,
4460                    event_num,
4461                    limit,
4462                    descending,
4463                )?,
4464            EventFilter::Sender(sender) => {
4465                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4466            }
4467            EventFilter::TimeRange {
4468                start_time,
4469                end_time,
4470            } => index_store
4471                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4472            EventFilter::MoveEventModule { package, module } => index_store
4473                .events_by_move_event_module(
4474                    &ModuleId::new(
4475                        AccountAddress::new(package.into_bytes()),
4476                        move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4477                    ),
4478                    tx_num,
4479                    event_num,
4480                    limit,
4481                    descending,
4482                )?,
4483            // not using "_ =>" because we want to make sure we remember to add new variants here
4484            EventFilter::Package(_)
4485            | EventFilter::MoveEventField { .. }
4486            | EventFilter::Any(_)
4487            | EventFilter::And(_, _)
4488            | EventFilter::Or(_, _) => {
4489                return Err(IotaError::UserInput {
4490                    error: UserInputError::Unsupported(
4491                        "This query type is not supported by the full node.".to_string(),
4492                    ),
4493                });
4494            }
4495        };
4496
4497        // skip one event if exclusive cursor is provided,
4498        // otherwise truncate to the original limit.
4499        if cursor.is_some() {
4500            if !event_keys.is_empty() {
4501                event_keys.remove(0);
4502            }
4503        } else {
4504            event_keys.truncate(limit - 1);
4505        }
4506
4507        // get the unique set of digests from the event_keys
4508        let transaction_digests = event_keys
4509            .iter()
4510            .map(|(_, digest, _, _)| *digest)
4511            .collect::<HashSet<_>>()
4512            .into_iter()
4513            .collect::<Vec<_>>();
4514
4515        let events = kv_store
4516            .multi_get_events_by_tx_digests(&transaction_digests)
4517            .await?;
4518
4519        let events_map: HashMap<_, _> =
4520            transaction_digests.iter().zip(events.into_iter()).collect();
4521
4522        let stored_events = event_keys
4523            .into_iter()
4524            .map(|k| {
4525                (
4526                    k,
4527                    events_map
4528                        .get(&k.1)
4529                        .expect("fetched digest is missing")
4530                        .clone()
4531                        .and_then(|e| e.get(k.2).cloned()),
4532                )
4533            })
4534            .map(
4535                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4536                    event
4537                        .map(|e| (e, tx_digest, event_seq, timestamp))
4538                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4539                },
4540            )
4541            .collect::<Result<Vec<_>, _>>()?;
4542
4543        let epoch_store = self.load_epoch_store_one_call_per_task();
4544        let backing_store = self.get_backing_package_store().as_ref();
4545        let mut layout_resolver = epoch_store
4546            .executor()
4547            .type_layout_resolver(Box::new(backing_store));
4548        let mut events = vec![];
4549        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4550            events.push(IotaEvent::try_from(
4551                e.clone(),
4552                tx_digest,
4553                event_seq as u64,
4554                Some(timestamp),
4555                layout_resolver.get_annotated_layout(&e.type_)?,
4556            )?)
4557        }
4558        Ok(events)
4559    }
4560
4561    pub async fn insert_genesis_object(&self, object: Object) {
4562        self.get_reconfig_api()
4563            .try_insert_genesis_object(object)
4564            .expect("Cannot insert genesis object")
4565    }
4566
4567    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4568        futures::future::join_all(
4569            objects
4570                .iter()
4571                .map(|o| self.insert_genesis_object(o.clone())),
4572        )
4573        .await;
4574    }
4575
4576    /// Make a status response for a transaction
4577    #[instrument(level = "trace", skip_all)]
4578    pub fn get_transaction_status(
4579        &self,
4580        transaction_digest: &TransactionDigest,
4581        epoch_store: &Arc<AuthorityPerEpochStore>,
4582    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4583        // TODO: In the case of read path, we should not have to re-sign the effects.
4584        if let Some(effects) =
4585            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4586        {
4587            if let Some(transaction) = self
4588                .get_transaction_cache_reader()
4589                .try_get_transaction_block(transaction_digest)?
4590            {
4591                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4592                let events = if effects.events_digest().is_some() {
4593                    self.get_transaction_events(effects.transaction_digest())?
4594                } else {
4595                    TransactionEvents::default()
4596                };
4597                return Ok(Some((
4598                    (*transaction).clone().into_message(),
4599                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4600                )));
4601            } else {
4602                // The read of effects and read of transaction are not atomic. It's possible
4603                // that we reverted the transaction (during epoch change) in
4604                // between the above two reads, and we end up having effects but
4605                // not transaction. In this case, we just fall through.
4606                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4607            }
4608        }
4609        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4610            self.metrics.tx_already_processed.inc();
4611            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4612            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4613        } else {
4614            Ok(None)
4615        }
4616    }
4617
4618    /// Get the signed effects of the given transaction. If the effects was
4619    /// signed in a previous epoch, re-sign it so that the caller is able to
4620    /// form a cert of the effects in the current epoch.
4621    #[instrument(level = "trace", skip_all)]
4622    pub fn get_signed_effects_and_maybe_resign(
4623        &self,
4624        transaction_digest: &TransactionDigest,
4625        epoch_store: &Arc<AuthorityPerEpochStore>,
4626    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4627        let effects = self
4628            .get_transaction_cache_reader()
4629            .try_get_executed_effects(transaction_digest)?;
4630        match effects {
4631            Some(effects) => {
4632                // If the transaction was executed in previous epochs, the validator will
4633                // re-sign the effects with new current epoch so that a client is always able to
4634                // obtain an effects certificate at the current epoch.
4635                //
4636                // Why is this necessary? Consider the following case:
4637                // - assume there are 4 validators
4638                // - Quorum driver gets 2 signed effects before reconfig halt
4639                // - The tx makes it into final checkpoint.
4640                // - 2 validators go away and are replaced in the new epoch.
4641                // - The new epoch begins.
4642                // - The quorum driver cannot complete the partial effects cert from the
4643                //   previous epoch, because it may not be able to reach either of the 2 former
4644                //   validators.
4645                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4646                //   the new epoch, the QD can make a new effects cert and return it to the
4647                //   client.
4648                //
4649                // This is a considered a short-term workaround. Eventually, Quorum Driver
4650                // should be able to return either an effects certificate, -or-
4651                // a proof of inclusion in a checkpoint. In the case above, the
4652                // Quorum Driver would return a proof of inclusion in the final
4653                // checkpoint, and this code would no longer be necessary.
4654                if effects.epoch() != epoch_store.epoch() {
4655                    debug!(
4656                        tx_digest=?transaction_digest,
4657                        effects_epoch=?effects.epoch(),
4658                        epoch=?epoch_store.epoch(),
4659                        "Re-signing the effects with the current epoch"
4660                    );
4661                }
4662                Ok(Some(self.sign_effects(effects, epoch_store)?))
4663            }
4664            None => Ok(None),
4665        }
4666    }
4667
4668    #[instrument(level = "trace", skip_all)]
4669    pub(crate) fn sign_effects(
4670        &self,
4671        effects: TransactionEffects,
4672        epoch_store: &Arc<AuthorityPerEpochStore>,
4673    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4674        let tx_digest = *effects.transaction_digest();
4675        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4676            Some(sig) => {
4677                debug_assert!(sig.epoch == epoch_store.epoch());
4678                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4679            }
4680            _ => {
4681                let sig = AuthoritySignInfo::new(
4682                    epoch_store.epoch(),
4683                    &effects,
4684                    Intent::iota_app(IntentScope::TransactionEffects),
4685                    self.name,
4686                    &*self.secret,
4687                );
4688
4689                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4690
4691                epoch_store.insert_effects_digest_and_signature(
4692                    &tx_digest,
4693                    effects.digest(),
4694                    &sig,
4695                )?;
4696
4697                effects
4698            }
4699        };
4700
4701        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4702            signed_effects,
4703        ))
4704    }
4705
4706    // Returns coin objects for indexing for fullnode if indexing is enabled.
4707    #[instrument(level = "trace", skip_all)]
4708    fn fullnode_only_get_tx_coins_for_indexing(
4709        &self,
4710        effects: &TransactionEffects,
4711        inner_temporary_store: &InnerTemporaryStore,
4712        epoch_store: &Arc<AuthorityPerEpochStore>,
4713    ) -> Option<TxCoins> {
4714        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4715            return None;
4716        }
4717        let written_coin_objects = inner_temporary_store
4718            .written
4719            .iter()
4720            .filter_map(|(k, v)| {
4721                if v.is_coin() {
4722                    Some((*k, v.clone()))
4723                } else {
4724                    None
4725                }
4726            })
4727            .collect();
4728        let mut input_coin_objects = inner_temporary_store
4729            .input_objects
4730            .iter()
4731            .filter_map(|(k, v)| {
4732                if v.is_coin() {
4733                    Some((*k, v.clone()))
4734                } else {
4735                    None
4736                }
4737            })
4738            .collect::<ObjectMap>();
4739
4740        // Check for receiving objects that were actually used and modified during
4741        // execution. Their updated version will already showup in
4742        // "written_coins" but their input isn't included in the set of input
4743        // objects in a inner_temporary_store.
4744        for (object_id, version) in effects.modified_at_versions() {
4745            if inner_temporary_store
4746                .loaded_runtime_objects
4747                .contains_key(&object_id)
4748            {
4749                if let Some(object) = self
4750                    .get_object_store()
4751                    .get_object_by_key(&object_id, version)
4752                {
4753                    if object.is_coin() {
4754                        input_coin_objects.insert(object_id, object);
4755                    }
4756                }
4757            }
4758        }
4759
4760        Some((input_coin_objects, written_coin_objects))
4761    }
4762
4763    /// Get the TransactionEnvelope that currently locks the given object, if
4764    /// any. Since object locks are only valid for one epoch, we also need
4765    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4766    /// no lock records for the given object can be found.
4767    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4768    /// object record is at a different version.
4769    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4770    /// certain transaction. Returns None if the a lock record is
4771    /// initialized for the given ObjectRef but not yet locked by any
4772    /// transaction,     or cannot find the transaction in transaction
4773    /// table, because of data race etc.
4774    #[instrument(level = "trace", skip_all)]
4775    pub async fn get_transaction_lock(
4776        &self,
4777        object_ref: &ObjectRef,
4778        epoch_store: &AuthorityPerEpochStore,
4779    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4780        let lock_info = self
4781            .get_object_cache_reader()
4782            .try_get_lock(*object_ref, epoch_store)?;
4783        let lock_info = match lock_info {
4784            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4785                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4786                    provided_obj_ref: *object_ref,
4787                    current_version: locked_ref.version,
4788                }
4789                .into());
4790            }
4791            ObjectLockStatus::Initialized => {
4792                return Ok(None);
4793            }
4794            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4795        };
4796
4797        epoch_store.get_signed_transaction(&lock_info)
4798    }
4799
4800    pub async fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
4801        self.get_object_cache_reader().try_get_objects(objects)
4802    }
4803
4804    /// Non-fallible version of `try_get_objects`.
4805    pub async fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4806        self.try_get_objects(objects)
4807            .await
4808            .expect("storage access failed")
4809    }
4810
4811    pub async fn try_get_object_or_tombstone(
4812        &self,
4813        object_id: ObjectId,
4814    ) -> IotaResult<Option<ObjectRef>> {
4815        self.get_object_cache_reader()
4816            .try_get_latest_object_ref_or_tombstone(object_id)
4817    }
4818
4819    /// Non-fallible version of `try_get_object_or_tombstone`.
4820    pub async fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4821        self.try_get_object_or_tombstone(object_id)
4822            .await
4823            .expect("storage access failed")
4824    }
4825
4826    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4827    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4828    /// upgrade.
4829    ///
4830    /// This method can be used to dynamic adjust the amount of buffer. If set
4831    /// to 0, the upgrade will go through with only 2f+1 votes.
4832    ///
4833    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4834    /// should have the same value), or you risk halting the chain.
4835    pub fn set_override_protocol_upgrade_buffer_stake(
4836        &self,
4837        expected_epoch: EpochId,
4838        buffer_stake_bps: u64,
4839    ) -> IotaResult {
4840        let epoch_store = self.load_epoch_store_one_call_per_task();
4841        let actual_epoch = epoch_store.epoch();
4842        if actual_epoch != expected_epoch {
4843            return Err(IotaError::WrongEpoch {
4844                expected_epoch,
4845                actual_epoch,
4846            });
4847        }
4848
4849        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4850    }
4851
4852    pub fn clear_override_protocol_upgrade_buffer_stake(
4853        &self,
4854        expected_epoch: EpochId,
4855    ) -> IotaResult {
4856        let epoch_store = self.load_epoch_store_one_call_per_task();
4857        let actual_epoch = epoch_store.epoch();
4858        if actual_epoch != expected_epoch {
4859            return Err(IotaError::WrongEpoch {
4860                expected_epoch,
4861                actual_epoch,
4862            });
4863        }
4864
4865        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4866    }
4867
4868    /// Get the set of system packages that are compiled in to this build, if
4869    /// those packages are compatible with the current versions of those
4870    /// packages on-chain.
4871    pub async fn get_available_system_packages(
4872        &self,
4873        binary_config: &BinaryConfig,
4874    ) -> Vec<ObjectRef> {
4875        let mut results = vec![];
4876
4877        let system_packages = BuiltInFramework::iter_system_packages();
4878
4879        // Add extra framework packages during simtest
4880        #[cfg(msim)]
4881        let extra_packages = framework_injection::get_extra_packages(self.name);
4882        #[cfg(msim)]
4883        let system_packages = {
4884            let mut packages: Vec<_> = system_packages.collect();
4885            packages.extend(extra_packages.iter());
4886            packages
4887        };
4888
4889        for system_package in system_packages {
4890            let modules = system_package.modules().to_vec();
4891            // In simtests, we could override the current built-in framework packages.
4892            #[cfg(msim)]
4893            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4894                .unwrap_or(modules);
4895
4896            let Some(obj_ref) = iota_framework::compare_system_package(
4897                &self.get_object_store(),
4898                &system_package.id,
4899                &modules,
4900                system_package.dependencies.to_vec(),
4901                binary_config,
4902            )
4903            .await
4904            else {
4905                return vec![];
4906            };
4907            results.push(obj_ref);
4908        }
4909
4910        results
4911    }
4912
4913    /// Return the new versions, module bytes, and dependencies for the packages
4914    /// that have been committed to for a framework upgrade, in
4915    /// `system_packages`.  Loads the module contents from the binary, and
4916    /// performs the following checks:
4917    ///
4918    /// - Whether its contents matches what is on-chain already, in which case
4919    ///   no upgrade is required, and its contents are omitted from the output.
4920    /// - Whether the contents in the binary can form a package whose digest
4921    ///   matches the input, meaning the framework will be upgraded, and this
4922    ///   authority can satisfy that upgrade, in which case the contents are
4923    ///   included in the output.
4924    ///
4925    /// If the current version of the framework can't be loaded, the binary does
4926    /// not contain the bytes for that framework ID, or the resulting
4927    /// package fails the digest check, `None` is returned indicating that
4928    /// this authority cannot run the upgrade that the network voted on.
4929    async fn get_system_package_bytes(
4930        &self,
4931        system_packages: Vec<ObjectRef>,
4932        binary_config: &BinaryConfig,
4933    ) -> Option<Vec<SystemPackage>> {
4934        let ids: Vec<_> = system_packages
4935            .iter()
4936            .map(|object_ref| object_ref.object_id)
4937            .collect();
4938        let objects = self.get_objects(&ids).await;
4939
4940        let mut res = Vec::with_capacity(system_packages.len());
4941        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4942            let prev_transaction = match object {
4943                Some(cur_object) if cur_object.object_ref() == system_package_ref => {
4944                    // Skip this one because it doesn't need to be upgraded.
4945                    info!(
4946                        "Framework {} does not need updating",
4947                        system_package_ref.object_id
4948                    );
4949                    continue;
4950                }
4951
4952                Some(cur_object) => cur_object.previous_transaction,
4953                None => TransactionDigest::GENESIS_MARKER,
4954            };
4955
4956            #[cfg(msim)]
4957            let FrameworkSystemPackage {
4958                id: _,
4959                bytes,
4960                dependencies,
4961            } = framework_injection::get_override_system_package(
4962                &system_package_ref.object_id,
4963                self.name,
4964            )
4965            .unwrap_or_else(|| {
4966                BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
4967            });
4968
4969            #[cfg(not(msim))]
4970            let FrameworkSystemPackage {
4971                id: _,
4972                bytes,
4973                dependencies,
4974            } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
4975
4976            let modules: Vec<_> = bytes
4977                .iter()
4978                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4979                .collect();
4980
4981            let new_object = Object::new_system_package(
4982                &modules,
4983                system_package_ref.version,
4984                dependencies.clone(),
4985                prev_transaction,
4986            );
4987
4988            let new_ref = new_object.object_ref();
4989            if new_ref != system_package_ref {
4990                error!(
4991                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4992                );
4993                return None;
4994            }
4995
4996            res.push(SystemPackage {
4997                version: system_package_ref.version,
4998                modules: bytes,
4999                dependencies,
5000            });
5001        }
5002
5003        Some(res)
5004    }
5005
5006    /// Returns the new protocol version and system packages that the network
5007    /// has voted to upgrade to. If the proposed protocol version is not
5008    /// supported, None is returned.
5009    fn is_protocol_version_supported_v1(
5010        proposed_protocol_version: ProtocolVersion,
5011        committee: &Committee,
5012        capabilities: Vec<AuthorityCapabilitiesV1>,
5013        mut buffer_stake_bps: u64,
5014    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
5015        if buffer_stake_bps > 10000 {
5016            warn!("clamping buffer_stake_bps to 10000");
5017            buffer_stake_bps = 10000;
5018        }
5019
5020        // For each validator, gather the protocol version and system packages that it
5021        // would like to upgrade to in the next epoch.
5022        let mut desired_upgrades: Vec<_> = capabilities
5023            .into_iter()
5024            .filter_map(|mut cap| {
5025                // A validator that lists no packages is voting against any change at all.
5026                if cap.available_system_packages.is_empty() {
5027                    return None;
5028                }
5029
5030                cap.available_system_packages.sort();
5031
5032                info!(
5033                    "validator {:?} supports {:?} with system packages: {:?}",
5034                    cap.authority.concise(),
5035                    cap.supported_protocol_versions,
5036                    cap.available_system_packages,
5037                );
5038
5039                // A validator that only supports the current protocol version is also voting
5040                // against any change, because framework upgrades always require a protocol
5041                // version bump.
5042                cap.supported_protocol_versions
5043                    .get_version_digest(proposed_protocol_version)
5044                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5045            })
5046            .collect();
5047
5048        // There can only be one set of votes that have a majority, find one if it
5049        // exists.
5050        desired_upgrades.sort();
5051        desired_upgrades
5052            .into_iter()
5053            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5054            .into_iter()
5055            .find_map(|((digest, packages), group)| {
5056                // should have been filtered out earlier.
5057                assert!(!packages.is_empty());
5058
5059                let mut stake_aggregator: StakeAggregator<(), true> =
5060                    StakeAggregator::new(Arc::new(committee.clone()));
5061
5062                for (_, _, authority) in group {
5063                    stake_aggregator.insert_generic(authority, ());
5064                }
5065
5066                let total_votes = stake_aggregator.total_votes();
5067                let quorum_threshold = committee.quorum_threshold();
5068                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5069
5070                info!(
5071                    protocol_config_digest = ?digest,
5072                    ?total_votes,
5073                    ?quorum_threshold,
5074                    ?buffer_stake_bps,
5075                    ?effective_threshold,
5076                    ?proposed_protocol_version,
5077                    ?packages,
5078                    "support for upgrade"
5079                );
5080
5081                let has_support = total_votes >= effective_threshold;
5082                has_support.then_some((proposed_protocol_version, digest, packages))
5083            })
5084    }
5085
5086    /// Selects the highest supported protocol version and system packages that
5087    /// the network has voted to upgrade to. If no upgrade is supported,
5088    /// returns the current protocol version and system packages.
5089    fn choose_protocol_version_and_system_packages_v1(
5090        current_protocol_version: ProtocolVersion,
5091        current_protocol_digest: Digest,
5092        committee: &Committee,
5093        capabilities: Vec<AuthorityCapabilitiesV1>,
5094        buffer_stake_bps: u64,
5095    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5096        let mut next_protocol_version = current_protocol_version;
5097        let mut system_packages = vec![];
5098        let mut protocol_version_digest = current_protocol_digest;
5099
5100        // Finds the highest supported protocol version and system packages by
5101        // incrementing the proposed protocol version by one until no further
5102        // upgrades are supported.
5103        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5104            next_protocol_version + 1,
5105            committee,
5106            capabilities.clone(),
5107            buffer_stake_bps,
5108        ) {
5109            next_protocol_version = version;
5110            protocol_version_digest = digest;
5111            system_packages = packages;
5112        }
5113
5114        (
5115            next_protocol_version,
5116            protocol_version_digest,
5117            system_packages,
5118        )
5119    }
5120
5121    /// Returns the indices of validators that support the given protocol
5122    /// version and digest. This includes both committee and non-committee
5123    /// validators based on their capabilities. Uses active validators
5124    /// instead of committee indices.
5125    fn get_validators_supporting_protocol_version(
5126        target_protocol_version: ProtocolVersion,
5127        target_digest: Digest,
5128        active_validators: &[AuthorityPublicKey],
5129        capabilities: &[AuthorityCapabilitiesV1],
5130    ) -> Vec<u64> {
5131        let mut eligible_validators = Vec::new();
5132
5133        for capability in capabilities {
5134            // Check if this validator supports the target protocol version and digest
5135            if let Some(digest) = capability
5136                .supported_protocol_versions
5137                .get_version_digest(target_protocol_version)
5138            {
5139                if digest == target_digest {
5140                    // Find the validator's index in the active validators list
5141                    if let Some(index) = active_validators
5142                        .iter()
5143                        .position(|name| AuthorityName::from(name) == capability.authority)
5144                    {
5145                        eligible_validators.push(index as u64);
5146                    }
5147                }
5148            }
5149        }
5150
5151        // Sort indices for deterministic behavior
5152        eligible_validators.sort();
5153        eligible_validators
5154    }
5155
5156    /// Calculates the sum of weights for eligible validators that are part of
5157    /// the committee. Takes the indices from
5158    /// get_validators_supporting_protocol_version and maps them back
5159    /// to committee members to get their weights.
5160    fn calculate_eligible_validators_weight(
5161        eligible_validator_indices: &[u64],
5162        active_validators: &[AuthorityPublicKey],
5163        committee: &Committee,
5164    ) -> u64 {
5165        let mut total_weight = 0u64;
5166
5167        for &index in eligible_validator_indices {
5168            let authority_pubkey = &active_validators[index as usize];
5169            // Check if this validator is in the committee and get their weight
5170            if let Some((_, weight)) = committee
5171                .members()
5172                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5173            {
5174                total_weight += weight;
5175            }
5176        }
5177
5178        total_weight
5179    }
5180
5181    /// Creates and execute the advance epoch transaction to effects without
5182    /// committing it to the database. The effects of the change epoch tx
5183    /// are only written to the database after a certified checkpoint has been
5184    /// formed and executed by CheckpointExecutor.
5185    ///
5186    /// When a framework upgraded has been decided on, but the validator does
5187    /// not have the new versions of the packages locally, the validator
5188    /// cannot form the ChangeEpochTx. In this case it returns Err,
5189    /// indicating that the checkpoint builder should give up trying to make the
5190    /// final checkpoint. As long as the network is able to create a certified
5191    /// checkpoint (which should be ensured by the capabilities vote), it
5192    /// will arrive via state sync and be executed by CheckpointExecutor.
5193    #[instrument(level = "error", skip_all)]
5194    pub async fn create_and_execute_advance_epoch_tx(
5195        &self,
5196        epoch_store: &Arc<AuthorityPerEpochStore>,
5197        gas_cost_summary: &GasCostSummary,
5198        checkpoint: CheckpointSequenceNumber,
5199        epoch_start_timestamp_ms: CheckpointTimestamp,
5200        scores: Vec<u64>,
5201    ) -> anyhow::Result<(
5202        IotaSystemState,
5203        Option<SystemEpochInfoEvent>,
5204        TransactionEffects,
5205    )> {
5206        let mut txns = Vec::new();
5207
5208        let next_epoch = epoch_store.epoch() + 1;
5209
5210        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5211        let authority_capabilities = epoch_store
5212            .get_capabilities_v1()
5213            .expect("read capabilities from db cannot fail");
5214        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
5215            Self::choose_protocol_version_and_system_packages_v1(
5216                epoch_store.protocol_version(),
5217                SupportedProtocolVersionsWithHashes::protocol_config_digest(
5218                    epoch_store.protocol_config(),
5219                ),
5220                epoch_store.committee(),
5221                authority_capabilities.clone(),
5222                buffer_stake_bps,
5223            );
5224
5225        // since system packages are created during the current epoch, they should abide
5226        // by the rules of the current epoch, including the current epoch's max
5227        // Move binary format version
5228        let config = epoch_store.protocol_config();
5229        let binary_config = to_binary_config(config);
5230        let Some(next_epoch_system_package_bytes) = self
5231            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5232            .await
5233        else {
5234            error!(
5235                "upgraded system packages {:?} are not locally available, cannot create \
5236                ChangeEpochTx. validator binary must be upgraded to the correct version!",
5237                next_epoch_system_packages
5238            );
5239            // the checkpoint builder will keep retrying forever when it hits this error.
5240            // Eventually, one of two things will happen:
5241            // - The operator will upgrade this binary to one that has the new packages
5242            //   locally, and this function will succeed.
5243            // - The final checkpoint will be certified by other validators, we will receive
5244            //   it via state sync, and execute it. This will upgrade the framework
5245            //   packages, reconfigure, and most likely shut down in the new epoch (this
5246            //   validator likely doesn't support the new protocol version, or else it
5247            //   should have had the packages.)
5248            bail!("missing system packages: cannot form ChangeEpochTx");
5249        };
5250
5251        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
5252        // ChangeEpochV2 requirements are met
5253        if config.select_committee_from_eligible_validators() {
5254            // Get the list of eligible validators that support the target protocol version
5255            let active_validators = epoch_store.epoch_start_state().get_active_validators();
5256
5257            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5258
5259            // Use validators supporting the target protocol version as eligible validators
5260            // in the next version if select_committee_supporting_next_epoch_version feature
5261            // flag is set to true.
5262            if config.select_committee_supporting_next_epoch_version() {
5263                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5264                    next_epoch_protocol_version,
5265                    next_epoch_protocol_digest,
5266                    &active_validators,
5267                    &authority_capabilities,
5268                );
5269
5270                // Calculate the total weight of eligible validators in the committee
5271                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5272                    &eligible_active_validators,
5273                    &active_validators,
5274                    epoch_store.committee(),
5275                );
5276
5277                // Safety check: ensure eligible validators have enough stake
5278                // Use the same effective threshold calculation that was used to decide the
5279                // protocol version
5280                let committee = epoch_store.committee();
5281                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5282
5283                if eligible_validators_weight < effective_threshold {
5284                    error!(
5285                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5286                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5287                    );
5288                    // Pass all active validator indices as eligible validators
5289                    // to perform selection among all of them.
5290                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5291                }
5292            }
5293
5294            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5295            // flag is enabled.
5296            if config.pass_validator_scores_to_advance_epoch() {
5297                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5298                    next_epoch,
5299                    next_epoch_protocol_version.as_u64(),
5300                    gas_cost_summary.storage_cost,
5301                    gas_cost_summary.computation_cost,
5302                    gas_cost_summary.computation_cost_burned,
5303                    gas_cost_summary.storage_rebate,
5304                    gas_cost_summary.non_refundable_storage_fee,
5305                    epoch_start_timestamp_ms,
5306                    next_epoch_system_package_bytes,
5307                    eligible_active_validators,
5308                    scores,
5309                    config.adjust_rewards_by_score(),
5310                ));
5311            } else {
5312                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5313                    next_epoch,
5314                    next_epoch_protocol_version.as_u64(),
5315                    gas_cost_summary.storage_cost,
5316                    gas_cost_summary.computation_cost,
5317                    gas_cost_summary.computation_cost_burned,
5318                    gas_cost_summary.storage_rebate,
5319                    gas_cost_summary.non_refundable_storage_fee,
5320                    epoch_start_timestamp_ms,
5321                    next_epoch_system_package_bytes,
5322                    eligible_active_validators,
5323                ));
5324            }
5325        } else if config.protocol_defined_base_fee()
5326            && config.max_committee_members_count_as_option().is_some()
5327        {
5328            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5329                next_epoch,
5330                next_epoch_protocol_version.as_u64(),
5331                gas_cost_summary.storage_cost,
5332                gas_cost_summary.computation_cost,
5333                gas_cost_summary.computation_cost_burned,
5334                gas_cost_summary.storage_rebate,
5335                gas_cost_summary.non_refundable_storage_fee,
5336                epoch_start_timestamp_ms,
5337                next_epoch_system_package_bytes,
5338            ));
5339        } else {
5340            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5341                next_epoch,
5342                next_epoch_protocol_version.as_u64(),
5343                gas_cost_summary.storage_cost,
5344                gas_cost_summary.computation_cost,
5345                gas_cost_summary.storage_rebate,
5346                gas_cost_summary.non_refundable_storage_fee,
5347                epoch_start_timestamp_ms,
5348                next_epoch_system_package_bytes,
5349            ));
5350        }
5351
5352        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5353
5354        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5355            tx.clone(),
5356            epoch_store.epoch(),
5357            checkpoint,
5358        );
5359
5360        let tx_digest = executable_tx.digest();
5361
5362        info!(
5363            ?next_epoch,
5364            ?next_epoch_protocol_version,
5365            ?next_epoch_system_packages,
5366            computation_cost=?gas_cost_summary.computation_cost,
5367            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5368            storage_cost=?gas_cost_summary.storage_cost,
5369            storage_rebate=?gas_cost_summary.storage_rebate,
5370            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5371            ?tx_digest,
5372            "Creating advance epoch transaction"
5373        );
5374
5375        fail_point_async!("change_epoch_tx_delay");
5376        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5377
5378        // The tx could have been executed by state sync already - if so simply return
5379        // an error. The checkpoint builder will shortly be terminated by
5380        // reconfiguration anyway.
5381        if self
5382            .get_transaction_cache_reader()
5383            .try_is_tx_already_executed(tx_digest)?
5384        {
5385            warn!("change epoch tx has already been executed via state sync");
5386            bail!("change epoch tx has already been executed via state sync",);
5387        }
5388
5389        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5390
5391        // We must manually assign the shared object versions to the transaction before
5392        // executing it. This is because we do not sequence end-of-epoch
5393        // transactions through consensus.
5394        epoch_store.assign_shared_object_versions_idempotent(
5395            self.get_object_cache_reader().as_ref(),
5396            std::slice::from_ref(&executable_tx),
5397        )?;
5398
5399        let (input_objects, _) =
5400            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5401
5402        let (temporary_store, effects, _execution_error_opt) = self.execute_transaction(
5403            &execution_guard,
5404            &executable_tx,
5405            input_objects,
5406            vec![],
5407            epoch_store,
5408        )?;
5409        let system_obj = get_iota_system_state(&temporary_store.written)
5410            .expect("change epoch tx must write to system object");
5411        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5412        let system_epoch_info_event = temporary_store
5413            .events
5414            .0
5415            .into_iter()
5416            .find(|event| event.is_system_epoch_info_event())
5417            .map(SystemEpochInfoEvent::from);
5418        // The system epoch info event can be `None` in case if the `advance_epoch`
5419        // Move function call failed and was executed in the safe mode.
5420        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5421
5422        // We must write tx and effects to the state sync tables so that state sync is
5423        // able to deliver to the transaction to CheckpointExecutor after it is
5424        // included in a certified checkpoint.
5425        self.get_state_sync_store()
5426            .try_insert_transaction_and_effects(&tx, &effects)
5427            .map_err(|err| {
5428                let err: anyhow::Error = err.into();
5429                err
5430            })?;
5431
5432        info!(
5433            "Effects summary of the change epoch transaction: {:?}",
5434            effects.summary_for_debug()
5435        );
5436        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5437        // The change epoch transaction cannot fail to execute.
5438        assert!(effects.status().is_success());
5439        Ok((system_obj, system_epoch_info_event, effects))
5440    }
5441
5442    /// This function is called at the very end of the epoch.
5443    /// This step is required before updating new epoch in the db and calling
5444    /// reopen_epoch_db.
5445    #[instrument(level = "error", skip_all)]
5446    async fn revert_uncommitted_epoch_transactions(
5447        &self,
5448        epoch_store: &AuthorityPerEpochStore,
5449    ) -> IotaResult {
5450        {
5451            let state = epoch_store.get_reconfig_state_write_lock_guard();
5452            if state.should_accept_user_certs() {
5453                // Need to change this so that consensus adapter do not accept certificates from
5454                // user. This can happen if our local validator did not initiate
5455                // epoch change locally, but 2f+1 nodes already concluded the
5456                // epoch.
5457                //
5458                // This lock is essentially a barrier (in the certificate mode only) for
5459                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5460                // after this block
5461                epoch_store.close_user_certs(state);
5462            }
5463            // lock is dropped here
5464        }
5465
5466        // In the P-COOL flow, the list of pending consensus certificates is
5467        // always empty, so the reverting below is only for the certificate mode.
5468        if !epoch_store.protocol_config().enable_pcool_flow() {
5469            let pending_certificates = epoch_store.pending_consensus_certificates();
5470            info!(
5471                "Reverting {} locally executed transactions that was not included in the epoch: \
5472                    {:?}",
5473                pending_certificates.len(),
5474                pending_certificates,
5475            );
5476            for digest in pending_certificates {
5477                if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5478                    info!(
5479                        "Not reverting pending consensus transaction {:?} - it was included in \
5480                            checkpoint",
5481                        digest
5482                    );
5483                    continue;
5484                }
5485                info!("Reverting {:?} at the end of epoch", digest);
5486                epoch_store.revert_executed_transaction(&digest)?;
5487                self.get_reconfig_api().try_revert_state_update(&digest)?;
5488            }
5489            info!("All uncommitted local transactions reverted");
5490        } else {
5491            info!("P-COOL mode: skipping revert of uncommitted epoch transactions");
5492        }
5493
5494        Ok(())
5495    }
5496
5497    #[instrument(level = "error", skip_all)]
5498    async fn reopen_epoch_db(
5499        &self,
5500        cur_epoch_store: &AuthorityPerEpochStore,
5501        new_committee: Committee,
5502        epoch_start_configuration: EpochStartConfiguration,
5503        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5504        epoch_last_checkpoint: CheckpointSequenceNumber,
5505    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5506        let new_epoch = new_committee.epoch;
5507        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5508        assert_eq!(
5509            epoch_start_configuration.epoch_start_state().epoch(),
5510            new_committee.epoch
5511        );
5512        fail_point!("before-open-new-epoch-store");
5513        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5514            self.name,
5515            new_committee,
5516            epoch_start_configuration,
5517            self.get_backing_package_store().clone(),
5518            expensive_safety_check_config,
5519            epoch_last_checkpoint,
5520        )?;
5521        self.epoch_store.store(new_epoch_store.clone());
5522        Ok(new_epoch_store)
5523    }
5524
5525    /// Checks if `authenticator` unlocks a valid Move account and returns the
5526    /// account-related `AuthenticatorFunctionRef` object.
5527    fn check_move_account(
5528        &self,
5529        auth_account_object_id: ObjectId,
5530        auth_account_object_seq_number: Option<SequenceNumber>,
5531        auth_account_object_digest: Option<ObjectDigest>,
5532        account_object: ObjectReadResult,
5533        signer: &Address,
5534    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5535        let account_object = match account_object.object {
5536            ObjectReadResultKind::Object(object) => Ok(object),
5537            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5538                Err(UserInputError::AccountObjectDeleted {
5539                    account_id: account_object.id(),
5540                    account_version: version,
5541                    transaction_digest: digest,
5542                })
5543            }
5544            // It is impossible to check the account object because it is used in a canceled
5545            // transaction and is not loaded.
5546            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5547                Err(UserInputError::AccountObjectInCanceledTransaction {
5548                    account_id: account_object.id(),
5549                    account_version: version,
5550                })
5551            }
5552        }?;
5553
5554        let account_object_addr = Address::from(auth_account_object_id);
5555
5556        fp_ensure!(
5557            signer == &account_object_addr,
5558            UserInputError::IncorrectUserSignature {
5559                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5560            }
5561            .into()
5562        );
5563
5564        fp_ensure!(
5565            account_object.is_shared() || account_object.is_immutable(),
5566            UserInputError::AccountObjectNotSupported {
5567                object_id: auth_account_object_id
5568            }
5569            .into()
5570        );
5571
5572        let auth_account_object_seq_number =
5573            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5574                let account_object_version = account_object.version();
5575
5576                fp_ensure!(
5577                    account_object_version == auth_account_object_seq_number,
5578                    UserInputError::AccountObjectVersionMismatch {
5579                        object_id: auth_account_object_id,
5580                        expected_version: auth_account_object_seq_number,
5581                        actual_version: account_object_version,
5582                    }
5583                    .into()
5584                );
5585
5586                auth_account_object_seq_number
5587            } else {
5588                account_object.version()
5589            };
5590
5591        if let Some(auth_account_object_digest) = auth_account_object_digest {
5592            let expected_digest = account_object.digest();
5593            fp_ensure!(
5594                expected_digest == auth_account_object_digest,
5595                UserInputError::InvalidAccountObjectDigest {
5596                    object_id: auth_account_object_id,
5597                    expected_digest,
5598                    actual_digest: auth_account_object_digest,
5599                }
5600                .into()
5601            );
5602        }
5603
5604        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5605            auth_account_object_id,
5606            &AuthenticatorFunctionRefV1Key::tag().into(),
5607            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5608        )
5609        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5610            account_object_id: auth_account_object_id,
5611        })?;
5612
5613        let authenticator_function_ref_field = self
5614            .get_object_cache_reader()
5615            .try_find_object_lt_or_eq_version(
5616                authenticator_function_ref_field_id,
5617                auth_account_object_seq_number,
5618            )?;
5619
5620        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5621            let field_move_object = authenticator_function_ref_field_obj
5622                .data
5623                .as_struct_opt()
5624                .expect("dynamic field should never be a package object");
5625
5626            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5627                field_move_object.to_rust().map_err(|_| {
5628                    UserInputError::InvalidAuthenticatorFunctionRefField {
5629                        account_object_id: auth_account_object_id,
5630                    }
5631                })?;
5632
5633            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5634                field.value,
5635                authenticator_function_ref_field_obj.object_ref(),
5636                authenticator_function_ref_field_obj.owner,
5637                authenticator_function_ref_field_obj.storage_rebate,
5638                authenticator_function_ref_field_obj.previous_transaction,
5639            ))
5640        } else {
5641            Err(UserInputError::MoveAuthenticatorNotFound {
5642                authenticator_function_ref_id: authenticator_function_ref_field_id,
5643                account_object_id: auth_account_object_id,
5644                account_object_version: auth_account_object_seq_number,
5645            }
5646            .into())
5647        }
5648    }
5649
5650    #[allow(clippy::type_complexity)]
5651    fn read_objects_for_validation(
5652        &self,
5653        transaction: &VerifiedTransaction,
5654        epoch: u64,
5655    ) -> IotaResult<(
5656        InputObjects,
5657        ReceivingObjects,
5658        Vec<(InputObjects, ObjectReadResult)>,
5659    )> {
5660        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5661            Some(transaction.digest()),
5662            &transaction.collect_all_input_object_kind_for_reading()?,
5663            &transaction.data().transaction_data().receiving_objects(),
5664            epoch,
5665        )?;
5666
5667        transaction
5668            .split_input_objects_into_groups_for_reading(input_objects)
5669            .map(|(tx_input_objects, per_authenticator_inputs)| {
5670                (
5671                    tx_input_objects,
5672                    tx_receiving_objects,
5673                    per_authenticator_inputs,
5674                )
5675            })
5676    }
5677
5678    #[allow(clippy::type_complexity)]
5679    fn check_transaction_inputs_for_validation(
5680        &self,
5681        protocol_config: &ProtocolConfig,
5682        reference_gas_price: u64,
5683        tx_data: &TransactionData,
5684        tx_input_objects: InputObjects,
5685        tx_receiving_objects: &ReceivingObjects,
5686        move_authenticators: &Vec<&MoveAuthenticator>,
5687        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5688    ) -> IotaResult<(
5689        IotaGasStatus,
5690        CheckedInputObjects,
5691        Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5692    )> {
5693        let authenticator_gas_budget = if move_authenticators.is_empty() {
5694            0
5695        } else {
5696            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5697            // not a part of the transaction data.
5698            protocol_config.max_auth_gas()
5699        };
5700
5701        debug_assert_eq!(
5702            move_authenticators.len(),
5703            per_authenticator_inputs.len(),
5704            "Move authenticators amount must match the number of authenticator inputs"
5705        );
5706
5707        let per_authenticator_checked_inputs = move_authenticators
5708            .iter()
5709            .zip(per_authenticator_inputs)
5710            .map(
5711                |(move_authenticator, (authenticator_input_objects, account_object))| {
5712                    // Check basic `object_to_authenticate` preconditions and get its components.
5713                    let (
5714                        auth_account_object_id,
5715                        auth_account_object_seq_number,
5716                        auth_account_object_digest,
5717                    ) = move_authenticator.object_to_authenticate_components()?;
5718
5719                    let signer = move_authenticator.address()?;
5720
5721                    // Make sure the signer is a Move account.
5722                    let AuthenticatorFunctionRefForExecution {
5723                        authenticator_function_ref,
5724                        ..
5725                    } = self.check_move_account(
5726                        auth_account_object_id,
5727                        auth_account_object_seq_number,
5728                        auth_account_object_digest,
5729                        account_object,
5730                        &signer,
5731                    )?;
5732
5733                    // Check the MoveAuthenticator input objects.
5734                    let authenticator_checked_input_objects =
5735                        iota_transaction_checks::check_move_authenticator_input_for_validation(
5736                            authenticator_input_objects,
5737                        )?;
5738
5739                    Ok((
5740                        authenticator_checked_input_objects,
5741                        authenticator_function_ref,
5742                    ))
5743                },
5744            )
5745            .collect::<IotaResult<Vec<_>>>()?;
5746
5747        // Check the transaction inputs.
5748        let (gas_status, tx_checked_input_objects) =
5749            iota_transaction_checks::check_transaction_input(
5750                protocol_config,
5751                reference_gas_price,
5752                tx_data,
5753                tx_input_objects,
5754                tx_receiving_objects,
5755                &self.metrics.bytecode_verifier_metrics,
5756                &self.config.verifier_signing_config,
5757                authenticator_gas_budget,
5758            )?;
5759
5760        Ok((
5761            gas_status,
5762            tx_checked_input_objects,
5763            per_authenticator_checked_inputs,
5764        ))
5765    }
5766
/// Test-only accessor for the live object set.
///
/// Delegates to `iter_cached_live_object_set_for_testing` on the global
/// state hash store; the returned iterator borrows from `self`.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let state_hash_store = self.get_global_state_hash_store();
    state_hash_store.iter_cached_live_object_set_for_testing()
}
5774
/// Test-only helper that takes the execution shutdown sender out of its
/// slot and sends the shutdown signal through it.
///
/// # Panics
///
/// Panics if the sender has already been taken, or if sending the signal
/// fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    // The guard stays alive until the end of the function, so the lock is
    // held while the signal is sent.
    let mut shutdown_slot = self.tx_execution_shutdown.lock();
    let shutdown_sender = shutdown_slot.take().unwrap();
    shutdown_sender.send(()).unwrap();
}
5784
5785    /// NOTE: this function is only to be used for fuzzing and testing. Never
5786    /// use in prod
5787    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5788        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5789        self.get_object_cache_reader()
5790            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5791        self.get_reconfig_api()
5792            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5793    }
5794}
5795
5796pub struct RandomnessRoundReceiver {
5797    authority_state: Arc<AuthorityState>,
5798    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5799}
5800
5801impl RandomnessRoundReceiver {
5802    pub fn spawn(
5803        authority_state: Arc<AuthorityState>,
5804        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5805    ) -> JoinHandle<()> {
5806        let rrr = RandomnessRoundReceiver {
5807            authority_state,
5808            randomness_rx,
5809        };
5810        spawn_monitored_task!(rrr.run())
5811    }
5812
5813    async fn run(mut self) {
5814        info!("RandomnessRoundReceiver event loop started");
5815
5816        loop {
5817            tokio::select! {
5818                maybe_recv = self.randomness_rx.recv() => {
5819                    if let Some((epoch, round, bytes)) = maybe_recv {
5820                        self.handle_new_randomness(epoch, round, bytes);
5821                    } else {
5822                        break;
5823                    }
5824                },
5825            }
5826        }
5827
5828        info!("RandomnessRoundReceiver event loop ended");
5829    }
5830
5831    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5832    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5833        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5834        if epoch_store.epoch() != epoch {
5835            warn!(
5836                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5837                epoch_store.epoch()
5838            );
5839            return;
5840        }
5841        let transaction = VerifiedTransaction::new_randomness_state_update(
5842            epoch,
5843            round,
5844            bytes,
5845            epoch_store
5846                .epoch_start_config()
5847                .randomness_obj_initial_shared_version(),
5848        );
5849        debug!(
5850            "created randomness state update transaction with digest: {:?}",
5851            transaction.digest()
5852        );
5853        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5854        let digest = *transaction.digest();
5855
5856        // Randomness state updates contain the full bls signature for the random round,
5857        // which cannot necessarily be reconstructed again later. Therefore we must
5858        // immediately persist this transaction. If we crash before its outputs
5859        // are committed, this ensures we will be able to re-execute it.
5860        self.authority_state
5861            .get_cache_commit()
5862            .persist_transaction(&transaction);
5863
5864        // Send transaction to TransactionManager for execution.
5865        self.authority_state
5866            .transaction_manager()
5867            .enqueue(vec![transaction], &epoch_store);
5868
5869        let authority_state = self.authority_state.clone();
5870        spawn_monitored_task!(async move {
5871            // Wait for transaction execution in a separate task, to avoid deadlock in case
5872            // of out-of-order randomness generation. (Each
5873            // RandomnessStateUpdate depends on the output of the
5874            // RandomnessStateUpdate from the previous round.)
5875            //
5876            // We set a very long timeout so that in case this gets stuck for some reason,
5877            // the validator will eventually crash rather than continuing in a
5878            // zombie mode.
5879            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5880            let result = tokio::time::timeout(
5881                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5882                authority_state
5883                    .get_transaction_cache_reader()
5884                    .try_notify_read_executed_effects(&[digest]),
5885            )
5886            .await;
5887            let result = match result {
5888                Ok(result) => result,
5889                Err(_) => {
5890                    if cfg!(debug_assertions) {
5891                        // Crash on randomness update execution timeout in debug builds.
5892                        panic!(
5893                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5894                        );
5895                    }
5896                    warn!(
5897                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5898                    );
5899                    // Continue waiting as long as necessary in non-debug builds.
5900                    authority_state
5901                        .get_transaction_cache_reader()
5902                        .try_notify_read_executed_effects(&[digest])
5903                        .await
5904                }
5905            };
5906
5907            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5908            let effects = effects.pop().expect("should return effects");
5909            if *effects.status() != ExecutionStatus::Success {
5910                fatal!(
5911                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5912                );
5913            }
5914            debug!(
5915                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5916            );
5917        });
5918    }
5919}
5920
5921#[async_trait]
5922impl TransactionKeyValueStoreTrait for AuthorityState {
5923    async fn multi_get(
5924        &self,
5925        transaction_keys: &[TransactionDigest],
5926        effects_keys: &[TransactionDigest],
5927    ) -> IotaResult<KVStoreTransactionData> {
5928        let txns = if !transaction_keys.is_empty() {
5929            self.get_transaction_cache_reader()
5930                .try_multi_get_transaction_blocks(transaction_keys)?
5931                .into_iter()
5932                .map(|t| t.map(|t| (*t).clone().into_inner()))
5933                .collect()
5934        } else {
5935            vec![]
5936        };
5937
5938        let fx = if !effects_keys.is_empty() {
5939            self.get_transaction_cache_reader()
5940                .try_multi_get_executed_effects(effects_keys)?
5941        } else {
5942            vec![]
5943        };
5944
5945        Ok((txns, fx))
5946    }
5947
5948    async fn multi_get_checkpoints(
5949        &self,
5950        checkpoint_summaries: &[CheckpointSequenceNumber],
5951        checkpoint_contents: &[CheckpointSequenceNumber],
5952        checkpoint_summaries_by_digest: &[CheckpointDigest],
5953    ) -> IotaResult<(
5954        Vec<Option<CertifiedCheckpointSummary>>,
5955        Vec<Option<CheckpointContents>>,
5956        Vec<Option<CertifiedCheckpointSummary>>,
5957    )> {
5958        // TODO: use multi-get methods if it ever becomes important (unlikely)
5959        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5960        let store = self.get_checkpoint_store();
5961        for seq in checkpoint_summaries {
5962            let checkpoint = store
5963                .get_checkpoint_by_sequence_number(*seq)?
5964                .map(|c| c.into_inner());
5965
5966            summaries.push(checkpoint);
5967        }
5968
5969        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5970        for seq in checkpoint_contents {
5971            let checkpoint = store
5972                .get_checkpoint_by_sequence_number(*seq)?
5973                .and_then(|summary| {
5974                    store
5975                        .get_checkpoint_contents(&summary.content_digest)
5976                        .expect("db read cannot fail")
5977                });
5978            contents.push(checkpoint);
5979        }
5980
5981        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5982        for digest in checkpoint_summaries_by_digest {
5983            let checkpoint = store
5984                .get_checkpoint_by_digest(digest)?
5985                .map(|c| c.into_inner());
5986            summaries_by_digest.push(checkpoint);
5987        }
5988
5989        Ok((summaries, contents, summaries_by_digest))
5990    }
5991
5992    async fn get_transaction_perpetual_checkpoint(
5993        &self,
5994        digest: TransactionDigest,
5995    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5996        self.get_checkpoint_cache()
5997            .try_get_transaction_perpetual_checkpoint(&digest)
5998            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5999    }
6000
6001    async fn get_object(
6002        &self,
6003        object_id: ObjectId,
6004        version: VersionNumber,
6005    ) -> IotaResult<Option<Object>> {
6006        self.get_object_cache_reader()
6007            .try_get_object_by_key(&object_id, version)
6008    }
6009
6010    #[instrument(skip_all)]
6011    async fn multi_get_objects(
6012        &self,
6013        object_keys: &[ObjectKey],
6014    ) -> IotaResult<Vec<Option<Object>>> {
6015        Ok(self
6016            .get_object_cache_reader()
6017            .multi_get_objects_by_key(object_keys))
6018    }
6019
6020    async fn multi_get_transactions_perpetual_checkpoints(
6021        &self,
6022        digests: &[TransactionDigest],
6023    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
6024        let res = self
6025            .get_checkpoint_cache()
6026            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
6027
6028        Ok(res
6029            .into_iter()
6030            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6031            .collect())
6032    }
6033
6034    #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
6035    async fn multi_get_events_by_tx_digests(
6036        &self,
6037        digests: &[TransactionDigest],
6038    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
6039        if digests.is_empty() {
6040            return Ok(vec![]);
6041        }
6042
6043        Ok(self
6044            .get_transaction_cache_reader()
6045            .multi_get_events(digests))
6046    }
6047}
6048
/// Simulation-only (`cfg(msim)`) registry of framework package overrides.
///
/// Overrides are keyed by package id and are either the same for every
/// validator or computed per validator by a callback.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_sdk_types::ObjectId;
    use iota_types::base_types::AuthorityName;
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback; `None` means no override for that
        /// validator.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes every module with its own bytecode version.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|m| {
                let mut buf = Vec::new();
                m.serialize_with_version(m.version, &mut buf).unwrap();
                buf
            })
            .collect()
    }

    /// Registers `modules` as the override of `package_id` for all
    /// validators, replacing any previous override of that package.
    pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers `func` as the per-validator override of `package_id`,
    /// replacing any previous override of that package.
    pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`, or `None` if there is no override for that validator.
    pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            })
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, or
    /// `None` if there is no override for that validator.
    pub fn get_override_modules(
        package_id: &ObjectId,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds the overriding [`SystemPackage`] of `package_id` for validator
    /// `name`, or returns `None` if there is no override for that validator.
    ///
    /// Existing system packages keep the dependencies of their built-in
    /// counterpart.
    pub fn get_override_system_package(
        package_id: &ObjectId,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if package_id.is_system_package() {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns the injected packages that are not built-in framework
    /// packages, as seen by validator `name`.
    ///
    /// A package whose per-validator callback yields no modules for `name` is
    /// omitted, matching how [`get_override_system_package`] reports a
    /// missing override. Every returned package depends on all built-in
    /// packages.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
        // Collect the ids first so that `OVERRIDE` is no longer borrowed when
        // `get_override_bytes` accesses it below.
        let extra: Vec<ObjectId> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        extra
            .into_iter()
            .filter_map(|package| {
                let bytes = get_override_bytes(&package, name)?;
                Some(SystemPackage {
                    id: package,
                    bytes,
                    dependencies: BuiltInFramework::all_package_ids(),
                })
            })
            .collect()
    }
}
6168
6169#[derive(Debug, Serialize, Deserialize, Clone)]
6170pub struct ObjDumpFormat {
6171    pub id: ObjectId,
6172    pub version: VersionNumber,
6173    pub digest: ObjectDigest,
6174    pub object: Object,
6175}
6176
6177impl ObjDumpFormat {
6178    fn new(object: Object) -> Self {
6179        let oref = object.object_ref();
6180        Self {
6181            id: oref.object_id,
6182            version: oref.version,
6183            digest: oref.digest,
6184            object,
6185        }
6186    }
6187}
6188
6189#[derive(Debug, Serialize, Deserialize, Clone)]
6190pub struct NodeStateDump {
6191    pub tx_digest: TransactionDigest,
6192    pub sender_signed_data: SenderSignedData,
6193    pub executed_epoch: u64,
6194    pub reference_gas_price: u64,
6195    pub protocol_version: u64,
6196    pub epoch_start_timestamp_ms: u64,
6197    pub computed_effects: TransactionEffects,
6198    pub expected_effects_digest: TransactionEffectsDigest,
6199    pub relevant_system_packages: Vec<ObjDumpFormat>,
6200    pub shared_objects: Vec<ObjDumpFormat>,
6201    pub loaded_child_objects: Vec<ObjDumpFormat>,
6202    pub modified_at_versions: Vec<ObjDumpFormat>,
6203    pub runtime_reads: Vec<ObjDumpFormat>,
6204    pub input_objects: Vec<ObjDumpFormat>,
6205}
6206
6207impl NodeStateDump {
6208    pub fn new(
6209        tx_digest: &TransactionDigest,
6210        effects: &TransactionEffects,
6211        expected_effects_digest: TransactionEffectsDigest,
6212        object_store: &dyn ObjectStore,
6213        epoch_store: &Arc<AuthorityPerEpochStore>,
6214        inner_temporary_store: &InnerTemporaryStore,
6215        transaction: &VerifiedExecutableTransaction,
6216    ) -> IotaResult<Self> {
6217        // Epoch info
6218        let executed_epoch = epoch_store.epoch();
6219        let reference_gas_price = epoch_store.reference_gas_price();
6220        let epoch_start_config = epoch_store.epoch_start_config();
6221        let protocol_version = epoch_store.protocol_version().as_u64();
6222        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6223
6224        // Record all system packages at this version
6225        let mut relevant_system_packages = Vec::new();
6226        for sys_package_id in BuiltInFramework::all_package_ids() {
6227            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6228                relevant_system_packages.push(ObjDumpFormat::new(w))
6229            }
6230        }
6231
6232        // Record all the shared objects
6233        let mut shared_objects = Vec::new();
6234        for kind in effects.input_shared_objects() {
6235            match kind {
6236                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6237                    if let Some(w) =
6238                        object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6239                    {
6240                        shared_objects.push(ObjDumpFormat::new(w))
6241                    }
6242                }
6243                InputSharedObject::ReadDeleted(..)
6244                | InputSharedObject::MutateDeleted(..)
6245                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6246                                                           * objects. */
6247            }
6248        }
6249
6250        // Record all loaded child objects
6251        // Child objects which are read but not mutated are not tracked anywhere else
6252        let mut loaded_child_objects = Vec::new();
6253        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6254            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6255                loaded_child_objects.push(ObjDumpFormat::new(w))
6256            }
6257        }
6258
6259        // Record all modified objects
6260        let mut modified_at_versions = Vec::new();
6261        for (id, ver) in effects.modified_at_versions() {
6262            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6263                modified_at_versions.push(ObjDumpFormat::new(w))
6264            }
6265        }
6266
6267        // Packages read at runtime, which were not previously loaded into the temoorary
6268        // store Some packages may be fetched at runtime and wont show up in
6269        // input objects
6270        let mut runtime_reads = Vec::new();
6271        for obj in inner_temporary_store
6272            .runtime_packages_loaded_from_db
6273            .values()
6274        {
6275            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6276        }
6277
6278        // All other input objects should already be in `inner_temporary_store.objects`
6279
6280        Ok(Self {
6281            tx_digest: *tx_digest,
6282            executed_epoch,
6283            reference_gas_price,
6284            epoch_start_timestamp_ms,
6285            protocol_version,
6286            relevant_system_packages,
6287            shared_objects,
6288            loaded_child_objects,
6289            modified_at_versions,
6290            runtime_reads,
6291            sender_signed_data: transaction.clone().into_message(),
6292            input_objects: inner_temporary_store
6293                .input_objects
6294                .values()
6295                .map(|o| ObjDumpFormat::new(o.clone()))
6296                .collect(),
6297            computed_effects: effects.clone(),
6298            expected_effects_digest,
6299        })
6300    }
6301
6302    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6303        let mut objects = Vec::new();
6304        objects.extend(self.relevant_system_packages.clone());
6305        objects.extend(self.shared_objects.clone());
6306        objects.extend(self.loaded_child_objects.clone());
6307        objects.extend(self.modified_at_versions.clone());
6308        objects.extend(self.runtime_reads.clone());
6309        objects.extend(self.input_objects.clone());
6310        objects
6311    }
6312
6313    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6314        let file_name = format!(
6315            "{}_{}_NODE_DUMP.json",
6316            self.tx_digest,
6317            AuthorityState::unixtime_now_ms()
6318        );
6319        let mut path = path.to_path_buf();
6320        path.push(&file_name);
6321        let mut file = File::create(path.clone())?;
6322        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6323        Ok(path)
6324    }
6325
6326    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6327        let file = File::open(path)?;
6328        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6329    }
6330}
6331
6332/// Returns the [`MoveAuthenticator`]s to execute during the pre-consensus
6333/// phase.
6334///
6335/// When `pre_consensus_sponsor_only_move_authentication` is enabled:
6336/// - For sponsored transactions: only the sponsor's [`MoveAuthenticator`] is
6337///   returned (empty if the sponsor does not use one).
6338/// - For non-sponsored transactions: all [`MoveAuthenticator`]s are returned
6339///   (currently only the sender's).
6340///
6341/// When the flag is not set, all [`MoveAuthenticator`]s are returned for
6342/// compatibility.
6343fn pre_consensus_move_authenticators<'a>(
6344    tx: &'a VerifiedTransaction,
6345    protocol_config: &ProtocolConfig,
6346) -> Vec<&'a MoveAuthenticator> {
6347    if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6348        if tx.transaction_data().is_sponsored_tx() {
6349            if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6350                vec![sponsor_move_authenticator]
6351            } else {
6352                vec![]
6353            }
6354        } else {
6355            tx.move_authenticators()
6356        }
6357    } else {
6358        tx.move_authenticators()
6359    }
6360}