// iota_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    account_abstraction::{
59        account::AuthenticatorFunctionRefV1Key,
60        authenticator_function::{
61            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62            AuthenticatorFunctionRefV1,
63        },
64    },
65    authenticator_state::get_authenticator_state,
66    base_types::*,
67    committee::{Committee, EpochId, ProtocolVersion},
68    crypto::{
69        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70        default_hash,
71    },
72    deny_list_v1::check_coin_deny_list_v1_during_signing,
73    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75    effects::{
76        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77        TransactionEvents, VerifiedSignedTransactionEffects,
78    },
79    error::{ExecutionError, IotaError, IotaResult, UserInputError},
80    event::{Event, EventID, SystemEpochInfoEvent},
81    executable_transaction::VerifiedExecutableTransaction,
82    execution_config_utils::to_binary_config,
83    execution_status::ExecutionStatus,
84    fp_ensure,
85    gas::{GasCostSummary, IotaGasStatus},
86    gas_coin::NANOS_PER_IOTA,
87    inner_temporary_store::{
88        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89        WrittenObjects,
90    },
91    iota_system_state::{
92        IotaSystemState, IotaSystemStateTrait,
93        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94    },
95    is_system_package,
96    layout_resolver::{LayoutResolver, into_struct_layout},
97    message_envelope::Message,
98    messages_checkpoint::{
99        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103    },
104    messages_consensus::AuthorityCapabilitiesV1,
105    messages_grpc::{
106        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108        TransactionStatus,
109    },
110    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111    move_authenticator::MoveAuthenticator,
112    object::{
113        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114        bounded_visitor::BoundedVisitor,
115    },
116    storage::{
117        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118    },
119    supported_protocol_versions::{
120        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121    },
122    transaction::*,
123    transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoints::CheckpointStore,
162    congestion_tracker::CongestionTracker,
163    consensus_adapter::ConsensusAdapter,
164    epoch::committee_store::CommitteeStore,
165    execution_cache::{
166        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168        TransactionCacheRead,
169    },
170    execution_driver::execution_process,
171    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172    metrics::{LatencyObserver, RateTracker},
173    module_cache_metrics::ResolverMetrics,
174    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175    rest_index::RestIndexStore,
176    stake_aggregator::StakeAggregator,
177    state_accumulator::{AccumulatorStore, StateAccumulator},
178    subscription_handler::SubscriptionHandler,
179    transaction_input_loader::TransactionInputLoader,
180    transaction_manager::TransactionManager,
181    transaction_outputs::TransactionOutputs,
182    validator_tx_finalizer::ValidatorTxFinalizer,
183    verify_indexes::verify_indexes,
184};
185
// Unit-test modules, compiled only under `cfg(test)`; the `#[path]`
// attributes point at their sources under the `unit_tests/` directory.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Shared test utilities, also available outside `cfg(test)`.
pub mod authority_test_utils;

// Public submodules of the authority.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

// Crate-internal storage implementation.
pub(crate) mod authority_store;
pub mod backpressure;
235
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
    /// Total number of transaction orders received.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to handle_certificate.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced; effects are produced per
    /// finished transaction, so this equals the number of finished
    /// transactions.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders that had already been processed.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Distribution of the number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes.
    batch_size: Histogram,

    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    // Latency of executing certificates (including waiting for inputs),
    // sliced by transaction type (single-writer vs. shared-object).
    execute_certificate_latency_single_writer: Histogram,
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing databases.
    db_checkpoint_latency: Histogram,

    // TransactionManager metrics.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    pub(crate) transaction_manager_num_ready: IntGauge,
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution driver metrics.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    pub(crate) execution_queueing_delay_s: Histogram,
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    pub(crate) execution_gas_latency_ratio: Histogram,

    // Consensus transactions skipped (total, and due to local cache hit).
    pub(crate) skipped_consensus_txns: IntCounter,
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    // Overload monitor / load shedding metrics.
    pub(crate) authority_overload_status: IntGauge,
    pub(crate) authority_load_shedding_percentage: IntGauge,

    // Number of times each source indicated transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post processing metrics.
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    // Consensus handler metrics.
    pub consensus_handler_processed: IntCounterVec,
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    pub consensus_handler_scores: IntGaugeVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_max_object_costs: IntGaugeVec,
    pub consensus_committed_subdags: IntCounterVec,
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_handler_leader_round: IntGauge,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared metrics tracking protocol limits.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// bytecode verifier metrics for tracking timeouts
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks the recent average txn queueing delay between when it is ready
    // for execution until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions becoming ready for execution in the
    // transaction manager. The Mutex is needed because the tracker is updated
    // in the transaction manager and read in the overload_monitor. Mutex
    // contention should be low because the transaction manager is single
    // threaded and the read rate in overload_monitor is low. If the
    // transaction manager ever becomes multi-threaded, one rate tracker per
    // thread can be created instead.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate at which transactions start execution in the execution
    // driver. Similar reason for using a Mutex here as for
    // `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
337
/// Overrides the default Prometheus buckets for positive integer samples:
/// a 1/2/5/7 progression per decade, covering 1 up to 10 million.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0,
    10.0, 20.0, 50.0, 70.0,
    100.0, 200.0, 500.0, 700.0,
    1_000.0, 2_000.0, 5_000.0, 7_000.0,
    10_000.0, 20_000.0, 50_000.0, 70_000.0,
    100_000.0, 200_000.0, 500_000.0, 700_000.0,
    1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0,
    10_000_000.0,
];
344
/// Histogram buckets (in seconds) for latency metrics, spanning 0.5 ms up to
/// 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    10.0, 20.0, 30.0, 60.0, 90.0,
];
349
/// Histogram buckets (in seconds) for low-latency samples, starting at 10 µs
/// and going up to 100 s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    1e-5, 2e-5, 5e-5,
    1e-4, 2e-4, 5e-4,
    1e-3, 2e-3, 5e-3,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];
355
/// Histogram buckets for the ratio of computation gas to execution latency.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1000.0, 2000.0, 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0,
    10000.0, 50000.0, 100000.0, 1000000.0,
];
360
361/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
363
364impl AuthorityMetrics {
365    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366        let execute_certificate_latency = register_histogram_vec_with_registry!(
367            "authority_state_execute_certificate_latency",
368            "Latency of executing certificates, including waiting for inputs",
369            &["tx_type"],
370            LATENCY_SEC_BUCKETS.to_vec(),
371            registry,
372        )
373        .unwrap();
374
375        let execute_certificate_latency_single_writer =
376            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377        let execute_certificate_latency_shared_object =
378            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380        Self {
381            tx_orders: register_int_counter_with_registry!(
382                "total_transaction_orders",
383                "Total number of transaction orders",
384                registry,
385            )
386            .unwrap(),
387            total_certs: register_int_counter_with_registry!(
388                "total_transaction_certificates",
389                "Total number of transaction certificates handled",
390                registry,
391            )
392            .unwrap(),
393            total_cert_attempts: register_int_counter_with_registry!(
394                "total_handle_certificate_attempts",
395                "Number of calls to handle_certificate",
396                registry,
397            )
398            .unwrap(),
399            // total_effects == total transactions finished
400            total_effects: register_int_counter_with_registry!(
401                "total_transaction_effects",
402                "Total number of transaction effects produced",
403                registry,
404            )
405            .unwrap(),
406
407            shared_obj_tx: register_int_counter_with_registry!(
408                "num_shared_obj_tx",
409                "Number of transactions involving shared objects",
410                registry,
411            )
412            .unwrap(),
413
414            sponsored_tx: register_int_counter_with_registry!(
415                "num_sponsored_tx",
416                "Number of sponsored transactions",
417                registry,
418            )
419            .unwrap(),
420
421            tx_already_processed: register_int_counter_with_registry!(
422                "num_tx_already_processed",
423                "Number of transaction orders already processed previously",
424                registry,
425            )
426            .unwrap(),
427            num_input_objs: register_histogram_with_registry!(
428                "num_input_objects",
429                "Distribution of number of input TX objects per TX",
430                POSITIVE_INT_BUCKETS.to_vec(),
431                registry,
432            )
433            .unwrap(),
434            num_shared_objects: register_histogram_with_registry!(
435                "num_shared_objects",
436                "Number of shared input objects per TX",
437                POSITIVE_INT_BUCKETS.to_vec(),
438                registry,
439            )
440            .unwrap(),
441            batch_size: register_histogram_with_registry!(
442                "batch_size",
443                "Distribution of size of transaction batch",
444                POSITIVE_INT_BUCKETS.to_vec(),
445                registry,
446            )
447            .unwrap(),
448            authority_state_handle_transaction_latency: register_histogram_with_registry!(
449                "authority_state_handle_transaction_latency",
450                "Latency of handling transactions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execute_certificate_latency_single_writer,
456            execute_certificate_latency_shared_object,
457            internal_execution_latency: register_histogram_with_registry!(
458                "authority_state_internal_execution_latency",
459                "Latency of actual certificate executions",
460                LATENCY_SEC_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            execution_load_input_objects_latency: register_histogram_with_registry!(
465                "authority_state_execution_load_input_objects_latency",
466                "Latency of loading input objects for execution",
467                LOW_LATENCY_SEC_BUCKETS.to_vec(),
468                registry,
469            )
470            .unwrap(),
471            prepare_certificate_latency: register_histogram_with_registry!(
472                "authority_state_prepare_certificate_latency",
473                "Latency of executing certificates, before committing the results",
474                LATENCY_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            commit_certificate_latency: register_histogram_with_registry!(
479                "authority_state_commit_certificate_latency",
480                "Latency of committing certificate execution results",
481                LATENCY_SEC_BUCKETS.to_vec(),
482                registry,
483            )
484            .unwrap(),
485            db_checkpoint_latency: register_histogram_with_registry!(
486                "db_checkpoint_latency",
487                "Latency of checkpointing dbs",
488                LATENCY_SEC_BUCKETS.to_vec(),
489                registry,
490            ).unwrap(),
491            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492                "transaction_manager_num_enqueued_certificates",
493                "Current number of certificates enqueued to TransactionManager",
494                &["result"],
495                registry,
496            )
497            .unwrap(),
498            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499                "transaction_manager_num_missing_objects",
500                "Current number of missing objects in TransactionManager",
501                registry,
502            )
503            .unwrap(),
504            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505                "transaction_manager_num_pending_certificates",
506                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507                registry,
508            )
509            .unwrap(),
510            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511                "transaction_manager_num_executing_certificates",
512                "Number of executing certificates, including queued and actually running certificates",
513                registry,
514            )
515            .unwrap(),
516            transaction_manager_num_ready: register_int_gauge_with_registry!(
517                "transaction_manager_num_ready",
518                "Number of ready transactions in TransactionManager",
519                registry,
520            )
521            .unwrap(),
522            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523                "transaction_manager_object_cache_size",
524                "Current size of object-availability cache in TransactionManager",
525                registry,
526            )
527            .unwrap(),
528            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529                "transaction_manager_object_cache_hits",
530                "Number of object-availability cache hits in TransactionManager",
531                registry,
532            )
533            .unwrap(),
534            authority_overload_status: register_int_gauge_with_registry!(
535                "authority_overload_status",
536                "Whether authority is current experiencing overload and enters load shedding mode.",
537                registry)
538            .unwrap(),
539            authority_load_shedding_percentage: register_int_gauge_with_registry!(
540                "authority_load_shedding_percentage",
541                "The percentage of transactions is shed when the authority is in load shedding mode.",
542                registry)
543            .unwrap(),
544            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545                "transaction_manager_object_cache_misses",
546                "Number of object-availability cache misses in TransactionManager",
547                registry,
548            )
549            .unwrap(),
550            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551                "transaction_manager_object_cache_evictions",
552                "Number of object-availability cache evictions in TransactionManager",
553                registry,
554            )
555            .unwrap(),
556            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557                "transaction_manager_package_cache_size",
558                "Current size of package-availability cache in TransactionManager",
559                registry,
560            )
561            .unwrap(),
562            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563                "transaction_manager_package_cache_hits",
564                "Number of package-availability cache hits in TransactionManager",
565                registry,
566            )
567            .unwrap(),
568            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569                "transaction_manager_package_cache_misses",
570                "Number of package-availability cache misses in TransactionManager",
571                registry,
572            )
573            .unwrap(),
574            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575                "transaction_manager_package_cache_evictions",
576                "Number of package-availability cache evictions in TransactionManager",
577                registry,
578            )
579            .unwrap(),
580            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581                "transaction_manager_transaction_queue_age_s",
582                "Time spent in waiting for transaction in the queue",
583                LATENCY_SEC_BUCKETS.to_vec(),
584                registry,
585            )
586            .unwrap(),
587            transaction_overload_sources: register_int_counter_vec_with_registry!(
588                "transaction_overload_sources",
589                "Number of times each source indicates transaction overload.",
590                &["source"],
591                registry)
592            .unwrap(),
593            execution_driver_executed_transactions: register_int_counter_with_registry!(
594                "execution_driver_executed_transactions",
595                "Cumulative number of transaction executed by execution driver",
596                registry,
597            )
598            .unwrap(),
599            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600                "execution_driver_dispatch_queue",
601                "Number of transaction pending in execution driver dispatch queue",
602                registry,
603            )
604            .unwrap(),
605            execution_queueing_delay_s: register_histogram_with_registry!(
606                "execution_queueing_delay_s",
607                "Queueing delay between a transaction is ready for execution until it starts executing.",
608                LATENCY_SEC_BUCKETS.to_vec(),
609                registry
610            )
611            .unwrap(),
612            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613                "prepare_cert_gas_latency_ratio",
614                "The ratio of computation gas divided by VM execution latency.",
615                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616                registry
617            )
618            .unwrap(),
619            execution_gas_latency_ratio: register_histogram_with_registry!(
620                "execution_gas_latency_ratio",
621                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623                registry
624            )
625            .unwrap(),
626            skipped_consensus_txns: register_int_counter_with_registry!(
627                "skipped_consensus_txns",
628                "Total number of consensus transactions skipped",
629                registry,
630            )
631            .unwrap(),
632            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633                "skipped_consensus_txns_cache_hit",
634                "Total number of consensus transactions skipped because of local cache hit",
635                registry,
636            )
637            .unwrap(),
638            post_processing_total_events_emitted: register_int_counter_with_registry!(
639                "post_processing_total_events_emitted",
640                "Total number of events emitted in post processing",
641                registry,
642            )
643            .unwrap(),
644            post_processing_total_tx_indexed: register_int_counter_with_registry!(
645                "post_processing_total_tx_indexed",
646                "Total number of txes indexed in post processing",
647                registry,
648            )
649            .unwrap(),
650            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651                "post_processing_total_tx_had_event_processed",
652                "Total number of txes finished event processing in post processing",
653                registry,
654            )
655            .unwrap(),
656            post_processing_total_failures: register_int_counter_with_registry!(
657                "post_processing_total_failures",
658                "Total number of failure in post processing",
659                registry,
660            )
661            .unwrap(),
662            consensus_handler_processed: register_int_counter_vec_with_registry!(
663                "consensus_handler_processed",
664                "Number of transactions processed by consensus handler",
665                &["class"],
666                registry
667            ).unwrap(),
668            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669                "consensus_handler_transaction_sizes",
670                "Sizes of each type of transactions processed by consensus handler",
671                &["class"],
672                POSITIVE_INT_BUCKETS.to_vec(),
673                registry
674            ).unwrap(),
675            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676                "consensus_handler_num_low_scoring_authorities",
677                "Number of low scoring authorities based on reputation scores from consensus",
678                registry
679            ).unwrap(),
680            consensus_handler_scores: register_int_gauge_vec_with_registry!(
681                "consensus_handler_scores",
682                "scores from consensus for each authority",
683                &["authority"],
684                registry,
685            ).unwrap(),
686            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687                "consensus_handler_deferred_transactions",
688                "Number of transactions deferred by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_congested_transactions: register_int_counter_with_registry!(
692                "consensus_handler_congested_transactions",
693                "Number of transactions deferred by consensus handler due to congestion",
694                registry,
695            ).unwrap(),
696            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697                "consensus_handler_cancelled_transactions",
698                "Number of transactions cancelled by consensus handler",
699                registry,
700            ).unwrap(),
701            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702                "consensus_handler_max_congestion_control_object_costs",
703                "Max object costs for congestion control in the current consensus commit",
704                &["commit_type"],
705                registry,
706            ).unwrap(),
707            consensus_committed_subdags: register_int_counter_vec_with_registry!(
708                "consensus_committed_subdags",
709                "Number of committed subdags, sliced by leader",
710                &["authority"],
711                registry,
712            ).unwrap(),
713            consensus_committed_messages: register_int_gauge_vec_with_registry!(
714                "consensus_committed_messages",
715                "Total number of committed consensus messages, sliced by author",
716                &["authority"],
717                registry,
718            ).unwrap(),
719            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720                "consensus_committed_user_transactions",
721                "Number of committed user transactions, sliced by submitter",
722                &["authority"],
723                registry,
724            ).unwrap(),
725            consensus_handler_leader_round: register_int_gauge_with_registry!(
726                "consensus_handler_leader_round",
727                "The leader round of the current consensus output being processed in the consensus handler",
728                registry,
729            ).unwrap(),
730            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732            zklogin_sig_count: register_int_counter_with_registry!(
733                "zklogin_sig_count",
734                "Count of zkLogin signatures",
735                registry,
736            )
737            .unwrap(),
738            multisig_sig_count: register_int_counter_with_registry!(
739                "multisig_sig_count",
740                "Count of zkLogin signatures",
741                registry,
742            )
743            .unwrap(),
744            consensus_calculated_throughput: register_int_gauge_with_registry!(
745                "consensus_calculated_throughput",
746                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747                registry,
748            ).unwrap(),
749            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750                "consensus_calculated_throughput_profile",
751                "The current active calculated throughput profile",
752                registry
753            ).unwrap(),
754            execution_queueing_latency: LatencyObserver::new(),
755            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757        }
758    }
759
760    /// Reset metrics that contain `hostname` as one of the labels. This is
761    /// needed to avoid retaining metrics for long-gone committee members and
762    /// only exposing metrics for the committee in the current epoch.
763    pub fn reset_on_reconfigure(&self) {
764        self.consensus_committed_messages.reset();
765        self.consensus_handler_scores.reset();
766        self.consensus_committed_user_transactions.reset();
767    }
768}
769
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy
///   private keys).
/// - `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated via `Arc::pin(keypair)` where keypair is a
/// `KeyPair` (note the type wraps an `Arc`, not a `Box`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
777
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects for signing and execution.
    input_loader: TransactionInputLoader,
    /// Bundle of trait objects giving access to the execution caches
    /// (object/transaction readers, cache writer, backing stores).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Per-epoch store, swapped atomically at reconfiguration.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes the current 'execution epoch'.
    /// Execution acquires the read lock, checks the certificate epoch and
    /// holds it until all writes are complete. Reconfiguration acquires the
    /// write lock, changes the epoch and reverts all transactions from the
    /// previous epoch that were executed but did not make it into a
    /// checkpoint.
    execution_lock: RwLock<EpochId>,

    // Optional secondary indexes; presumably only populated on fullnodes —
    // TODO confirm against node setup code.
    pub indexes: Option<Arc<IndexStore>>,
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Publishes transaction events to subscribers.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Store of checkpoints; also serves epoch state commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Store of committees across epochs.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // Held for their background pruning tasks; never read directly.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When present, tracks signed transactions in the background so they
    /// eventually get finalized (see `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
833
834/// The authority state encapsulates all state, drives execution, and ensures
835/// safety.
836///
837/// Note the authority operations can be accessed through a read ref (&) and do
838/// not require &mut. Internally a database is synchronized through a mutex
839/// lock.
840///
841/// Repeating valid commands should produce no changes and return no error.
842impl AuthorityState {
843    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844        epoch_store.committee().authority_exists(&self.name)
845    }
846
847    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848        epoch_store
849            .active_validators()
850            .iter()
851            .any(|a| AuthorityName::from(a) == self.name)
852    }
853
    /// A node is considered a fullnode exactly when it is not a committee
    /// validator for the epoch described by `epoch_store`.
    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
        !self.is_committee_validator(epoch_store)
    }
857
    /// Borrows the shared committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
861
862    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863        self.committee_store.clone()
864    }
865
    /// Borrows the authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
869
    /// Returns the checkpoint state commitments recorded for `epoch`, or
    /// `None` if none are stored. Delegates to the checkpoint store.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
876
    /// This is a private method and should be kept that way. It doesn't check
    /// whether the provided transaction is a system transaction, and hence
    /// can only be called internally.
    ///
    /// Validates a user transaction for signing and, on success, signs it
    /// with this authority's key and locks its owned input objects to the
    /// resulting signed transaction.
    ///
    /// Returns an error if a deny/input check fails, if an optional sender
    /// `MoveAuthenticator` rejects the transaction, or if an owned input
    /// object is already locked to a conflicting transaction.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Ensure that validator cannot reconfigure while we are signing the tx
        let _execution_lock = self.execution_lock_for_signing();

        let protocol_config = epoch_store.protocol_config();
        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch = epoch_store.epoch();

        let tx_data = transaction.data().transaction_data();

        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &transaction.input_objects()?,
            &tx_data.receiving_objects(),
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load all transaction-related input objects.
        // Authenticator input objects and the account object are loaded in the same
        // call if there is a sender `MoveAuthenticator` signature present in the
        // transaction.
        let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
            self.read_objects_for_signing(&transaction, epoch)?;

        // Get the sender `MoveAuthenticator`, if any.
        // Only one `MoveAuthenticator` signature is possible, since it is not
        // implemented for the sponsor at the moment.
        let move_authenticator = transaction.sender_move_authenticator();

        // Check the inputs for signing.
        // If there is a sender `MoveAuthenticator` signature, its input objects and the
        // account object are also checked and must be provided.
        // It is also checked if there is enough gas to execute the transaction and its
        // authenticators.
        let (
            gas_status,
            tx_checked_input_objects,
            auth_checked_input_objects,
            authenticator_function_ref,
        ) = self.check_transaction_inputs_for_signing(
            protocol_config,
            reference_gas_price,
            tx_data,
            tx_input_objects,
            &tx_receiving_objects,
            move_authenticator,
            auth_input_objects,
            account_object,
        )?;

        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &tx_checked_input_objects,
            &tx_receiving_objects,
            &auth_checked_input_objects,
            &self.get_object_store(),
        )?;

        if let Some(move_authenticator) = move_authenticator {
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            // Both values are produced by check_transaction_inputs_for_signing
            // whenever a MoveAuthenticator is present, hence the expects.
            let auth_checked_input_objects = auth_checked_input_objects
                .expect("MoveAuthenticator input objects must be provided");
            let authenticator_function_ref = authenticator_function_ref
                .expect("AuthenticatorFunctionRefV1 object must be provided");

            let (kind, signer, _) = tx_data.execution_parts();

            // Execute the Move authenticator.
            let validation_result = epoch_store.executor().authenticate_transaction(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                gas_status,
                move_authenticator.to_owned(),
                authenticator_function_ref,
                auth_checked_input_objects,
                kind,
                signer,
                transaction.digest().to_owned(),
                &mut None,
            );

            // Surface authenticator failures as a dedicated error variant so
            // callers can distinguish them from other signing failures.
            if let Err(validation_error) = validation_result {
                return Err(IotaError::MoveAuthenticatorExecutionFailure {
                    error: validation_error.to_string(),
                });
            }
        }

        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

        let signed_transaction =
            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

        // Check and write locks, to signed transaction, into the database
        // The call to self.set_transaction_lock checks the lock is not conflicting,
        // and returns ConflictingTransaction error in case there is a lock on a
        // different existing transaction.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
1004
    /// Initiate a new transaction.
    ///
    /// Idempotent: if a status (signed or executed) is already known for this
    /// digest it is returned as-is. Otherwise the transaction is validated
    /// and signed via `handle_transaction_impl`; if signing fails but the
    /// transaction turns out to have been executed concurrently, the executed
    /// status is returned instead of the error.
    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
    pub async fn handle_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> IotaResult<HandleTransactionResponse> {
        let tx_digest = *transaction.digest();
        debug!("handle_transaction");

        // Ensure an idempotent answer.
        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
            return Ok(HandleTransactionResponse { status });
        }

        let _metrics_guard = self
            .metrics
            .authority_state_handle_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
        match signed {
            Ok(s) => {
                if self.is_committee_validator(epoch_store) {
                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
                        let tx = s.clone();
                        let validator_tx_finalizer = validator_tx_finalizer.clone();
                        let cache_reader = self.get_transaction_cache_reader().clone();
                        let epoch_store = epoch_store.clone();
                        // Track the signed tx in the background for as long as
                        // the epoch is alive, so it can eventually be finalized.
                        spawn_monitored_task!(epoch_store.within_alive_epoch(
                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
                        ));
                    }
                }
                Ok(HandleTransactionResponse {
                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
                })
            }
            // It happens frequently that while we are checking the validity of the transaction, it
            // has just been executed.
            // In that case, we could still return Ok to avoid showing confusing errors.
            Err(err) => Ok(HandleTransactionResponse {
                status: self
                    .get_transaction_status(&tx_digest, epoch_store)?
                    .ok_or(err)?
                    .1,
            }),
        }
    }
1055
    /// Whether the node is configured to run system-overload checks when
    /// signing transactions.
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1061
    /// Whether the node is configured to run system-overload checks when
    /// executing transactions.
    pub fn check_system_overload_at_execution(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_execution
    }
1067
    /// Runs the system-overload checks for an incoming transaction, in order:
    /// optional authority-level load shedding, execution-queue overload,
    /// consensus overload, and finally pending-commit backpressure.
    ///
    /// Each failed check increments the `transaction_overload_sources`
    /// counter with a label identifying which check rejected the transaction.
    /// Returns `Err` (including `ValidatorOverloadedRetryAfter` for
    /// backpressure) when the transaction should be rejected.
    pub(crate) fn check_system_overload(
        &self,
        consensus_adapter: &Arc<ConsensusAdapter>,
        tx_data: &SenderSignedData,
        do_authority_overload_check: bool,
    ) -> IotaResult {
        if do_authority_overload_check {
            self.check_authority_overload(tx_data).tap_err(|_| {
                self.update_overload_metrics("execution_queue");
            })?;
        }
        self.transaction_manager
            .check_execution_overload(self.overload_config(), tx_data)
            .tap_err(|_| {
                self.update_overload_metrics("execution_pending");
            })?;
        consensus_adapter.check_consensus_overload().tap_err(|_| {
            self.update_overload_metrics("consensus");
        })?;

        // Backpressure: reject with a retry hint when too many executed
        // transactions are still awaiting commit.
        let pending_tx_count = self
            .get_cache_commit()
            .approximate_pending_transaction_count();
        if pending_tx_count
            > self
                .config
                .execution_cache_config
                .writeback_cache
                .backpressure_threshold_for_rpc()
        {
            return Err(IotaError::ValidatorOverloadedRetryAfter {
                retry_after_secs: 10,
            });
        }

        Ok(())
    }
1105
1106    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108            return Ok(());
1109        }
1110
1111        let load_shedding_percentage = self
1112            .overload_info
1113            .load_shedding_percentage
1114            .load(Ordering::Relaxed);
1115        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116    }
1117
1118    fn update_overload_metrics(&self, source: &str) {
1119        self.metrics
1120            .transaction_overload_sources
1121            .with_label_values(&[source])
1122            .inc();
1123    }
1124
    /// Executes a certificate for its effects.
    ///
    /// Owned-object certificates are enqueued for execution directly; shared
    /// object certificates are expected to be enqueued via consensus. In both
    /// cases this waits for the executed effects, bounded by the lifetime of
    /// the current epoch (`EpochEnded` on reconfiguration).
    #[instrument(level = "trace", skip_all)]
    pub async fn execute_certificate(
        &self,
        certificate: &VerifiedCertificate,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<TransactionEffects> {
        // Separate latency histograms for shared-object vs single-writer txs.
        let _metrics_guard = if certificate.contains_shared_object() {
            self.metrics
                .execute_certificate_latency_shared_object
                .start_timer()
        } else {
            self.metrics
                .execute_certificate_latency_single_writer
                .start_timer()
        };
        trace!("execute_certificate");

        self.metrics.total_cert_attempts.inc();

        if !certificate.contains_shared_object() {
            // Shared object transactions need to be sequenced by the consensus before
            // enqueueing for execution, done in
            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
            // object transactions, they can be enqueued for execution immediately.
            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
        }

        // tx could be reverted when epoch ends, so we must be careful not to return a
        // result here after the epoch ends.
        epoch_store
            .within_alive_epoch(self.notify_read_effects(certificate))
            .await
            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
            .and_then(|r| r)
    }
1161
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic, i.e. outputs are only
    ///   written to storage on successful execution; crashed execution has no
    ///   observable effect and can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and
    /// locks are set. If this cannot be satisfied by the caller,
    /// execute_certificate() should be called instead.
    ///
    /// Should only be called within iota-core.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
    pub fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // Acquire a lock to prevent concurrent executions of the same transaction.
        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;

        // The cert could have been processed by a concurrent attempt of the same cert,
        // so check if the effects have already been written.
        if let Some(effects) = self
            .get_transaction_cache_reader()
            .try_get_executed_effects(tx_digest)?
        {
            if let Some(expected_effects_digest_inner) = expected_effects_digest {
                // A mismatch here would mean a non-deterministic execution
                // (fork), which must never go unnoticed.
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest_inner,
                    "Unexpected effects digest for transaction {tx_digest:?}"
                );
            }
            tx_guard.release();
            return Ok((effects, None));
        }

        let (tx_input_objects, authenticator_input_objects, account_object) =
            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

        // If no expected_effects_digest was provided, try to get it from storage.
        // We could be re-executing a previously executed but uncommitted transaction,
        // perhaps after restarting with a new binary. In this situation, if
        // we have published an effects signature, we must be sure not to
        // equivocate. TODO: read from cache instead of DB
        let expected_effects_digest =
            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);

        self.process_certificate(
            tx_guard,
            certificate,
            tx_input_objects,
            account_object,
            authenticator_input_objects,
            expected_effects_digest,
            epoch_store,
        )
        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
        .tap_ok(
            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
        )
    }
1233
1234    pub fn read_objects_for_execution(
1235        &self,
1236        tx_lock: &CertLockGuard,
1237        certificate: &VerifiedExecutableTransaction,
1238        epoch_store: &Arc<AuthorityPerEpochStore>,
1239    ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1240        let _scope = monitored_scope("Execution::load_input_objects");
1241        let _metrics_guard = self
1242            .metrics
1243            .execution_load_input_objects_latency
1244            .start_timer();
1245
1246        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1247
1248        let input_objects = self.input_loader.read_objects_for_execution(
1249            epoch_store,
1250            &certificate.key(),
1251            tx_lock,
1252            &input_objects,
1253            epoch_store.epoch(),
1254        )?;
1255
1256        certificate.split_input_objects_into_groups_for_reading(input_objects)
1257    }
1258
1259    /// Test only wrapper for `try_execute_immediately()` above, useful for
1260    /// checking errors if the pre-conditions are not satisfied, and
1261    /// executing change epoch transactions.
1262    pub fn try_execute_for_test(
1263        &self,
1264        certificate: &VerifiedCertificate,
1265    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266        let epoch_store = self.epoch_store_for_testing();
1267        let (effects, execution_error_opt) = self.try_execute_immediately(
1268            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269            None,
1270            &epoch_store,
1271        )?;
1272        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273        Ok((signed_effects, execution_error_opt))
1274    }
1275
1276    /// Non-fallible version of `try_execute_for_test()`.
1277    pub fn execute_for_test(
1278        &self,
1279        certificate: &VerifiedCertificate,
1280    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281        self.try_execute_for_test(certificate)
1282            .expect("try_execute_for_test should not fail")
1283    }
1284
1285    pub async fn notify_read_effects(
1286        &self,
1287        certificate: &VerifiedCertificate,
1288    ) -> IotaResult<TransactionEffects> {
1289        self.get_transaction_cache_reader()
1290            .try_notify_read_executed_effects(&[*certificate.digest()])
1291            .await
1292            .map(|mut r| r.pop().expect("must return correct number of effects"))
1293    }
1294
    /// Verifies that all of `owned_object_refs` are live (not locked to a
    /// conflicting version). Delegates to the object cache reader.
    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        self.get_object_cache_reader()
            .try_check_owned_objects_are_live(owned_object_refs)
    }
1299
    /// This function captures the required state to debug a forked transaction.
    /// The dump is written to a file in dir `path`, with name prefixed by the
    /// transaction digest. NOTE: Since this info escapes the validator
    /// context, make sure not to leak any private info here.
    ///
    /// Falls back to the system temp directory when no dump directory is
    /// configured. Returns the path of the written dump file, or
    /// `IotaError::FileIO` if writing fails.
    pub(crate) fn debug_dump_transaction_state(
        &self,
        tx_digest: &TransactionDigest,
        effects: &TransactionEffects,
        expected_effects_digest: TransactionEffectsDigest,
        inner_temporary_store: &InnerTemporaryStore,
        certificate: &VerifiedExecutableTransaction,
        debug_dump_config: &StateDebugDumpConfig,
    ) -> IotaResult<PathBuf> {
        let dump_dir = debug_dump_config
            .dump_file_directory
            .as_ref()
            .cloned()
            .unwrap_or(std::env::temp_dir());
        let epoch_store = self.load_epoch_store_one_call_per_task();

        NodeStateDump::new(
            tx_digest,
            effects,
            expected_effects_digest,
            self.get_object_store().as_ref(),
            &epoch_store,
            inner_temporary_store,
            certificate,
        )?
        .write_to_file(&dump_dir)
        .map_err(|e| IotaError::FileIO(e.to_string()))
    }
1332
1333    #[instrument(level = "trace", skip_all)]
1334    pub(crate) fn process_certificate(
1335        &self,
1336        tx_guard: CertTxGuard,
1337        certificate: &VerifiedExecutableTransaction,
1338        tx_input_objects: InputObjects,
1339        account_object: Option<ObjectReadResult>,
1340        authenticator_input_objects: Option<InputObjects>,
1341        expected_effects_digest: Option<TransactionEffectsDigest>,
1342        epoch_store: &Arc<AuthorityPerEpochStore>,
1343    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1344        let process_certificate_start_time = tokio::time::Instant::now();
1345        let digest = *certificate.digest();
1346
1347        fail_point_if!("correlated-crash-process-certificate", || {
1348            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1349                iota_simulator::task::kill_current_node(None);
1350            }
1351        });
1352
1353        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1354        // Any caller that verifies the signatures on the certificate will have already
1355        // checked the epoch. But paths that don't verify sigs (e.g. execution
1356        // from checkpoint, reading from db) present the possibility of an epoch
1357        // mismatch. If this cert is not finalzied in previous epoch, then it's
1358        // invalid.
1359        let execution_guard = match execution_guard {
1360            Ok(execution_guard) => execution_guard,
1361            Err(err) => {
1362                tx_guard.release();
1363                return Err(err);
1364            }
1365        };
1366        // Since we obtain a reference to the epoch store before taking the execution
1367        // lock, it's possible that reconfiguration has happened and they no
1368        // longer match.
1369        if *execution_guard != epoch_store.epoch() {
1370            tx_guard.release();
1371            info!("The epoch of the execution_guard doesn't match the epoch store");
1372            return Err(IotaError::WrongEpoch {
1373                expected_epoch: epoch_store.epoch(),
1374                actual_epoch: *execution_guard,
1375            });
1376        }
1377
1378        // Errors originating from prepare_certificate may be transient (failure to read
1379        // locks) or non-transient (transaction input is invalid, move vm
1380        // errors). However, all errors from this function occur before we have
1381        // written anything to the db, so we commit the tx guard and rely on the
1382        // client to retry the tx (if it was transient).
1383        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1384            &execution_guard,
1385            certificate,
1386            tx_input_objects,
1387            account_object,
1388            authenticator_input_objects,
1389            epoch_store,
1390        ) {
1391            Err(e) => {
1392                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1393                tx_guard.release();
1394                return Err(e);
1395            }
1396            Ok(res) => res,
1397        };
1398
1399        if let Some(expected_effects_digest) = expected_effects_digest {
1400            if effects.digest() != expected_effects_digest {
1401                // We dont want to mask the original error, so we log it and continue.
1402                match self.debug_dump_transaction_state(
1403                    &digest,
1404                    &effects,
1405                    expected_effects_digest,
1406                    &inner_temporary_store,
1407                    certificate,
1408                    &self.config.state_debug_dump_config,
1409                ) {
1410                    Ok(out_path) => {
1411                        info!(
1412                            "Dumped node state for transaction {} to {}",
1413                            digest,
1414                            out_path.as_path().display().to_string()
1415                        );
1416                    }
1417                    Err(e) => {
1418                        error!("Error dumping state for transaction {}: {e}", digest);
1419                    }
1420                }
1421                error!(
1422                    tx_digest = ?digest,
1423                    ?expected_effects_digest,
1424                    actual_effects = ?effects,
1425                    "fork detected!"
1426                );
1427                panic!(
1428                    "Transaction {} is expected to have effects digest {}, but got {}!",
1429                    digest,
1430                    expected_effects_digest,
1431                    effects.digest(),
1432                );
1433            }
1434        }
1435
1436        fail_point!("crash");
1437
1438        self.commit_certificate(
1439            certificate,
1440            inner_temporary_store,
1441            &effects,
1442            tx_guard,
1443            execution_guard,
1444            epoch_store,
1445        )?;
1446
1447        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1448            certificate.data().transaction_data().kind()
1449        {
1450            if let Some(err) = &execution_error_opt {
1451                debug_fatal!("Authenticator state update failed: {:?}", err);
1452            }
1453            epoch_store.update_authenticator_state(auth_state);
1454
1455            // double check that the signature verifier always matches the authenticator
1456            // state
1457            if cfg!(debug_assertions) {
1458                let authenticator_state = get_authenticator_state(self.get_object_store())
1459                    .expect("Read cannot fail")
1460                    .expect("Authenticator state must exist");
1461
1462                let mut sys_jwks: Vec<_> = authenticator_state
1463                    .active_jwks
1464                    .into_iter()
1465                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1466                    .collect();
1467                let mut active_jwks: Vec<_> = epoch_store
1468                    .signature_verifier
1469                    .get_jwks()
1470                    .into_iter()
1471                    .collect();
1472                sys_jwks.sort();
1473                active_jwks.sort();
1474
1475                assert_eq!(sys_jwks, active_jwks);
1476            }
1477        }
1478
1479        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1480        if elapsed > 0.0 {
1481            self.metrics
1482                .execution_gas_latency_ratio
1483                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1484        };
1485        Ok((effects, execution_error_opt))
1486    }
1487
    /// Commits a fully executed certificate to the store: runs post-execution
    /// indexing, records the tx key -> digest mapping in the epoch store,
    /// persists the transaction outputs via the cache writer, commits the
    /// `tx_guard`, and notifies the transaction manager so transactions that
    /// depend on this one's outputs can be scheduled.
    ///
    /// # Errors
    /// Fails if the epoch-store insertion or the output-cache write fails.
    /// Post-processing failures are counted and logged but deliberately do
    /// NOT fail the commit (see below).
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        // Held for the duration of the commit; presumably guards against
        // reconfiguration mid-commit — TODO confirm with the lock's owner.
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        // Capture counts before `inner_temporary_store` is consumed below.
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        let output_keys = inner_temporary_store.get_output_keys(effects);

        // Index the certificate (post-processing). Errors are swallowed on
        // purpose: indexing failure must not prevent the commit itself.
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // commit_certificate finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1553
1554    fn update_metrics(
1555        &self,
1556        certificate: &VerifiedExecutableTransaction,
1557        input_object_count: usize,
1558        shared_object_count: usize,
1559    ) {
1560        // count signature by scheme, for zklogin and multisig
1561        if certificate.has_zklogin_sig() {
1562            self.metrics.zklogin_sig_count.inc();
1563        } else if certificate.has_upgraded_multisig() {
1564            self.metrics.multisig_sig_count.inc();
1565        }
1566
1567        self.metrics.total_effects.inc();
1568        self.metrics.total_certs.inc();
1569
1570        if shared_object_count > 0 {
1571            self.metrics.shared_obj_tx.inc();
1572        }
1573
1574        if certificate.is_sponsored_tx() {
1575            self.metrics.sponsored_tx.inc();
1576        }
1577
1578        self.metrics
1579            .num_input_objs
1580            .observe(input_object_count as f64);
1581        self.metrics
1582            .num_shared_objects
1583            .observe(shared_object_count as f64);
1584        self.metrics.batch_size.observe(
1585            certificate
1586                .data()
1587                .intent_message()
1588                .value
1589                .kind()
1590                .num_commands() as f64,
1591        );
1592    }
1593
    /// prepare_certificate validates the transaction input, and executes the
    /// certificate, returning effects, output objects, events, etc.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no
    /// side effects.
    ///
    /// It can be generally understood that a failure of prepare_certificate
    /// indicates a non-transient error, e.g. the transaction input is
    /// somehow invalid, the correct locks are not held, etc. However, this
    /// is not entirely true, as a transient db read error may also cause
    /// this function to fail.
    ///
    /// Returns the inner temporary store, the transaction effects, and the
    /// execution error, if any (`Ok` still carries `Some(ExecutionError)`
    /// when the transaction itself failed during execution).
    #[instrument(level = "trace", skip_all)]
    fn prepare_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        account_object: Option<ObjectReadResult>,
        move_authenticator_input_objects: Option<InputObjects>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        let protocol_config = epoch_store.protocol_config();

        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_start_timestamp = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();

        let backing_store = self.get_backing_store().as_ref();

        let tx_digest = *certificate.digest();

        // TODO: We need to move this to a more appropriate place to avoid redundant
        // checks.
        let tx_data = certificate.data().transaction_data();
        tx_data.validity_check(protocol_config)?;

        let (kind, signer, gas) = tx_data.execution_parts();

        // `effects` only needs to be mutable so the nondeterminism failpoint
        // below can perturb it in msim/fail_points builds.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
            move_authenticator,
        ) =
            certificate.sender_move_authenticator()
        {
            // Check if the sender needs to be authenticated in Move and, if so, execute the
            // authentication.
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            // Check basic `object_to_authenticate` preconditions and get its components.
            let (
                auth_account_object_id,
                auth_account_object_seq_number,
                auth_account_object_digest,
            ) = move_authenticator.object_to_authenticate_components()?;

            // Since the `object_to_authenticate` components are provided, it is supposed
            // that the account object is loaded.
            let account_object = account_object.expect("Account object must be provided");

            let authenticator_function_ref_for_execution = self.check_move_account(
                auth_account_object_id,
                auth_account_object_seq_number,
                auth_account_object_digest,
                account_object,
                &signer,
            )?;

            let move_authenticator_input_objects = move_authenticator_input_objects.expect(
                    "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
                );

            // Check the `MoveAuthenticator` input objects.
            // The `MoveAuthenticator` receiving objects are checked on the signing step.
            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
            // not a part of the transaction data.
            let authenticator_gas_budget = protocol_config.max_auth_gas();
            let (
                gas_status,
                authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
                certificate,
                tx_input_objects,
                move_authenticator_input_objects,
                authenticator_gas_budget,
                protocol_config,
                reference_gas_price,
            )?;

            // Verify we still hold the locks on the owned inputs (authenticator
            // and transaction inputs combined) before executing.
            let owned_object_refs = authenticator_and_tx_checked_input_objects
                .inner()
                .filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;

            epoch_store
                .executor()
                .authenticate_then_execute_transaction_to_effects(
                    backing_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    self.config
                        .expensive_safety_check_config
                        .enable_deep_per_tx_iota_conservation_check(),
                    self.config.certificate_deny_config.certificate_deny_set(),
                    &epoch_id,
                    epoch_start_timestamp,
                    gas_status,
                    gas,
                    move_authenticator.to_owned(),
                    authenticator_function_ref_for_execution,
                    authenticator_checked_input_objects,
                    authenticator_and_tx_checked_input_objects,
                    kind,
                    signer,
                    tx_digest,
                    &mut None,
                )
        } else {
            // No Move authentication required, proceed to execute the transaction directly.

            // The cost of partially re-auditing a transaction before execution is
            // tolerated.
            let (tx_gas_status, tx_checked_input_objects) =
                iota_transaction_checks::check_certificate_input(
                    certificate,
                    tx_input_objects,
                    protocol_config,
                    reference_gas_price,
                )?;

            // Verify we still hold the locks on the owned inputs before executing.
            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;
            epoch_store.executor().execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ iota-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                tx_checked_input_objects,
                gas,
                tx_gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
        };

        // Simulation-only failpoint used to test fork detection: mutates the
        // effects to introduce nondeterminism.
        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(certificate, epoch_store, &mut effects);
        });

        // Record computation-cost-per-microsecond ratio for this preparation.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1775
1776    pub fn prepare_certificate_for_benchmark(
1777        &self,
1778        certificate: &VerifiedExecutableTransaction,
1779        input_objects: InputObjects,
1780        epoch_store: &Arc<AuthorityPerEpochStore>,
1781    ) -> IotaResult<(
1782        InnerTemporaryStore,
1783        TransactionEffects,
1784        Option<ExecutionError>,
1785    )> {
1786        let lock = RwLock::new(epoch_store.epoch());
1787        let execution_guard = lock.try_read().unwrap();
1788
1789        self.prepare_certificate(
1790            &execution_guard,
1791            certificate,
1792            input_objects,
1793            None,
1794            None,
1795            epoch_store,
1796        )
1797    }
1798
1799    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1800    /// `VmChecks::Enabled` instead.
1801    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802    #[allow(clippy::type_complexity)]
1803    pub fn dry_exec_transaction(
1804        &self,
1805        transaction: TransactionData,
1806        transaction_digest: TransactionDigest,
1807    ) -> IotaResult<(
1808        DryRunTransactionBlockResponse,
1809        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810        TransactionEffects,
1811        Option<ObjectID>,
1812    )> {
1813        let epoch_store = self.load_epoch_store_one_call_per_task();
1814        if !self.is_fullnode(&epoch_store) {
1815            return Err(IotaError::UnsupportedFeature {
1816                error: "dry-exec is only supported on fullnodes".to_string(),
1817            });
1818        }
1819
1820        if transaction.kind().is_system_tx() {
1821            return Err(IotaError::UnsupportedFeature {
1822                error: "dry-exec does not support system transactions".to_string(),
1823            });
1824        }
1825
1826        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827    }
1828
1829    #[allow(clippy::type_complexity)]
1830    pub fn dry_exec_transaction_for_benchmark(
1831        &self,
1832        transaction: TransactionData,
1833        transaction_digest: TransactionDigest,
1834    ) -> IotaResult<(
1835        DryRunTransactionBlockResponse,
1836        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837        TransactionEffects,
1838        Option<ObjectID>,
1839    )> {
1840        let epoch_store = self.load_epoch_store_one_call_per_task();
1841        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842    }
1843
    /// Dry-executes `transaction` without committing anything: validates the
    /// transaction (no gas check), applies the signing-time deny checks,
    /// loads input/receiving objects, fabricates a gas coin when the
    /// transaction carries no gas payment, executes with a fresh silent
    /// executor, and converts the results into RPC response types.
    ///
    /// Returns the `DryRunTransactionBlockResponse`, the written objects
    /// keyed by id with their `WriteKind`, the raw effects, and the id of
    /// the mocked gas object (if one was created).
    #[instrument(level = "trace", skip_all)]
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> IotaResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut transaction = transaction;
        let mut gas_object_refs = transaction.gas().to_vec();
        let reference_gas_price = epoch_store.reference_gas_price();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            gas_object_refs = vec![gas_object_ref];
            // Add gas object to transaction gas payment
            transaction.gas_data_mut().payment = gas_object_refs.clone();
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // `MoveAuthenticator`s are not supported in dry runs, so we set the
            // `authenticator_gas_budget` to 0.
            let authenticator_gas_budget = 0;

            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, _) = transaction.execution_parts();

        // A fresh executor is created per dry run; `silent` suppresses
        // executor logging.
        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        let (inner_temp_store, _, effects, execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_object_refs,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        let tx_digest = *effects.transaction_digest();

        // Module/type resolution falls back from the temporary store (freshly
        // written packages) to the existing caches.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Merge created/unwrapped/mutated refs into one id-keyed map, tagging
        // each with its WriteKind.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                // to avoid cloning `transaction`, fields are populated in this order
                suggested_gas_price: self
                    .congestion_tracker
                    .get_prediction_suggested_gas_price(&transaction),
                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
                    .map_err(|e| IotaError::TransactionSerialization {
                        error: format!(
                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
                        ),
                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
                effects: effects.clone().try_into()?,
                events: IotaTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2035
    /// Simulates `transaction` on a fullnode without committing anything.
    ///
    /// With `checks` enabled this behaves like a dry run (full transaction
    /// input checks); with checks disabled it behaves like dev-inspect
    /// (relaxed input checks and a gas status built directly from the
    /// transaction's budget/price). When the transaction has no gas payment
    /// a mock gas coin with id `ObjectID::MAX` is injected and its id is
    /// returned in the result.
    ///
    /// # Errors
    /// Rejects system transactions and calls from non-fullnodes; propagates
    /// validity-check, deny-check, object-loading, and input-check errors.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: VmChecks,
    ) -> IotaResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate does not support system transactions".to_string(),
            });
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate is only supported on fullnodes".to_string(),
            });
        }

        // Cheap validity checks for a transaction, including input size limits.
        // This does not check if gas objects are missing since we may create a
        // mock gas object. It checks for other transaction input validity.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Since we need to simulate a validator signing the transaction, the first step
        // is to check if some transaction elements are denied.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load input and receiving objects
        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create a mock gas object if one was not provided
        let mock_gas_id = if transaction.gas().is_empty() {
            let mock_gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
            // Splice the mock coin into the gas payment and the input set so
            // the checks below see a fully-funded transaction.
            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
            Some(mock_gas_object.id())
        } else {
            None
        };

        let protocol_config = epoch_store.protocol_config();

        // `MoveAuthenticator`s are not supported in simulation, so we set the
        // `authenticator_gas_budget` to 0.
        let authenticator_gas_budget = 0;

        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
        let (gas_status, checked_input_objects) = if checks.enabled() {
            iota_transaction_checks::check_transaction_input(
                protocol_config,
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?
        } else {
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                transaction.kind(),
                input_objects,
                receiving_objects,
            )?;
            let gas_status = IotaGasStatus::new(
                transaction.gas_budget(),
                transaction.gas_price(),
                epoch_store.reference_gas_price(),
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        };

        // Create a new executor for the simulation
        let executor = iota_execution::executor(
            protocol_config,
            true, // silent
            None,
        )
        .expect("Creating an executor should not fail here");

        // Execute the simulation
        let (kind, signer, gas_coins) = transaction.execution_parts();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false, // expensive_checks
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_coins,
            gas_status,
            kind,
            signer,
            transaction.digest(),
            checks.disabled(),
        );

        // In the case of a dev inspect, the execution_result could be filled with some
        // values. Else, execution_result is empty in the case of a dry run.
        Ok(SimulateTransactionResult {
            input_objects: inner_temp_store.input_objects,
            output_objects: inner_temp_store.written,
            // Events are only reported when the effects actually reference an
            // events digest.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            mock_gas_id,
        })
    }
2178
    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
    /// `VmChecks::DISABLED` instead.
    ///
    /// Builds a transaction from the loose pieces given here and runs it
    /// through the executor in dev-inspect mode, without signing or committing
    /// anything. The object ID for gas can be any object ID, even for an
    /// uncreated object: when no gas payment is supplied, a dummy gas coin
    /// funded with `SIMULATION_GAS_COIN_VALUE` is synthesized on the fly.
    ///
    /// Defaults applied when an argument is `None`:
    /// - `gas_price`: the epoch's reference gas price
    /// - `gas_budget`: the protocol's `max_tx_gas`
    /// - `gas_sponsor`: `sender`
    /// - `gas_objects`: empty (triggers the dummy gas coin)
    /// - `show_raw_txn_data_and_effects`: `false`
    /// - `skip_checks`: `true` (lightweight input checks only)
    ///
    /// Only supported on fullnodes; system transaction kinds are rejected.
    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        // Dev-inspect is a read-path convenience; validators do not serve it.
        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        // Fill in defaults for any gas parameter the caller omitted.
        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        let transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // Serialize the raw transaction bytes only when the caller asked for
        // them, to avoid the BCS work otherwise.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Structural validity only; gas checks are handled separately below.
        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create and use a dummy gas object if there is no gas object provided.
        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
            SIMULATION_GAS_COIN_VALUE,
            transaction.gas_owner(),
        );

        let gas_objects = if transaction.gas().is_empty() {
            let gas_object_ref = dummy_gas_object.compute_object_reference();
            vec![gas_object_ref]
        } else {
            transaction.gas().to_vec()
        };

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // Note: the budget here is pinned to the protocol's `max_tx_gas`,
            // not the user-supplied budget.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
                // `authenticator_gas_budget` to 0.
                let authenticator_gas_budget = 0;

                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        // The digest is derived from the intent-wrapped transaction data, the
        // same shape a signed transaction would hash.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: IntentAppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_objects,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        // As with the raw transaction bytes, only pay for serialization when
        // the caller requested it.
        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Resolve type layouts against the in-memory write set first so
        // packages published by this very transaction are visible.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2391
2392    // Only used for testing because of how epoch store is loaded.
2393    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2394        let epoch_store = self.epoch_store_for_testing();
2395        Ok(epoch_store.reference_gas_price())
2396    }
2397
2398    #[instrument(level = "trace", skip_all)]
2399    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2400        self.get_transaction_cache_reader()
2401            .try_is_tx_already_executed(digest)
2402    }
2403
2404    /// Non-fallible version of `try_is_tx_already_executed`.
2405    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2406        self.try_is_tx_already_executed(digest)
2407            .expect("storage access failed")
2408    }
2409
    /// Indexes a transaction by updating various indexes in the `IndexStore`.
    ///
    /// First derives the owner / dynamic-field index changes implied by
    /// `effects` (via `process_object_index`); if that fails, indexing is
    /// skipped with a warning. Otherwise the sender, input object IDs,
    /// changed objects with their new owners, Move calls, events and coin
    /// data are forwarded to `IndexStore::index_tx`, whose `u64` result is
    /// returned.
    #[instrument(level = "debug", skip_all, err)]
    fn index_tx(
        &self,
        indexes: &IndexStore,
        digest: &TransactionDigest,
        // TODO: index_tx really just need the transaction data here.
        cert: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        events: &TransactionEvents,
        timestamp_ms: u64,
        tx_coins: Option<TxCoins>,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
    ) -> IotaResult<u64> {
        let changes = self
            .process_object_index(effects, written, inner_temporary_store)
            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;

        indexes.index_tx(
            cert.data().intent_message().value.sender(),
            // IDs of the transaction's declared input objects.
            cert.data()
                .intent_message()
                .value
                .input_objects()?
                .iter()
                .map(|o| o.object_id()),
            // Every created/mutated/unwrapped object with its new owner.
            effects
                .all_changed_objects()
                .into_iter()
                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
            // (package, module, function) triples for each Move call.
            cert.data()
                .intent_message()
                .value
                .move_calls()
                .into_iter()
                .map(|(package, module, function)| {
                    (*package, module.to_owned(), function.to_owned())
                }),
            events,
            changes,
            digest,
            timestamp_ms,
            tx_coins,
        )
    }
2456
2457    #[cfg(msim)]
2458    fn create_fail_state(
2459        &self,
2460        certificate: &VerifiedExecutableTransaction,
2461        epoch_store: &Arc<AuthorityPerEpochStore>,
2462        effects: &mut TransactionEffects,
2463    ) {
2464        use std::cell::RefCell;
2465        thread_local! {
2466            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2467        }
2468        if !certificate.data().intent_message().value.is_system_tx() {
2469            let committee = epoch_store.committee();
2470            let cur_stake = (**committee).weight(&self.name);
2471            if cur_stake > 0 {
2472                FAIL_STATE.with_borrow_mut(|fail_state| {
2473                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2474                    if fail_state.0 < committee.validity_threshold() {
2475                        fail_state.0 += cur_stake;
2476                        fail_state.1.insert(self.name);
2477                    }
2478
2479                    if fail_state.1.contains(&self.name) {
2480                        info!("cp_exec failing tx");
2481                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2482                    }
2483                });
2484            }
2485        }
2486    }
2487
    /// Computes the owner-index and dynamic-field-index changes implied by a
    /// transaction's effects.
    ///
    /// Walks the deleted/wrapped objects (removing their old index entries)
    /// and all changed objects (removing stale entries on owner change and
    /// inserting entries for the new owner). `written` and
    /// `inner_temporary_store` hold the freshly written objects, which have
    /// not yet been committed to the object store — hence the panics below
    /// when pre-transaction state is unexpectedly missing.
    fn process_object_index(
        &self,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
    ) -> IotaResult<ObjectIndexChanges> {
        let epoch_store = self.load_epoch_store_one_call_per_task();
        // Resolve type layouts against the in-memory write set first so
        // packages published by this very transaction are visible.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    inner_temporary_store,
                    self.get_backing_package_store(),
                )));

        // Map: object ID -> version it had before this transaction touched it.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects both disappear from the indexes; the
        // previous owner decides which index the entry is removed from.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
            ) {
                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                // Other ownership kinds are not tracked by these indexes.
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a
            // owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = self
                    .get_object_store()
                    .try_get_object_by_key(id, *old_version)?
                else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr) => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            // Insert the entry for the object's (possibly new) owner.
            match owner {
                Owner::AddressOwner(addr) => {
                    // TODO: We can remove the object fetching after we added ObjectType to
                    // TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let Some(df_info) = self
                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
                        .unwrap_or_else(|e| {
                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                            None
                        }
                    )
                        else {
                            // Skip indexing for non dynamic field objects.
                            continue;
                        };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
2634
    /// Builds a `DynamicFieldInfo` for object `o` if it is a dynamic-field
    /// object; returns `Ok(None)` for anything that should not be indexed
    /// (non-Move objects and Move objects whose type is not a dynamic field).
    ///
    /// For dynamic object fields, the wrapped value object is looked up first
    /// in `written` (objects produced by the same transaction) and only then
    /// in the object store. `resolver` supplies the annotated Move layout
    /// needed to deserialize the field's name.
    fn try_create_dynamic_field_info(
        &self,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> IotaResult<Option<DynamicFieldInfo>> {
        // Skip if not a move object
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // We only index dynamic field objects
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        // Deserialize just the Field wrapper (name + value metadata) from the
        // object's BCS contents.
        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: IotaMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            IotaError::ObjectDeserialization {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            // Plain dynamic field: the value lives inside this object.
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(/* with_prefix */ true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                // Find the actual object from storage using the object id obtained from the
                // wrapper.

                // Try to find the object in the written objects first.
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    (
                        object.version(),
                        object.digest(),
                        object.data.type_().unwrap().clone(),
                    )
                } else {
                    // If not found, try to find it in the database.
                    let object = self
                        .get_object_store()
                        .try_get_object_by_key(&object_id, o.version())?
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
2735
2736    #[instrument(level = "trace", skip_all, err)]
2737    fn post_process_one_tx(
2738        &self,
2739        certificate: &VerifiedExecutableTransaction,
2740        effects: &TransactionEffects,
2741        inner_temporary_store: &InnerTemporaryStore,
2742        epoch_store: &Arc<AuthorityPerEpochStore>,
2743    ) -> IotaResult {
2744        if self.indexes.is_none() {
2745            return Ok(());
2746        }
2747
2748        let tx_digest = certificate.digest();
2749        let timestamp_ms = Self::unixtime_now_ms();
2750        let events = &inner_temporary_store.events;
2751        let written = &inner_temporary_store.written;
2752        let tx_coins =
2753            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2754
2755        // Index tx
2756        if let Some(indexes) = &self.indexes {
2757            let _ = self
2758                .index_tx(
2759                    indexes.as_ref(),
2760                    tx_digest,
2761                    certificate,
2762                    effects,
2763                    events,
2764                    timestamp_ms,
2765                    tx_coins,
2766                    written,
2767                    inner_temporary_store,
2768                )
2769                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2770                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2771                .expect("Indexing tx should not fail");
2772
2773            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2774            let events = self.make_transaction_block_events(
2775                events.clone(),
2776                *tx_digest,
2777                timestamp_ms,
2778                epoch_store,
2779                inner_temporary_store,
2780            )?;
2781            // Emit events
2782            self.subscription_handler
2783                .process_tx(certificate.data().transaction_data(), &effects, &events)
2784                .tap_ok(|_| {
2785                    self.metrics
2786                        .post_processing_total_tx_had_event_processed
2787                        .inc()
2788                })
2789                .tap_err(|e| {
2790                    warn!(
2791                        ?tx_digest,
2792                        "Post processing - Couldn't process events for tx: {}", e
2793                    )
2794                })?;
2795
2796            self.metrics
2797                .post_processing_total_events_emitted
2798                .inc_by(events.data.len() as u64);
2799        };
2800        Ok(())
2801    }
2802
2803    fn make_transaction_block_events(
2804        &self,
2805        transaction_events: TransactionEvents,
2806        digest: TransactionDigest,
2807        timestamp_ms: u64,
2808        epoch_store: &Arc<AuthorityPerEpochStore>,
2809        inner_temporary_store: &InnerTemporaryStore,
2810    ) -> IotaResult<IotaTransactionBlockEvents> {
2811        let mut layout_resolver =
2812            epoch_store
2813                .executor()
2814                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2815                    inner_temporary_store,
2816                    self.get_backing_package_store(),
2817                )));
2818        IotaTransactionBlockEvents::try_from(
2819            transaction_events,
2820            digest,
2821            Some(timestamp_ms),
2822            layout_resolver.as_mut(),
2823        )
2824    }
2825
2826    pub fn unixtime_now_ms() -> u64 {
2827        let now = SystemTime::now()
2828            .duration_since(UNIX_EPOCH)
2829            .expect("Time went backwards")
2830            .as_millis();
2831        u64::try_from(now).expect("Travelling in time machine")
2832    }
2833
2834    #[instrument(level = "trace", skip_all)]
2835    pub async fn handle_transaction_info_request(
2836        &self,
2837        request: TransactionInfoRequest,
2838    ) -> IotaResult<TransactionInfoResponse> {
2839        let epoch_store = self.load_epoch_store_one_call_per_task();
2840        let (transaction, status) = self
2841            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2842            .ok_or(IotaError::TransactionNotFound {
2843                digest: request.transaction_digest,
2844            })?;
2845        Ok(TransactionInfoResponse {
2846            transaction,
2847            status,
2848        })
2849    }
2850
    /// Serves an `ObjectInfoRequest`: returns the requested object (either
    /// the latest version or a specific past version), optionally its Move
    /// struct layout, and — for address-owned objects only — the current
    /// transaction lock for debugging.
    #[instrument(level = "trace", skip_all)]
    pub async fn handle_object_info_request(
        &self,
        request: ObjectInfoRequest,
    ) -> IotaResult<ObjectInfoResponse> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        // Resolve which version of the object the caller is asking about.
        let requested_object_seq = match request.request_kind {
            ObjectInfoRequestKind::LatestObjectInfo => {
                let (_, seq, _) = self
                    .try_get_object_or_tombstone(request.object_id)
                    .await?
                    .ok_or_else(|| {
                        IotaError::from(UserInputError::ObjectNotFound {
                            object_id: request.object_id,
                            version: None,
                        })
                    })?;
                seq
            }
            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
        };

        let object = self
            .get_object_store()
            .try_get_object_by_key(&request.object_id, requested_object_seq)?
            .ok_or_else(|| {
                IotaError::from(UserInputError::ObjectNotFound {
                    object_id: request.object_id,
                    version: Some(requested_object_seq),
                })
            })?;

        // Only compute the (potentially expensive) layout when asked to and
        // when the object actually is a Move object.
        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
            (request.generate_layout, object.data.try_as_move())
        {
            Some(into_struct_layout(
                epoch_store
                    .executor()
                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                    .get_annotated_layout(&move_obj.type_().clone().into())?,
            )?)
        } else {
            None
        };

        let lock = if !object.is_address_owned() {
            // Only address owned objects have locks.
            None
        } else {
            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
                .await?
                .map(|s| s.into_inner())
        };

        Ok(ObjectInfoResponse {
            object,
            layout,
            lock_for_debugging: lock,
        })
    }
2912
    /// Serves a `CheckpointRequest`: returns either a certified checkpoint
    /// summary or a locally computed (pending) one — at the requested
    /// sequence number, or the latest available when none is given — together
    /// with the checkpoint contents when they can be found.
    #[instrument(level = "trace", skip_all)]
    pub fn handle_checkpoint_request(
        &self,
        request: &CheckpointRequest,
    ) -> IotaResult<CheckpointResponse> {
        let summary = if request.certified {
            // Certified path: summaries that have gathered a quorum.
            let summary = match request.sequence_number {
                Some(seq) => self
                    .checkpoint_store
                    .get_checkpoint_by_sequence_number(seq)?,
                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
            }
            .map(|v| v.into_inner());
            summary.map(CheckpointSummaryResponse::Certified)
        } else {
            // Pending path: summaries this node computed locally.
            let summary = match request.sequence_number {
                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
                None => self
                    .checkpoint_store
                    .get_latest_locally_computed_checkpoint()?,
            };
            summary.map(CheckpointSummaryResponse::Pending)
        };
        // Contents are keyed by the summary's content digest; absent summary
        // means no contents either.
        let contents = match &summary {
            Some(s) => self
                .checkpoint_store
                .get_checkpoint_contents(&s.content_digest())?,
            None => None,
        };
        Ok(CheckpointResponse {
            checkpoint: summary,
            contents,
        })
    }
2947
2948    fn check_protocol_version(
2949        supported_protocol_versions: SupportedProtocolVersions,
2950        current_version: ProtocolVersion,
2951    ) {
2952        info!("current protocol version is now {:?}", current_version);
2953        info!("supported versions are: {:?}", supported_protocol_versions);
2954        if !supported_protocol_versions.is_version_supported(current_version) {
2955            let msg = format!(
2956                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2957            );
2958
2959            error!("{}", msg);
2960            eprintln!("{msg}");
2961
2962            #[cfg(not(msim))]
2963            std::process::exit(1);
2964
2965            #[cfg(msim)]
2966            iota_simulator::task::shutdown_current_node();
2967        }
2968    }
2969
    /// Constructs the `AuthorityState`: wires together caches, stores and the
    /// transaction manager, starts the background pruners, spawns the task
    /// that executes ready certificates, and indexes genesis objects if the
    /// owner index is empty.
    ///
    /// Shuts the node down (via `check_protocol_version`) when the epoch
    /// store's protocol version is not supported by this build.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rest_index: Option<Arc<RestIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel over which the transaction manager hands ready certificates
        // to the execution task spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        // Used by the drop path to stop the execution task.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Background pruners; kept alive by being stored in the state below.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rest_index.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            rest_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // A weak reference is used so the task does not keep the state alive.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
3066
    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Accessor for the shared object-cache reader.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }
3071
    /// Accessor for the shared transaction-cache reader.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }
3075
    /// Accessor for the execution-cache writer.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }
3079
    /// Accessor for the backing store used during execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }
3083
    /// Accessor for the backing package store (Move package lookups).
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }
3087
    /// Accessor for the object store.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
3091
    /// Accessor for the execution-cache reconfiguration API.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }
3095
    /// Accessor for the state-accumulator store.
    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
        &self.execution_cache_trait_pointers.accumulator_store
    }
3099
    /// Accessor for the checkpoint cache.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }
3103
    /// Accessor for the store used by state sync.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }
3107
    /// Accessor for the execution-cache commit interface.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }
3111
    /// Test-only escape hatch to the underlying `AuthorityStore`, exposed via
    /// the cache's testing API.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
3117
    /// Test helper that runs checkpoint-based pruning once over all eligible
    /// epochs, using the pruning settings taken from `config` and a fresh
    /// archive-reader balancer.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.rest_index.as_deref(),
            None,
            config.authority_store_pruning_config,
            metrics,
            archive_readers,
            EPOCH_DURATION_MS_FOR_TESTING,
        )
        .await
    }
3137
    /// Accessor for the shared `TransactionManager`.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
3141
    /// Adds transactions / certificates to transaction manager for ordered
    /// execution.
    ///
    /// Thin wrapper over [`TransactionManager::enqueue`].
    pub fn enqueue_transactions_for_execution(
        &self,
        txns: Vec<VerifiedExecutableTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager.enqueue(txns, epoch_store)
    }
3151
    /// Adds certificates to transaction manager for ordered execution.
    ///
    /// Thin wrapper over [`TransactionManager::enqueue_certificates`].
    pub fn enqueue_certificates_for_execution(
        &self,
        certs: Vec<VerifiedCertificate>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager
            .enqueue_certificates(certs, epoch_store)
    }
3161
    /// Enqueues transactions together with the effects digest each one is
    /// expected to produce; forwards to
    /// [`TransactionManager::enqueue_with_expected_effects_digest`].
    pub fn enqueue_with_expected_effects_digest(
        &self,
        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        self.transaction_manager
            .enqueue_with_expected_effects_digest(certs, epoch_store)
    }
3170
    /// Populates the owner and dynamic-field indexes from the genesis
    /// objects, but only when an index store exists and is currently empty
    /// (i.e. on first start).
    fn create_owner_index_if_empty(
        &self,
        genesis_objects: &[Object],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        // No index store configured: nothing to do.
        let Some(index_store) = &self.indexes else {
            return Ok(());
        };
        // Already populated: do not re-index.
        if !index_store.is_empty() {
            return Ok(());
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
        for o in genesis_objects.iter() {
            match o.owner {
                // Address-owned objects go into the owner index.
                Owner::AddressOwner(addr) => new_owners.push((
                    (addr, o.id()),
                    ObjectInfo::new(&o.compute_object_reference(), o),
                )),
                // Object-owned objects are indexed as dynamic fields of their
                // parent, when field info can be derived.
                Owner::ObjectOwner(object_id) => {
                    let id = o.id();
                    let info = match self.try_create_dynamic_field_info(
                        o,
                        &BTreeMap::new(),
                        layout_resolver.as_mut(),
                    ) {
                        Ok(Some(info)) => info,
                        Ok(None) => continue,
                        // A dangling reference in genesis is logged and
                        // skipped rather than failing startup.
                        Err(IotaError::UserInput {
                            error:
                                UserInputError::ObjectNotFound {
                                    object_id: not_found_id,
                                    version,
                                },
                        }) => {
                            warn!(
                                ?not_found_id,
                                ?version,
                                object_owner=?object_id,
                                field=?id,
                                "Skipping dynamic field: referenced genesis object not found"
                            );
                            continue;
                        }
                        Err(e) => return Err(e),
                    };
                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
                }
                // Shared and immutable objects are not indexed here.
                _ => {}
            }
        }

        index_store.insert_genesis_objects(ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners,
            new_dynamic_fields,
        })
    }
3234
3235    /// Attempts to acquire execution lock for an executable transaction.
3236    /// Returns the lock if the transaction is matching current executed epoch
3237    /// Returns None otherwise
3238    pub fn execution_lock_for_executable_transaction(
3239        &self,
3240        transaction: &VerifiedExecutableTransaction,
3241    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3242        let lock = self
3243            .execution_lock
3244            .try_read()
3245            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3246        if *lock == transaction.auth_sig().epoch() {
3247            Ok(lock)
3248        } else {
3249            Err(IotaError::WrongEpoch {
3250                expected_epoch: *lock,
3251                actual_epoch: transaction.auth_sig().epoch(),
3252            })
3253        }
3254    }
3255
    /// Acquires the execution lock for the duration of a transaction signing
    /// request. This prevents reconfiguration from starting until we are
    /// finished handling the signing request. Otherwise, in-memory lock
    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
    /// while we are attempting to acquire locks for the transaction.
    ///
    /// Returns `IotaError::ValidatorHaltedAtEpochEnd` when reconfiguration
    /// currently holds the write lock.
    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
        self.execution_lock
            .try_read()
            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
    }
3266
    /// Acquires the execution write lock for reconfiguration, waiting until
    /// all read-lock holders (execution / signing) have finished.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3270
    /// Reconfigures the node at an epoch boundary: waits for in-flight
    /// execution to finish, terminates epoch-scoped tasks, verifies
    /// end-of-epoch invariants and consistency, optionally checkpoints all
    /// databases, and reopens the per-epoch store for the new committee's
    /// epoch. Returns the new epoch store.
    ///
    /// The statement order below is load-bearing; callers and future editors
    /// should not reorder the steps.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        // Shuts the node down if the new epoch's protocol version is not
        // supported by this binary.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optionally snapshot all databases at the epoch boundary.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3382
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    pub async fn reconfigure_for_testing(&self) {
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        // Swap in the new epoch store before terminating the old epoch's
        // tasks and releasing the execution lock.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        *execution_lock = new_epoch;
    }
3416
    /// Runs end-of-epoch consistency checks: IOTA conservation against the
    /// expected `epoch_supply_change`, an optional comparison of the root
    /// state hash with the live object set, and optional secondary-index
    /// verification — the latter two gated by `expensive_safety_check_config`.
    #[instrument(level = "error", skip_all)]
    fn check_system_consistency(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
    ) -> IotaResult<()> {
        info!(
            "Performing iota conservation consistency check for epoch {}",
            cur_epoch_store.epoch()
        );

        // Debug builds additionally verify that every executed transaction
        // landed in a checkpoint.
        if cfg!(debug_assertions) {
            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
        }

        self.get_reconfig_api()
            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;

        // check for root state hash consistency with live object set
        if expensive_safety_check_config.enable_state_consistency_check() {
            info!(
                "Performing state consistency check for epoch {}",
                cur_epoch_store.epoch()
            );
            self.expensive_check_is_consistent_state(
                accumulator,
                cur_epoch_store,
                cfg!(debug_assertions), // panic in debug mode only
            );
        }

        if expensive_safety_check_config.enable_secondary_index_checks() {
            if let Some(indexes) = self.indexes.clone() {
                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
                    .expect("secondary indexes are inconsistent");
            }
        }

        Ok(())
    }
3459
3460    fn expensive_check_is_consistent_state(
3461        &self,
3462        accumulator: Arc<StateAccumulator>,
3463        cur_epoch_store: &AuthorityPerEpochStore,
3464        panic: bool,
3465    ) {
3466        let live_object_set_hash = accumulator.digest_live_object_set();
3467
3468        let root_state_hash: ECMHLiveObjectSetDigest = self
3469            .get_accumulator_store()
3470            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3471            .expect("Retrieving root state hash cannot fail")
3472            .expect("Root state hash for epoch must exist")
3473            .1
3474            .digest()
3475            .into();
3476
3477        let is_inconsistent = root_state_hash != live_object_set_hash;
3478        if is_inconsistent {
3479            if panic {
3480                panic!(
3481                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3482                );
3483            } else {
3484                error!(
3485                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3486                    root_state_hash, live_object_set_hash
3487                );
3488            }
3489        } else {
3490            info!("State consistency check passed");
3491        }
3492
3493        if !panic {
3494            accumulator.set_inconsistent_state(is_inconsistent);
3495        }
3496    }
3497
    /// Returns the current epoch id; test helper built on
    /// `epoch_store_for_testing`.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.epoch_store_for_testing().epoch()
    }
3501
    /// Takes a point-in-time snapshot of all databases (checkpoints,
    /// perpetual store, epochs, and optionally the index stores) into
    /// `checkpoint_path`. Writes into a `.tmp` directory first and renames on
    /// success, so a partially written checkpoint is never left at the final
    /// path; a no-op if the final path already exists.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> IotaResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // Clean up any leftover temp directory from a previously interrupted
        // attempt.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| IotaError::FileIO(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;

        // NOTE: Do not change the order of invoking these checkpoint calls
        // We want to snapshot checkpoint db first to not race with state sync
        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        if checkpoint_indexes {
            if let Some(indexes) = self.indexes.as_ref() {
                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
            }
            if let Some(rest_index) = self.rest_index.as_ref() {
                rest_index.checkpoint_db(&checkpoint_path_tmp.join("grpc_indexes"))?;
            }
        }

        // Atomically publish the completed snapshot at the final path.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| IotaError::FileIO(e.to_string()))?;
        Ok(())
    }
3552
    /// Load the current epoch store. This can change during reconfiguration. To
    /// ensure that we never end up accessing different epoch stores in a
    /// single task, we need to make sure that this is called once per task.
    /// Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites.
    /// Only call it when there is no way to obtain it from somewhere else.
    ///
    /// Loads via `ArcSwap`, so the returned guard is a cheap lease on the
    /// currently installed epoch store.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
3562
    /// Load the epoch store, should be used in tests only.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
3567
    /// Returns a deep copy of the current epoch's committee; test helper.
    pub fn clone_committee_for_testing(&self) -> Committee {
        Committee::clone(self.epoch_store_for_testing().committee())
    }
3571
3572    #[instrument(level = "trace", skip_all)]
3573    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3574        self.get_object_store()
3575            .try_get_object(object_id)
3576            .map_err(Into::into)
3577    }
3578
    /// Non-fallible version of `try_get_object`.
    ///
    /// Panics when the underlying storage read fails.
    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.try_get_object(object_id)
            .await
            .expect("storage access failed")
    }
3585
3586    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3587        Ok(self
3588            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3589            .await?
3590            .expect("framework object should always exist")
3591            .compute_object_reference())
3592    }
3593
    // This function is only used for testing.
    /// Reads the IOTA system state object via the object-cache reader.
    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
        self.get_object_cache_reader()
            .try_get_iota_system_state_object_unsafe()
    }
3599
3600    #[instrument(level = "trace", skip_all)]
3601    pub fn get_checkpoint_by_sequence_number(
3602        &self,
3603        sequence_number: CheckpointSequenceNumber,
3604    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3605        Ok(self
3606            .checkpoint_store
3607            .get_checkpoint_by_sequence_number(sequence_number)?)
3608    }
3609
3610    #[instrument(level = "trace", skip_all)]
3611    pub fn get_transaction_checkpoint_for_tests(
3612        &self,
3613        digest: &TransactionDigest,
3614        epoch_store: &AuthorityPerEpochStore,
3615    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3616        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3617        let Some(checkpoint) = checkpoint else {
3618            return Ok(None);
3619        };
3620        let checkpoint = self
3621            .checkpoint_store
3622            .get_checkpoint_by_sequence_number(checkpoint)?;
3623        Ok(checkpoint)
3624    }
3625
3626    #[instrument(level = "trace", skip_all)]
3627    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3628        Ok(
3629            match self
3630                .get_object_cache_reader()
3631                .try_get_latest_object_or_tombstone(*object_id)?
3632            {
3633                Some((_, ObjectOrTombstone::Object(object))) => {
3634                    let layout = self.get_object_layout(&object)?;
3635                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3636                }
3637                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3638                None => ObjectRead::NotExists(*object_id),
3639            },
3640        )
3641    }
3642
    /// Chain Identifier is the digest of the genesis checkpoint.
    ///
    /// Returns a copy of the value captured at construction time.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3647
3648    #[instrument(level = "trace", skip_all)]
3649    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3650    where
3651        T: DeserializeOwned,
3652    {
3653        let o = self.get_object_read(object_id)?.into_object()?;
3654        if let Some(move_object) = o.data.try_as_move() {
3655            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3656                IotaError::ObjectDeserialization {
3657                    error: format!("{e}"),
3658                }
3659            })?)
3660        } else {
3661            Err(IotaError::ObjectDeserialization {
3662                error: format!("Provided object : [{object_id}] is not a Move object."),
3663            })
3664        }
3665    }
3666
3667    /// This function aims to serve rpc reads on past objects and
3668    /// we don't expect it to be called for other purposes.
3669    /// Depending on the object pruning policies that will be enforced in the
3670    /// future there is no software-level guarantee/SLA to retrieve an object
3671    /// with an old version even if it exists/existed.
3672    #[instrument(level = "trace", skip_all)]
3673    pub fn get_past_object_read(
3674        &self,
3675        object_id: &ObjectID,
3676        version: SequenceNumber,
3677    ) -> IotaResult<PastObjectRead> {
3678        // Firstly we see if the object ever existed by getting its latest data
3679        let Some(obj_ref) = self
3680            .get_object_cache_reader()
3681            .try_get_latest_object_ref_or_tombstone(*object_id)?
3682        else {
3683            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3684        };
3685
3686        if version > obj_ref.1 {
3687            return Ok(PastObjectRead::VersionTooHigh {
3688                object_id: *object_id,
3689                asked_version: version,
3690                latest_version: obj_ref.1,
3691            });
3692        }
3693
3694        if version < obj_ref.1 {
3695            // Read past objects
3696            return Ok(match self.read_object_at_version(object_id, version)? {
3697                Some((object, layout)) => {
3698                    let obj_ref = object.compute_object_reference();
3699                    PastObjectRead::VersionFound(obj_ref, object, layout)
3700                }
3701
3702                None => PastObjectRead::VersionNotFound(*object_id, version),
3703            });
3704        }
3705
3706        if !obj_ref.2.is_alive() {
3707            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3708        }
3709
3710        match self.read_object_at_version(object_id, obj_ref.1)? {
3711            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3712            None => {
3713                error!(
3714                    "Object with in parent_entry is missing from object store, datastore is \
3715                     inconsistent",
3716                );
3717                Err(UserInputError::ObjectNotFound {
3718                    object_id: *object_id,
3719                    version: Some(obj_ref.1),
3720                }
3721                .into())
3722            }
3723        }
3724    }
3725
3726    #[instrument(level = "trace", skip_all)]
3727    fn read_object_at_version(
3728        &self,
3729        object_id: &ObjectID,
3730        version: SequenceNumber,
3731    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3732        let Some(object) = self
3733            .get_object_cache_reader()
3734            .try_get_object_by_key(object_id, version)?
3735        else {
3736            return Ok(None);
3737        };
3738
3739        let layout = self.get_object_layout(&object)?;
3740        Ok(Some((object, layout)))
3741    }
3742
3743    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3744        let layout = object
3745            .data
3746            .try_as_move()
3747            .map(|object| {
3748                into_struct_layout(
3749                    self.load_epoch_store_one_call_per_task()
3750                        .executor()
3751                        // TODO(cache) - must read through cache
3752                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3753                        .get_annotated_layout(&object.type_().clone().into())?,
3754                )
3755            })
3756            .transpose()?;
3757        Ok(layout)
3758    }
3759
3760    fn get_owner_at_version(
3761        &self,
3762        object_id: &ObjectID,
3763        version: SequenceNumber,
3764    ) -> IotaResult<Owner> {
3765        self.get_object_store()
3766            .try_get_object_by_key(object_id, version)?
3767            .ok_or_else(|| {
3768                IotaError::from(UserInputError::ObjectNotFound {
3769                    object_id: *object_id,
3770                    version: Some(version),
3771                })
3772            })
3773            .map(|o| o.owner)
3774    }
3775
3776    #[instrument(level = "trace", skip_all)]
3777    pub fn get_owner_objects(
3778        &self,
3779        owner: IotaAddress,
3780        // If `Some`, the query will start from the next item after the specified cursor
3781        cursor: Option<ObjectID>,
3782        limit: usize,
3783        filter: Option<IotaObjectDataFilter>,
3784    ) -> IotaResult<Vec<ObjectInfo>> {
3785        if let Some(indexes) = &self.indexes {
3786            indexes.get_owner_objects(owner, cursor, limit, filter)
3787        } else {
3788            Err(IotaError::IndexStoreNotAvailable)
3789        }
3790    }
3791
3792    #[instrument(level = "trace", skip_all)]
3793    pub fn get_owned_coins_iterator_with_cursor(
3794        &self,
3795        owner: IotaAddress,
3796        // If `Some`, the query will start from the next item after the specified cursor
3797        cursor: (String, ObjectID),
3798        limit: usize,
3799        one_coin_type_only: bool,
3800    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3801        if let Some(indexes) = &self.indexes {
3802            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3803        } else {
3804            Err(IotaError::IndexStoreNotAvailable)
3805        }
3806    }
3807
3808    #[instrument(level = "trace", skip_all)]
3809    pub fn get_owner_objects_iterator(
3810        &self,
3811        owner: IotaAddress,
3812        // If `Some`, the query will start from the next item after the specified cursor
3813        cursor: Option<ObjectID>,
3814        filter: Option<IotaObjectDataFilter>,
3815    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3816        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3817        if let Some(indexes) = &self.indexes {
3818            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3819        } else {
3820            Err(IotaError::IndexStoreNotAvailable)
3821        }
3822    }
3823
3824    #[instrument(level = "trace", skip_all)]
3825    pub async fn get_move_objects<T>(
3826        &self,
3827        owner: IotaAddress,
3828        type_: MoveObjectType,
3829    ) -> IotaResult<Vec<T>>
3830    where
3831        T: DeserializeOwned,
3832    {
3833        let object_ids = self
3834            .get_owner_objects_iterator(owner, None, None)?
3835            .filter(|o| match &o.type_ {
3836                ObjectType::Struct(s) => &type_ == s,
3837                ObjectType::Package => false,
3838            })
3839            .map(|info| ObjectKey(info.object_id, info.version))
3840            .collect::<Vec<_>>();
3841        let mut move_objects = vec![];
3842
3843        let objects = self
3844            .get_object_store()
3845            .try_multi_get_objects_by_key(&object_ids)?;
3846
3847        for (o, id) in objects.into_iter().zip(object_ids) {
3848            let object = o.ok_or_else(|| {
3849                IotaError::from(UserInputError::ObjectNotFound {
3850                    object_id: id.0,
3851                    version: Some(id.1),
3852                })
3853            })?;
3854            let move_object = object.data.try_as_move().ok_or_else(|| {
3855                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3856            })?;
3857            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3858                IotaError::ObjectDeserialization {
3859                    error: format!("{e}"),
3860                }
3861            })?);
3862        }
3863        Ok(move_objects)
3864    }
3865
3866    #[instrument(level = "trace", skip_all)]
3867    pub fn get_dynamic_fields(
3868        &self,
3869        owner: ObjectID,
3870        // If `Some`, the query will start from the next item after the specified cursor
3871        cursor: Option<ObjectID>,
3872        limit: usize,
3873    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3874        Ok(self
3875            .get_dynamic_fields_iterator(owner, cursor)?
3876            .take(limit)
3877            .collect::<Result<Vec<_>, _>>()?)
3878    }
3879
3880    fn get_dynamic_fields_iterator(
3881        &self,
3882        owner: ObjectID,
3883        // If `Some`, the query will start from the next item after the specified cursor
3884        cursor: Option<ObjectID>,
3885    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3886    {
3887        if let Some(indexes) = &self.indexes {
3888            indexes.get_dynamic_fields_iterator(owner, cursor)
3889        } else {
3890            Err(IotaError::IndexStoreNotAvailable)
3891        }
3892    }
3893
3894    #[instrument(level = "trace", skip_all)]
3895    pub fn get_dynamic_field_object_id(
3896        &self,
3897        owner: ObjectID,
3898        name_type: TypeTag,
3899        name_bcs_bytes: &[u8],
3900    ) -> IotaResult<Option<ObjectID>> {
3901        if let Some(indexes) = &self.indexes {
3902            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3903        } else {
3904            Err(IotaError::IndexStoreNotAvailable)
3905        }
3906    }
3907
3908    #[instrument(level = "trace", skip_all)]
3909    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3910        Ok(self.get_indexes()?.next_sequence_number())
3911    }
3912
3913    #[instrument(level = "trace", skip_all)]
3914    pub async fn get_executed_transaction_and_effects(
3915        &self,
3916        digest: TransactionDigest,
3917        kv_store: Arc<TransactionKeyValueStore>,
3918    ) -> IotaResult<(Transaction, TransactionEffects)> {
3919        let transaction = kv_store.get_tx(digest).await?;
3920        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3921        Ok((transaction, effects))
3922    }
3923
3924    #[instrument(level = "trace", skip_all)]
3925    pub fn multi_get_checkpoint_by_sequence_number(
3926        &self,
3927        sequence_numbers: &[CheckpointSequenceNumber],
3928    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3929        Ok(self
3930            .checkpoint_store
3931            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3932    }
3933
3934    #[instrument(level = "trace", skip_all)]
3935    pub fn get_transaction_events(
3936        &self,
3937        digest: &TransactionEventsDigest,
3938    ) -> IotaResult<TransactionEvents> {
3939        self.get_transaction_cache_reader()
3940            .try_get_events(digest)?
3941            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3942    }
3943
3944    pub fn get_transaction_input_objects(
3945        &self,
3946        effects: &TransactionEffects,
3947    ) -> anyhow::Result<Vec<Object>> {
3948        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3949            .map_err(Into::into)
3950    }
3951
3952    pub fn get_transaction_output_objects(
3953        &self,
3954        effects: &TransactionEffects,
3955    ) -> anyhow::Result<Vec<Object>> {
3956        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3957            .map_err(Into::into)
3958    }
3959
3960    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3961        match &self.indexes {
3962            Some(i) => Ok(i.clone()),
3963            None => Err(IotaError::UnsupportedFeature {
3964                error: "extended object indexing is not enabled on this server".into(),
3965            }),
3966        }
3967    }
3968
3969    pub async fn get_transactions_for_tests(
3970        self: &Arc<Self>,
3971        filter: Option<TransactionFilter>,
3972        cursor: Option<TransactionDigest>,
3973        limit: Option<usize>,
3974        reverse: bool,
3975    ) -> IotaResult<Vec<TransactionDigest>> {
3976        let metrics = KeyValueStoreMetrics::new_for_tests();
3977        let kv_store = Arc::new(TransactionKeyValueStore::new(
3978            "rocksdb",
3979            metrics,
3980            self.clone(),
3981        ));
3982        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3983            .await
3984    }
3985
3986    #[instrument(level = "trace", skip_all)]
3987    pub async fn get_transactions(
3988        &self,
3989        kv_store: &Arc<TransactionKeyValueStore>,
3990        filter: Option<TransactionFilter>,
3991        // If `Some`, the query will start from the next item after the specified cursor
3992        cursor: Option<TransactionDigest>,
3993        limit: Option<usize>,
3994        reverse: bool,
3995    ) -> IotaResult<Vec<TransactionDigest>> {
3996        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3997            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3998            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3999            if reverse {
4000                let iter = iter
4001                    .rev()
4002                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4003                    .skip(usize::from(cursor.is_some()));
4004                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4005            } else {
4006                let iter = iter
4007                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4008                    .skip(usize::from(cursor.is_some()));
4009                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4010            }
4011        }
4012        self.get_indexes()?
4013            .get_transactions(filter, cursor, limit, reverse)
4014    }
4015
4016    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4017        &self.checkpoint_store
4018    }
4019
4020    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4021        self.get_checkpoint_store()
4022            .get_highest_executed_checkpoint_seq_number()?
4023            .ok_or(IotaError::UserInput {
4024                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4025            })
4026    }
4027
4028    #[cfg(msim)]
4029    pub fn get_highest_pruned_checkpoint_for_testing(
4030        &self,
4031    ) -> IotaResult<CheckpointSequenceNumber> {
4032        self.database_for_testing()
4033            .perpetual_tables
4034            .get_highest_pruned_checkpoint()
4035            .map(|c| c.unwrap_or(0))
4036            .map_err(Into::into)
4037    }
4038
4039    #[instrument(level = "trace", skip_all)]
4040    pub fn get_checkpoint_summary_by_sequence_number(
4041        &self,
4042        sequence_number: CheckpointSequenceNumber,
4043    ) -> IotaResult<CheckpointSummary> {
4044        let verified_checkpoint = self
4045            .get_checkpoint_store()
4046            .get_checkpoint_by_sequence_number(sequence_number)?;
4047        match verified_checkpoint {
4048            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4049            None => Err(IotaError::UserInput {
4050                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4051            }),
4052        }
4053    }
4054
4055    #[instrument(level = "trace", skip_all)]
4056    pub fn get_checkpoint_summary_by_digest(
4057        &self,
4058        digest: CheckpointDigest,
4059    ) -> IotaResult<CheckpointSummary> {
4060        let verified_checkpoint = self
4061            .get_checkpoint_store()
4062            .get_checkpoint_by_digest(&digest)?;
4063        match verified_checkpoint {
4064            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4065            None => Err(IotaError::UserInput {
4066                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4067            }),
4068        }
4069    }
4070
4071    #[instrument(level = "trace", skip_all)]
4072    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4073        if is_system_package(package_id) {
4074            return self.find_genesis_txn_digest();
4075        }
4076        Ok(self
4077            .get_object_read(&package_id)?
4078            .into_object()?
4079            .previous_transaction)
4080    }
4081
4082    #[instrument(level = "trace", skip_all)]
4083    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4084        let summary = self
4085            .get_verified_checkpoint_by_sequence_number(0)?
4086            .into_message();
4087        let content = self.get_checkpoint_contents(summary.content_digest)?;
4088        let genesis_transaction = content.enumerate_transactions(&summary).next();
4089        Ok(genesis_transaction
4090            .ok_or(IotaError::UserInput {
4091                error: UserInputError::GenesisTransactionNotFound,
4092            })?
4093            .1
4094            .transaction)
4095    }
4096
4097    #[instrument(level = "trace", skip_all)]
4098    pub fn get_verified_checkpoint_by_sequence_number(
4099        &self,
4100        sequence_number: CheckpointSequenceNumber,
4101    ) -> IotaResult<VerifiedCheckpoint> {
4102        let verified_checkpoint = self
4103            .get_checkpoint_store()
4104            .get_checkpoint_by_sequence_number(sequence_number)?;
4105        match verified_checkpoint {
4106            Some(verified_checkpoint) => Ok(verified_checkpoint),
4107            None => Err(IotaError::UserInput {
4108                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4109            }),
4110        }
4111    }
4112
4113    #[instrument(level = "trace", skip_all)]
4114    pub fn get_verified_checkpoint_summary_by_digest(
4115        &self,
4116        digest: CheckpointDigest,
4117    ) -> IotaResult<VerifiedCheckpoint> {
4118        let verified_checkpoint = self
4119            .get_checkpoint_store()
4120            .get_checkpoint_by_digest(&digest)?;
4121        match verified_checkpoint {
4122            Some(verified_checkpoint) => Ok(verified_checkpoint),
4123            None => Err(IotaError::UserInput {
4124                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4125            }),
4126        }
4127    }
4128
4129    #[instrument(level = "trace", skip_all)]
4130    pub fn get_checkpoint_contents(
4131        &self,
4132        digest: CheckpointContentsDigest,
4133    ) -> IotaResult<CheckpointContents> {
4134        self.get_checkpoint_store()
4135            .get_checkpoint_contents(&digest)?
4136            .ok_or(IotaError::UserInput {
4137                error: UserInputError::CheckpointContentsNotFound(digest),
4138            })
4139    }
4140
4141    #[instrument(level = "trace", skip_all)]
4142    pub fn get_checkpoint_contents_by_sequence_number(
4143        &self,
4144        sequence_number: CheckpointSequenceNumber,
4145    ) -> IotaResult<CheckpointContents> {
4146        let verified_checkpoint = self
4147            .get_checkpoint_store()
4148            .get_checkpoint_by_sequence_number(sequence_number)?;
4149        match verified_checkpoint {
4150            Some(verified_checkpoint) => {
4151                let content_digest = verified_checkpoint.into_inner().content_digest;
4152                self.get_checkpoint_contents(content_digest)
4153            }
4154            None => Err(IotaError::UserInput {
4155                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4156            }),
4157        }
4158    }
4159
    /// Queries events matching `query`, paging from the exclusive `cursor`
    /// and returning at most `limit` events in ascending or (when
    /// `descending`) reverse order.
    ///
    /// Event keys are looked up in the index store, the event bodies are
    /// fetched from the key-value store, and each event is decorated with
    /// its annotated Move layout before being returned.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> IotaResult<Vec<IotaEvent>> {
        let index_store = self.get_indexes()?;

        // Get the tx_num from tx_digest; with no cursor, start from the far
        // end that matches the requested direction.
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                IotaError::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            (u64::MAX, usize::MAX)
        } else {
            (0, 0)
        };

        // Fetch one extra key so the cursor entry can be dropped below while
        // still filling the caller's limit.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All(filters) => {
                if filters.is_empty() {
                    index_store.all_events(tx_num, event_num, limit, descending)?
                } else {
                    return Err(IotaError::UserInput {
                        error: UserInputError::Unsupported(
                            "This query type does not currently support filter combinations"
                                .to_string(),
                        ),
                    });
                }
            }
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Package(_)
            | EventFilter::MoveEventField { .. }
            | EventFilter::Any(_)
            | EventFilter::And(_, _)
            | EventFilter::Or(_, _) => {
                return Err(IotaError::UserInput {
                    error: UserInputError::Unsupported(
                        "This query type is not supported by the full node.".to_string(),
                    ),
                });
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        // Batch-fetch event bodies, then index them by transaction digest so
        // each key can be resolved to its event below.
        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        let events_map: HashMap<_, _> =
            transaction_digests.iter().zip(events.into_iter()).collect();

        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
                event
                    .map(|e| (e, tx_digest, event_seq, timestamp))
                    .ok_or(IotaError::TransactionEventsNotFound { digest })
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Resolve each event's Move layout to build the annotated IotaEvent.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(IotaEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
4306
4307    pub async fn insert_genesis_object(&self, object: Object) {
4308        self.get_reconfig_api()
4309            .try_insert_genesis_object(object)
4310            .expect("Cannot insert genesis object")
4311    }
4312
4313    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4314        futures::future::join_all(
4315            objects
4316                .iter()
4317                .map(|o| self.insert_genesis_object(o.clone())),
4318        )
4319        .await;
4320    }
4321
    /// Make a status response for a transaction.
    ///
    /// Returns `Executed` (with cert signature, effects, and events) when
    /// the transaction has executed effects, `Signed` when this validator
    /// has signed but not yet executed it, and `None` when the digest is
    /// unknown.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .try_get_transaction_block(transaction_digest)?
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Effects without an events digest mean the execution emitted
                // no events.
                let events = if let Some(digest) = effects.events_digest() {
                    self.get_transaction_events(digest)?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible
                // that we reverted the transaction (during epoch change) in
                // between the above two reads, and we end up having effects but
                // not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        // No executed effects: report the locally signed transaction, if any.
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
4363
4364    /// Get the signed effects of the given transaction. If the effects was
4365    /// signed in a previous epoch, re-sign it so that the caller is able to
4366    /// form a cert of the effects in the current epoch.
4367    #[instrument(level = "trace", skip_all)]
4368    pub fn get_signed_effects_and_maybe_resign(
4369        &self,
4370        transaction_digest: &TransactionDigest,
4371        epoch_store: &Arc<AuthorityPerEpochStore>,
4372    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4373        let effects = self
4374            .get_transaction_cache_reader()
4375            .try_get_executed_effects(transaction_digest)?;
4376        match effects {
4377            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4378            None => Ok(None),
4379        }
4380    }
4381
4382    #[instrument(level = "trace", skip_all)]
4383    pub(crate) fn sign_effects(
4384        &self,
4385        effects: TransactionEffects,
4386        epoch_store: &Arc<AuthorityPerEpochStore>,
4387    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4388        let tx_digest = *effects.transaction_digest();
4389        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4390            Some(sig) if sig.epoch == epoch_store.epoch() => {
4391                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4392            }
4393            _ => {
4394                // If the transaction was executed in previous epochs, the validator will
4395                // re-sign the effects with new current epoch so that a client is always able to
4396                // obtain an effects certificate at the current epoch.
4397                //
4398                // Why is this necessary? Consider the following case:
4399                // - assume there are 4 validators
4400                // - Quorum driver gets 2 signed effects before reconfig halt
4401                // - The tx makes it into final checkpoint.
4402                // - 2 validators go away and are replaced in the new epoch.
4403                // - The new epoch begins.
4404                // - The quorum driver cannot complete the partial effects cert from the
4405                //   previous epoch, because it may not be able to reach either of the 2 former
4406                //   validators.
4407                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4408                //   the new epoch, the QD can make a new effects cert and return it to the
4409                //   client.
4410                //
4411                // This is a considered a short-term workaround. Eventually, Quorum Driver
4412                // should be able to return either an effects certificate, -or-
4413                // a proof of inclusion in a checkpoint. In the case above, the
4414                // Quorum Driver would return a proof of inclusion in the final
4415                // checkpoint, and this code would no longer be necessary.
4416                debug!(
4417                    ?tx_digest,
4418                    epoch=?epoch_store.epoch(),
4419                    "Re-signing the effects with the current epoch"
4420                );
4421
4422                let sig = AuthoritySignInfo::new(
4423                    epoch_store.epoch(),
4424                    &effects,
4425                    Intent::iota_app(IntentScope::TransactionEffects),
4426                    self.name,
4427                    &*self.secret,
4428                );
4429
4430                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4431
4432                epoch_store.insert_effects_digest_and_signature(
4433                    &tx_digest,
4434                    effects.digest(),
4435                    &sig,
4436                )?;
4437
4438                effects
4439            }
4440        };
4441
4442        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4443            signed_effects,
4444        ))
4445    }
4446
4447    // Returns coin objects for indexing for fullnode if indexing is enabled.
4448    #[instrument(level = "trace", skip_all)]
4449    fn fullnode_only_get_tx_coins_for_indexing(
4450        &self,
4451        inner_temporary_store: &InnerTemporaryStore,
4452        epoch_store: &Arc<AuthorityPerEpochStore>,
4453    ) -> Option<TxCoins> {
4454        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4455            return None;
4456        }
4457        let written_coin_objects = inner_temporary_store
4458            .written
4459            .iter()
4460            .filter_map(|(k, v)| {
4461                if v.is_coin() {
4462                    Some((*k, v.clone()))
4463                } else {
4464                    None
4465                }
4466            })
4467            .collect();
4468        let input_coin_objects = inner_temporary_store
4469            .input_objects
4470            .iter()
4471            .filter_map(|(k, v)| {
4472                if v.is_coin() {
4473                    Some((*k, v.clone()))
4474                } else {
4475                    None
4476                }
4477            })
4478            .collect::<ObjectMap>();
4479        Some((input_coin_objects, written_coin_objects))
4480    }
4481
4482    /// Get the TransactionEnvelope that currently locks the given object, if
4483    /// any. Since object locks are only valid for one epoch, we also need
4484    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4485    /// no lock records for the given object can be found.
4486    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4487    /// object record is at a different version.
4488    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4489    /// certain transaction. Returns None if the a lock record is
4490    /// initialized for the given ObjectRef but not yet locked by any
4491    /// transaction,     or cannot find the transaction in transaction
4492    /// table, because of data race etc.
4493    #[instrument(level = "trace", skip_all)]
4494    pub async fn get_transaction_lock(
4495        &self,
4496        object_ref: &ObjectRef,
4497        epoch_store: &AuthorityPerEpochStore,
4498    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4499        let lock_info = self
4500            .get_object_cache_reader()
4501            .try_get_lock(*object_ref, epoch_store)?;
4502        let lock_info = match lock_info {
4503            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4504                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4505                    provided_obj_ref: *object_ref,
4506                    current_version: locked_ref.1,
4507                }
4508                .into());
4509            }
4510            ObjectLockStatus::Initialized => {
4511                return Ok(None);
4512            }
4513            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4514        };
4515
4516        epoch_store.get_signed_transaction(&lock_info)
4517    }
4518
4519    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4520        self.get_object_cache_reader().try_get_objects(objects)
4521    }
4522
4523    /// Non-fallible version of `try_get_objects`.
4524    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4525        self.try_get_objects(objects)
4526            .await
4527            .expect("storage access failed")
4528    }
4529
4530    pub async fn try_get_object_or_tombstone(
4531        &self,
4532        object_id: ObjectID,
4533    ) -> IotaResult<Option<ObjectRef>> {
4534        self.get_object_cache_reader()
4535            .try_get_latest_object_ref_or_tombstone(object_id)
4536    }
4537
4538    /// Non-fallible version of `try_get_object_or_tombstone`.
4539    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4540        self.try_get_object_or_tombstone(object_id)
4541            .await
4542            .expect("storage access failed")
4543    }
4544
4545    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4546    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4547    /// upgrade.
4548    ///
4549    /// This method can be used to dynamic adjust the amount of buffer. If set
4550    /// to 0, the upgrade will go through with only 2f+1 votes.
4551    ///
4552    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4553    /// should have the same value), or you risk halting the chain.
4554    pub fn set_override_protocol_upgrade_buffer_stake(
4555        &self,
4556        expected_epoch: EpochId,
4557        buffer_stake_bps: u64,
4558    ) -> IotaResult {
4559        let epoch_store = self.load_epoch_store_one_call_per_task();
4560        let actual_epoch = epoch_store.epoch();
4561        if actual_epoch != expected_epoch {
4562            return Err(IotaError::WrongEpoch {
4563                expected_epoch,
4564                actual_epoch,
4565            });
4566        }
4567
4568        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4569    }
4570
4571    pub fn clear_override_protocol_upgrade_buffer_stake(
4572        &self,
4573        expected_epoch: EpochId,
4574    ) -> IotaResult {
4575        let epoch_store = self.load_epoch_store_one_call_per_task();
4576        let actual_epoch = epoch_store.epoch();
4577        if actual_epoch != expected_epoch {
4578            return Err(IotaError::WrongEpoch {
4579                expected_epoch,
4580                actual_epoch,
4581            });
4582        }
4583
4584        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4585    }
4586
    /// Get the set of system packages that are compiled in to this build, if
    /// those packages are compatible with the current versions of those
    /// packages on-chain.
    ///
    /// Returns an empty vector as soon as any built-in package fails the
    /// compatibility comparison, since a partial package list is not usable
    /// for an upgrade vote.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        // NOTE(review): `.map(|p| p)` is an identity adapter; presumably kept
        // to unify iterator types for the `chain` — confirm before removing.
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            #[cfg(msim)]
            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
                .unwrap_or(modules);

            // `None` here means the locally-built package is not compatible
            // with the on-chain version, so this binary cannot support an
            // upgrade for this package.
            let Some(obj_ref) = iota_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                // Bail out with an empty set: voting with a partial package
                // list would be meaningless.
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
4627
    /// Return the new versions, module bytes, and dependencies for the packages
    /// that have been committed to for a framework upgrade, in
    /// `system_packages`.  Loads the module contents from the binary, and
    /// performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case
    ///   no upgrade is required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest
    ///   matches the input, meaning the framework will be upgraded, and this
    ///   authority can satisfy that upgrade, in which case the contents are
    ///   included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does
    /// not contain the bytes for that framework ID, or the resulting
    /// package fails the digest check, `None` is returned indicating that
    /// this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        // Fetch the current on-chain object for every requested package id.
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                // Existing package at a different version: carry forward its
                // previous transaction digest.
                Some(cur_object) => cur_object.previous_transaction,
                // Package not found on-chain: treat it as brand new.
                None => TransactionDigest::genesis_marker(),
            };

            // In simtests, allow injected overrides of the built-in package.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // These bytes ship with the binary, so deserialization failure is
            // a build bug, not a runtime condition — hence the unwrap.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Re-build the package object locally to verify that it produces
            // exactly the object reference the network committed to.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                // This authority's binary cannot produce the voted-on package.
                error!(
                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                );
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
4707
    /// Returns the new protocol version and system packages that the network
    /// has voted to upgrade to. If the proposed protocol version is not
    /// supported, None is returned.
    ///
    /// A vote for `proposed_protocol_version` succeeds when validators
    /// agreeing on the same protocol config digest *and* the same system
    /// package set together hold at least the committee's effective threshold
    /// of stake (quorum threshold adjusted by `buffer_stake_bps`).
    fn is_protocol_version_supported_v1(
        proposed_protocol_version: ProtocolVersion,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV1>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
        // Buffer stake is expressed in basis points; cap at 100%.
        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // For each validator, gather the protocol version and system packages that it
        // would like to upgrade to in the next epoch.
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator that lists no packages is voting against any change at all.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so identical package sets compare equal when grouping.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                // A validator that only supports the current protocol version is also voting
                // against any change, because framework upgrades always require a protocol
                // version bump.
                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // There can only be one set of votes that have a majority, find one if it
        // exists.
        // NOTE: `chunk_by` only groups *adjacent* equal keys, so the sort above
        // is required for correctness, not just determinism.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                // should have been filtered out earlier.
                assert!(!packages.is_empty());

                // Tally the stake of every validator voting for this exact
                // (digest, packages) combination.
                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, digest, packages))
            })
    }
4787
4788    /// Selects the highest supported protocol version and system packages that
4789    /// the network has voted to upgrade to. If no upgrade is supported,
4790    /// returns the current protocol version and system packages.
4791    fn choose_protocol_version_and_system_packages_v1(
4792        current_protocol_version: ProtocolVersion,
4793        current_protocol_digest: Digest,
4794        committee: &Committee,
4795        capabilities: Vec<AuthorityCapabilitiesV1>,
4796        buffer_stake_bps: u64,
4797    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4798        let mut next_protocol_version = current_protocol_version;
4799        let mut system_packages = vec![];
4800        let mut protocol_version_digest = current_protocol_digest;
4801
4802        // Finds the highest supported protocol version and system packages by
4803        // incrementing the proposed protocol version by one until no further
4804        // upgrades are supported.
4805        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4806            next_protocol_version + 1,
4807            committee,
4808            capabilities.clone(),
4809            buffer_stake_bps,
4810        ) {
4811            next_protocol_version = version;
4812            protocol_version_digest = digest;
4813            system_packages = packages;
4814        }
4815
4816        (
4817            next_protocol_version,
4818            protocol_version_digest,
4819            system_packages,
4820        )
4821    }
4822
4823    /// Returns the indices of validators that support the given protocol
4824    /// version and digest. This includes both committee and non-committee
4825    /// validators based on their capabilities. Uses active validators
4826    /// instead of committee indices.
4827    fn get_validators_supporting_protocol_version(
4828        target_protocol_version: ProtocolVersion,
4829        target_digest: Digest,
4830        active_validators: &[AuthorityPublicKey],
4831        capabilities: &[AuthorityCapabilitiesV1],
4832    ) -> Vec<u64> {
4833        let mut eligible_validators = Vec::new();
4834
4835        for capability in capabilities {
4836            // Check if this validator supports the target protocol version and digest
4837            if let Some(digest) = capability
4838                .supported_protocol_versions
4839                .get_version_digest(target_protocol_version)
4840            {
4841                if digest == target_digest {
4842                    // Find the validator's index in the active validators list
4843                    if let Some(index) = active_validators
4844                        .iter()
4845                        .position(|name| AuthorityName::from(name) == capability.authority)
4846                    {
4847                        eligible_validators.push(index as u64);
4848                    }
4849                }
4850            }
4851        }
4852
4853        // Sort indices for deterministic behavior
4854        eligible_validators.sort();
4855        eligible_validators
4856    }
4857
4858    /// Calculates the sum of weights for eligible validators that are part of
4859    /// the committee. Takes the indices from
4860    /// get_validators_supporting_protocol_version and maps them back
4861    /// to committee members to get their weights.
4862    fn calculate_eligible_validators_weight(
4863        eligible_validator_indices: &[u64],
4864        active_validators: &[AuthorityPublicKey],
4865        committee: &Committee,
4866    ) -> u64 {
4867        let mut total_weight = 0u64;
4868
4869        for &index in eligible_validator_indices {
4870            let authority_pubkey = &active_validators[index as usize];
4871            // Check if this validator is in the committee and get their weight
4872            if let Some((_, weight)) = committee
4873                .members()
4874                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4875            {
4876                total_weight += weight;
4877            }
4878        }
4879
4880        total_weight
4881    }
4882
4883    #[instrument(level = "debug", skip_all)]
4884    fn create_authenticator_state_tx(
4885        &self,
4886        epoch_store: &Arc<AuthorityPerEpochStore>,
4887    ) -> Option<EndOfEpochTransactionKind> {
4888        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4889            info!("authenticator state transactions not enabled");
4890            return None;
4891        }
4892
4893        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4894        let tx = if authenticator_state_exists {
4895            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4896            let min_epoch =
4897                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4898            let authenticator_obj_initial_shared_version = epoch_store
4899                .epoch_start_config()
4900                .authenticator_obj_initial_shared_version()
4901                .expect("initial version must exist");
4902
4903            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4904                min_epoch,
4905                authenticator_obj_initial_shared_version,
4906            );
4907
4908            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4909
4910            tx
4911        } else {
4912            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4913            info!("Creating AuthenticatorStateCreate tx");
4914            tx
4915        };
4916        Some(tx)
4917    }
4918
4919    /// Creates and execute the advance epoch transaction to effects without
4920    /// committing it to the database. The effects of the change epoch tx
4921    /// are only written to the database after a certified checkpoint has been
4922    /// formed and executed by CheckpointExecutor.
4923    ///
4924    /// When a framework upgraded has been decided on, but the validator does
4925    /// not have the new versions of the packages locally, the validator
4926    /// cannot form the ChangeEpochTx. In this case it returns Err,
4927    /// indicating that the checkpoint builder should give up trying to make the
4928    /// final checkpoint. As long as the network is able to create a certified
4929    /// checkpoint (which should be ensured by the capabilities vote), it
4930    /// will arrive via state sync and be executed by CheckpointExecutor.
4931    #[instrument(level = "error", skip_all)]
4932    pub async fn create_and_execute_advance_epoch_tx(
4933        &self,
4934        epoch_store: &Arc<AuthorityPerEpochStore>,
4935        gas_cost_summary: &GasCostSummary,
4936        checkpoint: CheckpointSequenceNumber,
4937        epoch_start_timestamp_ms: CheckpointTimestamp,
4938        scores: Vec<u64>,
4939    ) -> anyhow::Result<(
4940        IotaSystemState,
4941        Option<SystemEpochInfoEvent>,
4942        TransactionEffects,
4943    )> {
4944        let mut txns = Vec::new();
4945
4946        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4947            txns.push(tx);
4948        }
4949
4950        let next_epoch = epoch_store.epoch() + 1;
4951
4952        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4953        let authority_capabilities = epoch_store
4954            .get_capabilities_v1()
4955            .expect("read capabilities from db cannot fail");
4956        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4957            Self::choose_protocol_version_and_system_packages_v1(
4958                epoch_store.protocol_version(),
4959                SupportedProtocolVersionsWithHashes::protocol_config_digest(
4960                    epoch_store.protocol_config(),
4961                ),
4962                epoch_store.committee(),
4963                authority_capabilities.clone(),
4964                buffer_stake_bps,
4965            );
4966
4967        // since system packages are created during the current epoch, they should abide
4968        // by the rules of the current epoch, including the current epoch's max
4969        // Move binary format version
4970        let config = epoch_store.protocol_config();
4971        let binary_config = to_binary_config(config);
4972        let Some(next_epoch_system_package_bytes) = self
4973            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4974            .await
4975        else {
4976            error!(
4977                "upgraded system packages {:?} are not locally available, cannot create \
4978                ChangeEpochTx. validator binary must be upgraded to the correct version!",
4979                next_epoch_system_packages
4980            );
4981            // the checkpoint builder will keep retrying forever when it hits this error.
4982            // Eventually, one of two things will happen:
4983            // - The operator will upgrade this binary to one that has the new packages
4984            //   locally, and this function will succeed.
4985            // - The final checkpoint will be certified by other validators, we will receive
4986            //   it via state sync, and execute it. This will upgrade the framework
4987            //   packages, reconfigure, and most likely shut down in the new epoch (this
4988            //   validator likely doesn't support the new protocol version, or else it
4989            //   should have had the packages.)
4990            bail!("missing system packages: cannot form ChangeEpochTx");
4991        };
4992
4993        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
4994        // ChangeEpochV2 requirements are met
4995        if config.select_committee_from_eligible_validators() {
4996            // Get the list of eligible validators that support the target protocol version
4997            let active_validators = epoch_store.epoch_start_state().get_active_validators();
4998
4999            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5000
5001            // Use validators supporting the target protocol version as eligible validators
5002            // in the next version if select_committee_supporting_next_epoch_version feature
5003            // flag is set to true.
5004            if config.select_committee_supporting_next_epoch_version() {
5005                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5006                    next_epoch_protocol_version,
5007                    next_epoch_protocol_digest,
5008                    &active_validators,
5009                    &authority_capabilities,
5010                );
5011
5012                // Calculate the total weight of eligible validators in the committee
5013                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5014                    &eligible_active_validators,
5015                    &active_validators,
5016                    epoch_store.committee(),
5017                );
5018
5019                // Safety check: ensure eligible validators have enough stake
5020                // Use the same effective threshold calculation that was used to decide the
5021                // protocol version
5022                let committee = epoch_store.committee();
5023                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5024
5025                if eligible_validators_weight < effective_threshold {
5026                    error!(
5027                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5028                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5029                    );
5030                    // Pass all active validator indices as eligible validators
5031                    // to perform selection among all of them.
5032                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5033                }
5034            }
5035
5036            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5037            // flag is enabled.
5038            if config.pass_validator_scores_to_advance_epoch() {
5039                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5040                    next_epoch,
5041                    next_epoch_protocol_version,
5042                    gas_cost_summary.storage_cost,
5043                    gas_cost_summary.computation_cost,
5044                    gas_cost_summary.computation_cost_burned,
5045                    gas_cost_summary.storage_rebate,
5046                    gas_cost_summary.non_refundable_storage_fee,
5047                    epoch_start_timestamp_ms,
5048                    next_epoch_system_package_bytes,
5049                    eligible_active_validators,
5050                    scores,
5051                    config.adjust_rewards_by_score(),
5052                ));
5053            } else {
5054                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5055                    next_epoch,
5056                    next_epoch_protocol_version,
5057                    gas_cost_summary.storage_cost,
5058                    gas_cost_summary.computation_cost,
5059                    gas_cost_summary.computation_cost_burned,
5060                    gas_cost_summary.storage_rebate,
5061                    gas_cost_summary.non_refundable_storage_fee,
5062                    epoch_start_timestamp_ms,
5063                    next_epoch_system_package_bytes,
5064                    eligible_active_validators,
5065                ));
5066            }
5067        } else if config.protocol_defined_base_fee()
5068            && config.max_committee_members_count_as_option().is_some()
5069        {
5070            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5071                next_epoch,
5072                next_epoch_protocol_version,
5073                gas_cost_summary.storage_cost,
5074                gas_cost_summary.computation_cost,
5075                gas_cost_summary.computation_cost_burned,
5076                gas_cost_summary.storage_rebate,
5077                gas_cost_summary.non_refundable_storage_fee,
5078                epoch_start_timestamp_ms,
5079                next_epoch_system_package_bytes,
5080            ));
5081        } else {
5082            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5083                next_epoch,
5084                next_epoch_protocol_version,
5085                gas_cost_summary.storage_cost,
5086                gas_cost_summary.computation_cost,
5087                gas_cost_summary.storage_rebate,
5088                gas_cost_summary.non_refundable_storage_fee,
5089                epoch_start_timestamp_ms,
5090                next_epoch_system_package_bytes,
5091            ));
5092        }
5093
5094        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5095
5096        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5097            tx.clone(),
5098            epoch_store.epoch(),
5099            checkpoint,
5100        );
5101
5102        let tx_digest = executable_tx.digest();
5103
5104        info!(
5105            ?next_epoch,
5106            ?next_epoch_protocol_version,
5107            ?next_epoch_system_packages,
5108            computation_cost=?gas_cost_summary.computation_cost,
5109            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5110            storage_cost=?gas_cost_summary.storage_cost,
5111            storage_rebate=?gas_cost_summary.storage_rebate,
5112            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5113            ?tx_digest,
5114            "Creating advance epoch transaction"
5115        );
5116
5117        fail_point_async!("change_epoch_tx_delay");
5118        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5119
5120        // The tx could have been executed by state sync already - if so simply return
5121        // an error. The checkpoint builder will shortly be terminated by
5122        // reconfiguration anyway.
5123        if self
5124            .get_transaction_cache_reader()
5125            .try_is_tx_already_executed(tx_digest)?
5126        {
5127            warn!("change epoch tx has already been executed via state sync");
5128            bail!("change epoch tx has already been executed via state sync",);
5129        }
5130
5131        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5132
5133        // We must manually assign the shared object versions to the transaction before
5134        // executing it. This is because we do not sequence end-of-epoch
5135        // transactions through consensus.
5136        epoch_store.assign_shared_object_versions_idempotent(
5137            self.get_object_cache_reader().as_ref(),
5138            std::slice::from_ref(&executable_tx),
5139        )?;
5140
5141        let (input_objects, _, _) =
5142            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5143
5144        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5145            &execution_guard,
5146            &executable_tx,
5147            input_objects,
5148            None,
5149            None,
5150            epoch_store,
5151        )?;
5152        let system_obj = get_iota_system_state(&temporary_store.written)
5153            .expect("change epoch tx must write to system object");
5154        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5155        let system_epoch_info_event = temporary_store
5156            .events
5157            .data
5158            .into_iter()
5159            .find(|event| event.is_system_epoch_info_event())
5160            .map(SystemEpochInfoEvent::from);
5161        // The system epoch info event can be `None` in case if the `advance_epoch`
5162        // Move function call failed and was executed in the safe mode.
5163        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5164
5165        // We must write tx and effects to the state sync tables so that state sync is
5166        // able to deliver to the transaction to CheckpointExecutor after it is
5167        // included in a certified checkpoint.
5168        self.get_state_sync_store()
5169            .try_insert_transaction_and_effects(&tx, &effects)
5170            .map_err(|err| {
5171                let err: anyhow::Error = err.into();
5172                err
5173            })?;
5174
5175        info!(
5176            "Effects summary of the change epoch transaction: {:?}",
5177            effects.summary_for_debug()
5178        );
5179        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5180        // The change epoch transaction cannot fail to execute.
5181        assert!(effects.status().is_ok());
5182        Ok((system_obj, system_epoch_info_event, effects))
5183    }
5184
    /// This function is called at the very end of the epoch.
    /// This step is required before updating new epoch in the db and calling
    /// reopen_epoch_db.
    ///
    /// It first closes user certificate submission (so consensus adapter stops
    /// accepting new user certificates), then reverts every locally executed
    /// transaction that was pending in consensus but was NOT included in a
    /// checkpoint of the closing epoch.
    #[instrument(level = "error", skip_all)]
    async fn revert_uncommitted_epoch_transactions(
        &self,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult {
        {
            let state = epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that consensus adapter do not accept certificates from
                // user. This can happen if our local validator did not initiate
                // epoch change locally, but 2f+1 nodes already concluded the
                // epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line
                // after this block
                epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }
        let pending_certificates = epoch_store.pending_consensus_certificates();
        info!(
            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
            pending_certificates.len(),
            pending_certificates,
        );
        for digest in pending_certificates {
            // Transactions that made it into a checkpoint are final and must
            // not be reverted.
            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
                info!(
                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
                    digest
                );
                continue;
            }
            info!("Reverting {:?} at the end of epoch", digest);
            // Revert both the per-epoch execution record and the perpetual
            // state updates written by this transaction.
            epoch_store.revert_executed_transaction(&digest)?;
            self.get_reconfig_api().try_revert_state_update(&digest)?;
        }
        info!("All uncommitted local transactions reverted");
        Ok(())
    }
5229
5230    #[instrument(level = "error", skip_all)]
5231    async fn reopen_epoch_db(
5232        &self,
5233        cur_epoch_store: &AuthorityPerEpochStore,
5234        new_committee: Committee,
5235        epoch_start_configuration: EpochStartConfiguration,
5236        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5237        epoch_last_checkpoint: CheckpointSequenceNumber,
5238    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5239        let new_epoch = new_committee.epoch;
5240        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5241        assert_eq!(
5242            epoch_start_configuration.epoch_start_state().epoch(),
5243            new_committee.epoch
5244        );
5245        fail_point!("before-open-new-epoch-store");
5246        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5247            self.name,
5248            new_committee,
5249            epoch_start_configuration,
5250            self.get_backing_package_store().clone(),
5251            self.get_object_store().clone(),
5252            expensive_safety_check_config,
5253            epoch_last_checkpoint,
5254        )?;
5255        self.epoch_store.store(new_epoch_store.clone());
5256        Ok(new_epoch_store)
5257    }
5258
    /// Checks if `authenticator` unlocks a valid Move account and returns the
    /// account-related `AuthenticatorFunctionRef` object.
    ///
    /// Validation steps, in order:
    /// 1. The account object must be an actual object (not deleted, not part
    ///    of a cancelled transaction).
    /// 2. `signer` must equal the address derived from the account object id.
    /// 3. The account object must be shared or immutable.
    /// 4. If the authenticator pinned a version and/or digest, they must match
    ///    the loaded object.
    /// 5. The `AuthenticatorFunctionRef` dynamic field attached to the account
    ///    object must exist and deserialize correctly.
    fn check_move_account(
        &self,
        auth_account_object_id: ObjectID,
        auth_account_object_seq_number: Option<SequenceNumber>,
        auth_account_object_digest: Option<ObjectDigest>,
        account_object: ObjectReadResult,
        signer: &IotaAddress,
    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
        // Unwrap the read result into a concrete object, mapping the
        // deleted/cancelled cases to dedicated user-input errors.
        let account_object = match account_object.object {
            ObjectReadResultKind::Object(object) => Ok(object),
            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
                Err(UserInputError::AccountObjectDeleted {
                    account_id: account_object.id(),
                    account_version: version,
                    transaction_digest: digest,
                })
            }
            // It is impossible to check the account object because it is used in a canceled
            // transaction and is not loaded.
            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
                Err(UserInputError::AccountObjectInCanceledTransaction {
                    account_id: account_object.id(),
                    account_version: version,
                })
            }
        }?;

        // The account address is derived directly from the account object id.
        let account_object_addr = IotaAddress::from(auth_account_object_id);

        fp_ensure!(
            signer == &account_object_addr,
            UserInputError::IncorrectUserSignature {
                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
            }
            .into()
        );

        // Only shared or immutable objects are acceptable account objects.
        fp_ensure!(
            account_object.is_shared() || account_object.is_immutable(),
            UserInputError::AccountObjectNotSupported {
                object_id: auth_account_object_id
            }
            .into()
        );

        // If the authenticator pinned a specific version, it must match the
        // loaded object's version; otherwise fall back to the loaded version.
        let auth_account_object_seq_number =
            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
                let account_object_version = account_object.version();

                fp_ensure!(
                    account_object_version == auth_account_object_seq_number,
                    UserInputError::AccountObjectVersionMismatch {
                        object_id: auth_account_object_id,
                        expected_version: auth_account_object_seq_number,
                        actual_version: account_object_version,
                    }
                    .into()
                );

                auth_account_object_seq_number
            } else {
                account_object.version()
            };

        // If the authenticator pinned a digest, it must match as well.
        if let Some(auth_account_object_digest) = auth_account_object_digest {
            let expected_digest = account_object.digest();
            fp_ensure!(
                expected_digest == auth_account_object_digest,
                UserInputError::InvalidAccountObjectDigest {
                    object_id: auth_account_object_id,
                    expected_digest,
                    actual_digest: auth_account_object_digest,
                }
                .into()
            );
        }

        // Derive the id of the dynamic field that stores the account's
        // `AuthenticatorFunctionRef`.
        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
            auth_account_object_id,
            &AuthenticatorFunctionRefV1Key::tag().into(),
            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
        )
        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
            account_object_id: auth_account_object_id,
        })?;

        // Look the field up at (or before) the checked account object version
        // so the returned function ref is consistent with that state.
        let authenticator_function_ref_field = self
            .get_object_cache_reader()
            .try_find_object_lt_or_eq_version(
                authenticator_function_ref_field_id,
                auth_account_object_seq_number,
            )?;

        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
            let field_move_object = authenticator_function_ref_field_obj
                .data
                .try_as_move()
                .expect("dynamic field should never be a package object");

            // Deserialize the dynamic field wrapper into the typed function
            // reference; a decode failure is reported as a user-input error.
            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
                field_move_object.to_rust().ok_or(
                    UserInputError::InvalidAuthenticatorFunctionRefField {
                        account_object_id: auth_account_object_id,
                    },
                )?;

            Ok(AuthenticatorFunctionRefForExecution::new_v1(
                field.value,
                authenticator_function_ref_field_obj.compute_object_reference(),
                authenticator_function_ref_field_obj.owner,
                authenticator_function_ref_field_obj.storage_rebate,
                authenticator_function_ref_field_obj.previous_transaction,
            ))
        } else {
            Err(UserInputError::MoveAuthenticatorNotFound {
                authenticator_function_ref_id: authenticator_function_ref_field_id,
                account_object_id: auth_account_object_id,
                account_object_version: auth_account_object_seq_number,
            }
            .into())
        }
    }
5383
5384    fn read_objects_for_signing(
5385        &self,
5386        transaction: &VerifiedTransaction,
5387        epoch: u64,
5388    ) -> IotaResult<(
5389        InputObjects,
5390        ReceivingObjects,
5391        Option<InputObjects>,
5392        Option<ObjectReadResult>,
5393    )> {
5394        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5395            Some(transaction.digest()),
5396            &transaction.collect_all_input_object_kind_for_reading()?,
5397            &transaction.data().transaction_data().receiving_objects(),
5398            epoch,
5399        )?;
5400
5401        transaction
5402            .split_input_objects_into_groups_for_reading(input_objects)
5403            .map(|(tx_input_objects, auth_input_objects, account_object)| {
5404                (
5405                    tx_input_objects,
5406                    tx_receiving_objects,
5407                    auth_input_objects,
5408                    account_object,
5409                )
5410            })
5411    }
5412
5413    fn check_transaction_inputs_for_signing(
5414        &self,
5415        protocol_config: &ProtocolConfig,
5416        reference_gas_price: u64,
5417        tx_data: &TransactionData,
5418        tx_input_objects: InputObjects,
5419        tx_receiving_objects: &ReceivingObjects,
5420        move_authenticator: Option<&MoveAuthenticator>,
5421        auth_input_objects: Option<InputObjects>,
5422        account_object: Option<ObjectReadResult>,
5423    ) -> IotaResult<(
5424        IotaGasStatus,
5425        CheckedInputObjects,
5426        Option<CheckedInputObjects>,
5427        Option<AuthenticatorFunctionRef>,
5428    )> {
5429        let (
5430            auth_checked_input_objects_union,
5431            authenticator_function_ref,
5432            authenticator_gas_budget,
5433        ) = if let Some(move_authenticator) = move_authenticator {
5434            let auth_input_objects =
5435                auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5436            let account_object = account_object.expect("Move account object must be provided");
5437
5438            // Check basic `object_to_authenticate` preconditions and get its components.
5439            let (
5440                auth_account_object_id,
5441                auth_account_object_seq_number,
5442                auth_account_object_digest,
5443            ) = move_authenticator.object_to_authenticate_components()?;
5444
5445            // Make sure the sender is a Move account.
5446            let AuthenticatorFunctionRefForExecution {
5447                authenticator_function_ref,
5448                ..
5449            } = self.check_move_account(
5450                auth_account_object_id,
5451                auth_account_object_seq_number,
5452                auth_account_object_digest,
5453                account_object,
5454                &tx_data.sender(),
5455            )?;
5456
5457            // Check the MoveAuthenticator input objects.
5458            let auth_checked_input_objects =
5459                iota_transaction_checks::check_move_authenticator_input_for_signing(
5460                    auth_input_objects,
5461                )?;
5462
5463            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5464            // not a part of the transaction data.
5465            let authenticator_gas_budget = protocol_config.max_auth_gas();
5466
5467            (
5468                Some(auth_checked_input_objects),
5469                Some(authenticator_function_ref),
5470                authenticator_gas_budget,
5471            )
5472        } else {
5473            (None, None, 0)
5474        };
5475
5476        // Check the transaction inputs.
5477        let (gas_status, tx_checked_input_objects) =
5478            iota_transaction_checks::check_transaction_input(
5479                protocol_config,
5480                reference_gas_price,
5481                tx_data,
5482                tx_input_objects,
5483                tx_receiving_objects,
5484                &self.metrics.bytecode_verifier_metrics,
5485                &self.config.verifier_signing_config,
5486                authenticator_gas_budget,
5487            )?;
5488
5489        Ok((
5490            gas_status,
5491            tx_checked_input_objects,
5492            auth_checked_input_objects_union,
5493            authenticator_function_ref,
5494        ))
5495    }
5496
5497    #[cfg(test)]
5498    pub(crate) fn iter_live_object_set_for_testing(
5499        &self,
5500    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5501        self.get_accumulator_store()
5502            .iter_cached_live_object_set_for_testing()
5503    }
5504
5505    #[cfg(test)]
5506    pub(crate) fn shutdown_execution_for_test(&self) {
5507        self.tx_execution_shutdown
5508            .lock()
5509            .take()
5510            .unwrap()
5511            .send(())
5512            .unwrap();
5513    }
5514
    /// NOTE: this function is only to be used for fuzzing and testing. Never
    /// use in prod
    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
        // Inject the objects directly through the genesis bulk-insert path.
        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
        // Drop cached copies of the system packages so subsequent reads see
        // the freshly inserted versions.
        self.get_object_cache_reader()
            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        // Clear end-of-epoch state while holding the reconfiguration execution
        // lock (presumably mirroring what reconfiguration does - confirm).
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
    }
5524}
5525
/// Receives `(epoch, round, randomness bytes)` tuples over a channel and turns
/// each into a randomness-state-update system transaction that is persisted
/// and enqueued for execution (see `handle_new_randomness`).
pub struct RandomnessRoundReceiver {
    authority_state: Arc<AuthorityState>,
    // Channel delivering (epoch, round, randomness bytes) tuples.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5530
impl RandomnessRoundReceiver {
    /// Spawns the receiver's event loop on a monitored task and returns its
    /// join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: processes incoming randomness rounds until the sending side
    /// of the channel is dropped.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes);
                    } else {
                        // Channel closed - shut the loop down.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds a randomness-state-update system transaction for the given
    /// round, persists it immediately, and enqueues it for execution.
    /// Randomness for an epoch other than the current one is dropped.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        // Stale randomness from a previous epoch is useless - drop it.
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version(),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must
        // immediately persist this transaction. If we crash before its outputs
        // are committed, this ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Send transaction to TransactionManager for execution.
        self.authority_state
            .transaction_manager()
            .enqueue(vec![transaction], &epoch_store);

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case
            // of out-of-order randomness generation. (Each
            // RandomnessStateUpdate depends on the output of the
            // RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason,
            // the validator will eventually crash rather than continuing in a
            // zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .try_notify_read_executed_effects(&[digest]),
            )
            .await;
            let result = match result {
                Ok(result) => result,
                Err(_) => {
                    if cfg!(debug_assertions) {
                        // Crash on randomness update execution timeout in debug builds.
                        panic!(
                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                        );
                    }
                    warn!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .try_notify_read_executed_effects(&[digest])
                        .await
                }
            };

            // A randomness state update that cannot produce effects or fails
            // to execute indicates a broken invariant - crash the node.
            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
5650
#[async_trait]
impl TransactionKeyValueStoreTrait for AuthorityState {
    /// Looks up transactions and executed effects by digest from the local
    /// caches. Each output slot is `None` when the corresponding key is not
    /// found.
    async fn multi_get(
        &self,
        transaction_keys: &[TransactionDigest],
        effects_keys: &[TransactionDigest],
    ) -> IotaResult<KVStoreTransactionData> {
        let txns = if !transaction_keys.is_empty() {
            self.get_transaction_cache_reader()
                .try_multi_get_transaction_blocks(transaction_keys)?
                .into_iter()
                .map(|t| t.map(|t| (*t).clone().into_inner()))
                .collect()
        } else {
            vec![]
        };

        let fx = if !effects_keys.is_empty() {
            self.get_transaction_cache_reader()
                .try_multi_get_executed_effects(effects_keys)?
        } else {
            vec![]
        };

        Ok((txns, fx))
    }

    /// Looks up checkpoint summaries (by sequence number and by digest) and
    /// checkpoint contents from the checkpoint store.
    async fn multi_get_checkpoints(
        &self,
        checkpoint_summaries: &[CheckpointSequenceNumber],
        checkpoint_contents: &[CheckpointSequenceNumber],
        checkpoint_summaries_by_digest: &[CheckpointDigest],
    ) -> IotaResult<(
        Vec<Option<CertifiedCheckpointSummary>>,
        Vec<Option<CheckpointContents>>,
        Vec<Option<CertifiedCheckpointSummary>>,
    )> {
        // TODO: use multi-get methods if it ever becomes important (unlikely)
        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
        let store = self.get_checkpoint_store();
        for seq in checkpoint_summaries {
            let checkpoint = store
                .get_checkpoint_by_sequence_number(*seq)?
                .map(|c| c.into_inner());

            summaries.push(checkpoint);
        }

        let mut contents = Vec::with_capacity(checkpoint_contents.len());
        for seq in checkpoint_contents {
            // Resolve sequence number -> summary -> contents digest -> contents.
            let checkpoint = store
                .get_checkpoint_by_sequence_number(*seq)?
                .and_then(|summary| {
                    store
                        .get_checkpoint_contents(&summary.content_digest)
                        .expect("db read cannot fail")
                });
            contents.push(checkpoint);
        }

        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
        for digest in checkpoint_summaries_by_digest {
            let checkpoint = store
                .get_checkpoint_by_digest(digest)?
                .map(|c| c.into_inner());
            summaries_by_digest.push(checkpoint);
        }

        Ok((summaries, contents, summaries_by_digest))
    }

    /// Returns the sequence number of the checkpoint containing `digest`, if
    /// the transaction was included in one.
    async fn get_transaction_perpetual_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
        self.get_checkpoint_cache()
            .try_get_transaction_perpetual_checkpoint(&digest)
            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
    }

    /// Looks up a specific version of an object from the object cache.
    async fn get_object(
        &self,
        object_id: ObjectID,
        version: VersionNumber,
    ) -> IotaResult<Option<Object>> {
        self.get_object_cache_reader()
            .try_get_object_by_key(&object_id, version)
    }

    /// Batched variant of `get_transaction_perpetual_checkpoint`.
    async fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
        let res = self
            .get_checkpoint_cache()
            .try_multi_get_transactions_perpetual_checkpoints(digests)?;

        Ok(res
            .into_iter()
            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
            .collect())
    }

    /// Returns the emitted events for each transaction digest, aligned with
    /// the input order; `None` for transactions without executed effects or
    /// without an events digest.
    #[instrument(skip(self))]
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        if digests.is_empty() {
            return Ok(vec![]);
        }
        let events_digests: Vec<_> = self
            .get_transaction_cache_reader()
            .try_multi_get_executed_effects(digests)?
            .into_iter()
            .map(|t| t.and_then(|t| t.events_digest().cloned()))
            .collect();
        // Only fetch events for digests that actually have one.
        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
        let mut events = self
            .get_transaction_cache_reader()
            .try_multi_get_events(&non_empty_events)?
            .into_iter();
        // `events` yields one entry per element of `non_empty_events`, in
        // order; advance it only for positions that had an events digest so
        // the results stay aligned with `digests`.
        Ok(events_digests
            .into_iter()
            .map(|ev| ev.and_then(|_| events.next()?))
            .collect())
    }
}
5779
/// Test-only (msim) machinery for overriding the built-in framework packages,
/// either globally or per-validator, so simulation tests can exercise
/// framework upgrade scenarios.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    // Maps a package id to its override configuration.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which framework modules (if any)
    /// should replace the built-in ones.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    enum PackageOverrideConfig {
        // The same override applies to every validator.
        Global(Framework),
        // The override is computed per validator by the callback.
        PerValidator(PackageUpgradeCallback),
    }

    // Serializes each module using its own bytecode version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|m| {
                let mut buf = Vec::new();
                m.serialize_with_version(m.version, &mut buf).unwrap();
                buf
            })
            .collect()
    }

    /// Registers a global (all-validator) override for `package_id`.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers a per-validator override callback for `package_id`.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Returns the serialized override modules for `package_id` on validator
    /// `name`, or `None` when no override applies.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            })
        })
    }

    /// Like [`get_override_bytes`], but returns the compiled modules rather
    /// than their serialized form.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds a [`SystemPackage`] from the override for `package_id`, if one
    /// exists, deriving the dependency list from the built-in framework.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns overrides that are not built-in packages, i.e. entirely new
    /// packages injected for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
                .collect()
        });

        extra
            .into_iter()
            .map(|package| SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
5901
/// A serializable snapshot of a single on-chain object together with its
/// identifying reference (id, version, digest), as captured in a
/// [`NodeStateDump`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object id, taken from the object's computed reference.
    pub id: ObjectID,
    /// Object version, taken from the object's computed reference.
    pub version: VersionNumber,
    /// Object digest, taken from the object's computed reference.
    pub digest: ObjectDigest,
    /// The full object contents.
    pub object: Object,
}
5909
5910impl ObjDumpFormat {
5911    fn new(object: Object) -> Self {
5912        let oref = object.compute_object_reference();
5913        Self {
5914            id: oref.0,
5915            version: oref.1,
5916            digest: oref.2,
5917            object,
5918        }
5919    }
5920}
5921
/// A JSON-serializable dump of the node-local state surrounding one executed
/// transaction: epoch parameters, the effects computed locally, the effects
/// digest they were checked against, and every category of object the
/// execution touched. Produced by [`NodeStateDump::new`] and persisted via
/// [`NodeStateDump::write_to_file`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// The sender-signed transaction data, extracted from the executing
    /// certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch in which the transaction was executed.
    pub executed_epoch: u64,
    /// Reference gas price of that epoch.
    pub reference_gas_price: u64,
    /// Protocol version in effect during execution.
    pub protocol_version: u64,
    /// Start timestamp (ms) of the executing epoch.
    pub epoch_start_timestamp_ms: u64,
    /// Effects this node computed for the transaction.
    pub computed_effects: TransactionEffects,
    /// Effects digest the computed effects were expected to match.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// All built-in system packages present in the store at dump time.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Shared input objects (mutated or read-only) listed in the effects.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Child objects loaded at runtime (reads that are tracked nowhere else).
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions recorded as modified in the effects.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages fetched from the DB at runtime (may not appear in inputs).
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// The transaction's input objects from the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
5939
impl NodeStateDump {
    /// Gathers a complete [`NodeStateDump`] for `tx_digest`: epoch parameters
    /// from `epoch_store`, and every category of object the execution touched
    /// from `object_store` and `inner_temporary_store`.
    ///
    /// Object-store lookup errors are propagated; objects that are simply
    /// absent from the store are skipped rather than treated as errors.
    pub fn new(
        tx_digest: &TransactionDigest,
        effects: &TransactionEffects,
        expected_effects_digest: TransactionEffectsDigest,
        object_store: &dyn ObjectStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        inner_temporary_store: &InnerTemporaryStore,
        certificate: &VerifiedExecutableTransaction,
    ) -> IotaResult<Self> {
        // Epoch info
        let executed_epoch = epoch_store.epoch();
        let reference_gas_price = epoch_store.reference_gas_price();
        let epoch_start_config = epoch_store.epoch_start_config();
        let protocol_version = epoch_store.protocol_version().as_u64();
        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();

        // Record all system packages at this version
        let mut relevant_system_packages = Vec::new();
        for sys_package_id in BuiltInFramework::all_package_ids() {
            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
                relevant_system_packages.push(ObjDumpFormat::new(w))
            }
        }

        // Record all the shared objects
        let mut shared_objects = Vec::new();
        for kind in effects.input_shared_objects() {
            match kind {
                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
                        shared_objects.push(ObjDumpFormat::new(w))
                    }
                }
                InputSharedObject::ReadDeleted(..)
                | InputSharedObject::MutateDeleted(..)
                | InputSharedObject::Cancelled(..) => (), /* TODO: consider recording congested
                                                           * objects. */
            }
        }

        // Record all loaded child objects
        // Child objects which are read but not mutated are not tracked anywhere else
        let mut loaded_child_objects = Vec::new();
        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
                loaded_child_objects.push(ObjDumpFormat::new(w))
            }
        }

        // Record all modified objects
        let mut modified_at_versions = Vec::new();
        for (id, ver) in effects.modified_at_versions() {
            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
                modified_at_versions.push(ObjDumpFormat::new(w))
            }
        }

        // Packages read at runtime, which were not previously loaded into the
        // temporary store. Some packages may be fetched at runtime and won't
        // show up in input objects.
        let mut runtime_reads = Vec::new();
        for obj in inner_temporary_store
            .runtime_packages_loaded_from_db
            .values()
        {
            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
        }

        // All other input objects should already be in `inner_temporary_store.objects`

        Ok(Self {
            tx_digest: *tx_digest,
            executed_epoch,
            reference_gas_price,
            epoch_start_timestamp_ms,
            protocol_version,
            relevant_system_packages,
            shared_objects,
            loaded_child_objects,
            modified_at_versions,
            runtime_reads,
            sender_signed_data: certificate.clone().into_message(),
            input_objects: inner_temporary_store
                .input_objects
                .values()
                .map(|o| ObjDumpFormat::new(o.clone()))
                .collect(),
            computed_effects: effects.clone(),
            expected_effects_digest,
        })
    }

    /// Returns every object captured in this dump — system packages, shared
    /// objects, loaded children, modified objects, runtime reads, and inputs —
    /// as a single flat list. No deduplication is performed across categories.
    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
        let mut objects = Vec::new();
        objects.extend(self.relevant_system_packages.clone());
        objects.extend(self.shared_objects.clone());
        objects.extend(self.loaded_child_objects.clone());
        objects.extend(self.modified_at_versions.clone());
        objects.extend(self.runtime_reads.clone());
        objects.extend(self.input_objects.clone());
        objects
    }

    /// Serializes the dump as pretty-printed JSON to
    /// `<path>/<tx_digest>_<unixtime_ms>_NODE_DUMP.json`, returning the full
    /// path of the file that was written.
    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
        let file_name = format!(
            "{}_{}_NODE_DUMP.json",
            self.tx_digest,
            AuthorityState::unixtime_now_ms()
        );
        let mut path = path.to_path_buf();
        path.push(&file_name);
        let mut file = File::create(path.clone())?;
        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
        Ok(path)
    }

    /// Reads a dump previously produced by [`Self::write_to_file`] back from
    /// its JSON file.
    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
        let file = File::open(path)?;
        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
    }
}