// iota_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    account_abstraction::{
59        account::AuthenticatorFunctionRefV1Key,
60        authenticator_function::{
61            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62            AuthenticatorFunctionRefV1,
63        },
64    },
65    authenticator_state::get_authenticator_state,
66    base_types::*,
67    committee::{Committee, EpochId, ProtocolVersion},
68    crypto::{
69        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70        default_hash,
71    },
72    deny_list_v1::check_coin_deny_list_v1_during_signing,
73    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75    effects::{
76        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77        TransactionEvents, VerifiedSignedTransactionEffects,
78    },
79    error::{ExecutionError, IotaError, IotaResult, UserInputError},
80    event::{Event, EventID, SystemEpochInfoEvent},
81    executable_transaction::VerifiedExecutableTransaction,
82    execution_config_utils::to_binary_config,
83    execution_status::ExecutionStatus,
84    fp_ensure,
85    gas::{GasCostSummary, IotaGasStatus},
86    gas_coin::NANOS_PER_IOTA,
87    inner_temporary_store::{
88        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89        WrittenObjects,
90    },
91    iota_system_state::{
92        IotaSystemState, IotaSystemStateTrait,
93        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94    },
95    is_system_package,
96    layout_resolver::{LayoutResolver, into_struct_layout},
97    message_envelope::Message,
98    messages_checkpoint::{
99        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103    },
104    messages_consensus::AuthorityCapabilitiesV1,
105    messages_grpc::{
106        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108        TransactionStatus,
109    },
110    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111    move_authenticator::MoveAuthenticator,
112    object::{
113        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114        bounded_visitor::BoundedVisitor,
115    },
116    storage::{
117        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118    },
119    supported_protocol_versions::{
120        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121    },
122    transaction::*,
123    transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoints::CheckpointStore,
162    congestion_tracker::CongestionTracker,
163    consensus_adapter::ConsensusAdapter,
164    epoch::committee_store::CommitteeStore,
165    execution_cache::{
166        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168        TransactionCacheRead,
169    },
170    execution_driver::execution_process,
171    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172    metrics::{LatencyObserver, RateTracker},
173    module_cache_metrics::ResolverMetrics,
174    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175    rest_index::RestIndexStore,
176    stake_aggregator::StakeAggregator,
177    state_accumulator::{AccumulatorStore, StateAccumulator},
178    subscription_handler::SubscriptionHandler,
179    transaction_input_loader::TransactionInputLoader,
180    transaction_manager::TransactionManager,
181    transaction_outputs::TransactionOutputs,
182    validator_tx_finalizer::ValidatorTxFinalizer,
183    verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223pub mod authority_store_pruner;
224pub mod authority_store_tables;
225pub mod authority_store_types;
226pub mod epoch_start_configuration;
227pub mod shared_object_congestion_tracker;
228pub mod shared_object_version_manager;
229pub mod suggested_gas_price_calculator;
230pub mod test_authority_builder;
231pub mod transaction_deferral;
232
233pub(crate) mod authority_store;
234pub mod backpressure;
235
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
    /// Total number of transaction orders received.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced; equals the total number
    /// of finished transactions.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Distribution of the number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes.
    batch_size: Histogram,

    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    // Latency of executing certificates, including waiting for inputs —
    // one histogram per transaction type (single-writer vs. shared-object),
    // both derived from the same labeled histogram vec in `new()`.
    execute_certificate_latency_single_writer: Histogram,
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing the databases.
    db_checkpoint_latency: Histogram,

    // TransactionManager metrics.
    /// Number of certificates enqueued to TransactionManager, sliced by
    /// enqueue result.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in TransactionManager.
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Number of certificates pending in TransactionManager, with at least
    /// one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Number of executing certificates, including queued and actually
    /// running certificates.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in TransactionManager.
    pub(crate) transaction_manager_num_ready: IntGauge,
    /// Current size of the object-availability cache in TransactionManager.
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    /// Object-availability cache hits in TransactionManager.
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    /// Object-availability cache misses in TransactionManager.
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    /// Object-availability cache evictions in TransactionManager.
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    /// Current size of the package-availability cache in TransactionManager.
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    /// Package-availability cache hits in TransactionManager.
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    /// Package-availability cache misses in TransactionManager.
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    /// Package-availability cache evictions in TransactionManager.
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time a transaction spends waiting in the TransactionManager queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready for execution and
    /// it starting to execute.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas divided by VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas divided by certificate execution latency,
    /// including committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    /// Whether the authority is currently experiencing overload and has
    /// entered load-shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed when the authority is in
    /// load-shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each source indicates transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post processing metrics.
    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing in post
    /// processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    // Consensus handler metrics.
    /// Transactions processed by the consensus handler, sliced by class.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each type of transaction processed by the consensus handler.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low-scoring authorities based on reputation scores from
    /// consensus.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Reputation scores from consensus, per authority.
    pub consensus_handler_scores: IntGaugeVec,
    /// Transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus
    /// commit, sliced by commit type.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Committed subdags, sliced by leader.
    pub consensus_committed_subdags: IntCounterVec,
    /// Committed consensus messages, sliced by author.
    pub consensus_committed_messages: IntGaugeVec,
    /// Committed user transactions, sliced by submitter.
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Leader round of the consensus output currently being processed by the
    /// consensus handler.
    pub consensus_handler_leader_round: IntGauge,
    // NOTE(review): the registrations for the two throughput gauges below are
    // not visible in this chunk — confirm their exact semantics (units,
    // update cadence) at the registration site.
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared limits metrics (see [`LimitsMetrics`]).
    pub limits_metrics: Arc<LimitsMetrics>,

    /// bytecode verifier metrics for tracking timeouts
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks recent average txn queueing delay between when it is ready for execution
    // until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions become ready for execution in transaction manager.
    // The need for the Mutex is that the tracker is updated in transaction manager and read
    // in the overload_monitor. There should be low mutex contention because
    // transaction manager is single threaded and the read rate in overload_monitor is
    // low. In the case where transaction manager becomes multi-threaded, we can
    // create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate of transactions starts execution in execution driver.
    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
337
/// Histogram buckets for positive integer-valued samples, overriding the
/// default Prometheus buckets: a 1/2/5/7 progression per decade, spanning
/// 1 through 10 million.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0, // ones
    10.0, 20.0, 50.0, 70.0, // tens
    100.0, 200.0, 500.0, 700.0, // hundreds
    1_000.0, 2_000.0, 5_000.0, 7_000.0, // thousands
    10_000.0, 20_000.0, 50_000.0, 70_000.0, // tens of thousands
    100_000.0, 200_000.0, 500_000.0, 700_000.0, // hundreds of thousands
    1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0, // millions
    10_000_000.0,
];
344
/// Histogram buckets for latencies measured in seconds, spanning 0.5 ms up
/// to 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, // sub-second
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, // seconds
    20.0, 30.0, 60.0, 90.0, // tens of seconds
];
349
/// Histogram buckets for low-latency samples measured in seconds, starting
/// at 10 microseconds and extending up to 100 seconds with a 1/2/5
/// progression per decade.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, //
    0.0001, 0.0002, 0.0005, //
    0.001, 0.002, 0.005, //
    0.01, 0.02, 0.05, //
    0.1, 0.2, 0.5, //
    1.0, 2.0, 5.0, //
    10.0, 20.0, 50.0, //
    100.0,
];
355
/// Histogram buckets for the ratio of computation gas to execution latency,
/// covering 10 through 1 million.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, //
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, //
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0, //
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0,
];
360
361/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
363
364impl AuthorityMetrics {
365    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366        let execute_certificate_latency = register_histogram_vec_with_registry!(
367            "authority_state_execute_certificate_latency",
368            "Latency of executing certificates, including waiting for inputs",
369            &["tx_type"],
370            LATENCY_SEC_BUCKETS.to_vec(),
371            registry,
372        )
373        .unwrap();
374
375        let execute_certificate_latency_single_writer =
376            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377        let execute_certificate_latency_shared_object =
378            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380        Self {
381            tx_orders: register_int_counter_with_registry!(
382                "total_transaction_orders",
383                "Total number of transaction orders",
384                registry,
385            )
386            .unwrap(),
387            total_certs: register_int_counter_with_registry!(
388                "total_transaction_certificates",
389                "Total number of transaction certificates handled",
390                registry,
391            )
392            .unwrap(),
393            total_cert_attempts: register_int_counter_with_registry!(
394                "total_handle_certificate_attempts",
395                "Number of calls to handle_certificate",
396                registry,
397            )
398            .unwrap(),
399            // total_effects == total transactions finished
400            total_effects: register_int_counter_with_registry!(
401                "total_transaction_effects",
402                "Total number of transaction effects produced",
403                registry,
404            )
405            .unwrap(),
406
407            shared_obj_tx: register_int_counter_with_registry!(
408                "num_shared_obj_tx",
409                "Number of transactions involving shared objects",
410                registry,
411            )
412            .unwrap(),
413
414            sponsored_tx: register_int_counter_with_registry!(
415                "num_sponsored_tx",
416                "Number of sponsored transactions",
417                registry,
418            )
419            .unwrap(),
420
421            tx_already_processed: register_int_counter_with_registry!(
422                "num_tx_already_processed",
423                "Number of transaction orders already processed previously",
424                registry,
425            )
426            .unwrap(),
427            num_input_objs: register_histogram_with_registry!(
428                "num_input_objects",
429                "Distribution of number of input TX objects per TX",
430                POSITIVE_INT_BUCKETS.to_vec(),
431                registry,
432            )
433            .unwrap(),
434            num_shared_objects: register_histogram_with_registry!(
435                "num_shared_objects",
436                "Number of shared input objects per TX",
437                POSITIVE_INT_BUCKETS.to_vec(),
438                registry,
439            )
440            .unwrap(),
441            batch_size: register_histogram_with_registry!(
442                "batch_size",
443                "Distribution of size of transaction batch",
444                POSITIVE_INT_BUCKETS.to_vec(),
445                registry,
446            )
447            .unwrap(),
448            authority_state_handle_transaction_latency: register_histogram_with_registry!(
449                "authority_state_handle_transaction_latency",
450                "Latency of handling transactions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execute_certificate_latency_single_writer,
456            execute_certificate_latency_shared_object,
457            internal_execution_latency: register_histogram_with_registry!(
458                "authority_state_internal_execution_latency",
459                "Latency of actual certificate executions",
460                LATENCY_SEC_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            execution_load_input_objects_latency: register_histogram_with_registry!(
465                "authority_state_execution_load_input_objects_latency",
466                "Latency of loading input objects for execution",
467                LOW_LATENCY_SEC_BUCKETS.to_vec(),
468                registry,
469            )
470            .unwrap(),
471            prepare_certificate_latency: register_histogram_with_registry!(
472                "authority_state_prepare_certificate_latency",
473                "Latency of executing certificates, before committing the results",
474                LATENCY_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            commit_certificate_latency: register_histogram_with_registry!(
479                "authority_state_commit_certificate_latency",
480                "Latency of committing certificate execution results",
481                LATENCY_SEC_BUCKETS.to_vec(),
482                registry,
483            )
484            .unwrap(),
485            db_checkpoint_latency: register_histogram_with_registry!(
486                "db_checkpoint_latency",
487                "Latency of checkpointing dbs",
488                LATENCY_SEC_BUCKETS.to_vec(),
489                registry,
490            ).unwrap(),
491            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492                "transaction_manager_num_enqueued_certificates",
493                "Current number of certificates enqueued to TransactionManager",
494                &["result"],
495                registry,
496            )
497            .unwrap(),
498            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499                "transaction_manager_num_missing_objects",
500                "Current number of missing objects in TransactionManager",
501                registry,
502            )
503            .unwrap(),
504            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505                "transaction_manager_num_pending_certificates",
506                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507                registry,
508            )
509            .unwrap(),
510            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511                "transaction_manager_num_executing_certificates",
512                "Number of executing certificates, including queued and actually running certificates",
513                registry,
514            )
515            .unwrap(),
516            transaction_manager_num_ready: register_int_gauge_with_registry!(
517                "transaction_manager_num_ready",
518                "Number of ready transactions in TransactionManager",
519                registry,
520            )
521            .unwrap(),
522            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523                "transaction_manager_object_cache_size",
524                "Current size of object-availability cache in TransactionManager",
525                registry,
526            )
527            .unwrap(),
528            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529                "transaction_manager_object_cache_hits",
530                "Number of object-availability cache hits in TransactionManager",
531                registry,
532            )
533            .unwrap(),
534            authority_overload_status: register_int_gauge_with_registry!(
535                "authority_overload_status",
536                "Whether authority is current experiencing overload and enters load shedding mode.",
537                registry)
538            .unwrap(),
539            authority_load_shedding_percentage: register_int_gauge_with_registry!(
540                "authority_load_shedding_percentage",
541                "The percentage of transactions is shed when the authority is in load shedding mode.",
542                registry)
543            .unwrap(),
544            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545                "transaction_manager_object_cache_misses",
546                "Number of object-availability cache misses in TransactionManager",
547                registry,
548            )
549            .unwrap(),
550            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551                "transaction_manager_object_cache_evictions",
552                "Number of object-availability cache evictions in TransactionManager",
553                registry,
554            )
555            .unwrap(),
556            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557                "transaction_manager_package_cache_size",
558                "Current size of package-availability cache in TransactionManager",
559                registry,
560            )
561            .unwrap(),
562            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563                "transaction_manager_package_cache_hits",
564                "Number of package-availability cache hits in TransactionManager",
565                registry,
566            )
567            .unwrap(),
568            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569                "transaction_manager_package_cache_misses",
570                "Number of package-availability cache misses in TransactionManager",
571                registry,
572            )
573            .unwrap(),
574            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575                "transaction_manager_package_cache_evictions",
576                "Number of package-availability cache evictions in TransactionManager",
577                registry,
578            )
579            .unwrap(),
580            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581                "transaction_manager_transaction_queue_age_s",
582                "Time spent in waiting for transaction in the queue",
583                LATENCY_SEC_BUCKETS.to_vec(),
584                registry,
585            )
586            .unwrap(),
587            transaction_overload_sources: register_int_counter_vec_with_registry!(
588                "transaction_overload_sources",
589                "Number of times each source indicates transaction overload.",
590                &["source"],
591                registry)
592            .unwrap(),
593            execution_driver_executed_transactions: register_int_counter_with_registry!(
594                "execution_driver_executed_transactions",
595                "Cumulative number of transaction executed by execution driver",
596                registry,
597            )
598            .unwrap(),
599            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600                "execution_driver_dispatch_queue",
601                "Number of transaction pending in execution driver dispatch queue",
602                registry,
603            )
604            .unwrap(),
605            execution_queueing_delay_s: register_histogram_with_registry!(
606                "execution_queueing_delay_s",
607                "Queueing delay between a transaction is ready for execution until it starts executing.",
608                LATENCY_SEC_BUCKETS.to_vec(),
609                registry
610            )
611            .unwrap(),
612            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613                "prepare_cert_gas_latency_ratio",
614                "The ratio of computation gas divided by VM execution latency.",
615                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616                registry
617            )
618            .unwrap(),
619            execution_gas_latency_ratio: register_histogram_with_registry!(
620                "execution_gas_latency_ratio",
621                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623                registry
624            )
625            .unwrap(),
626            skipped_consensus_txns: register_int_counter_with_registry!(
627                "skipped_consensus_txns",
628                "Total number of consensus transactions skipped",
629                registry,
630            )
631            .unwrap(),
632            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633                "skipped_consensus_txns_cache_hit",
634                "Total number of consensus transactions skipped because of local cache hit",
635                registry,
636            )
637            .unwrap(),
638            post_processing_total_events_emitted: register_int_counter_with_registry!(
639                "post_processing_total_events_emitted",
640                "Total number of events emitted in post processing",
641                registry,
642            )
643            .unwrap(),
644            post_processing_total_tx_indexed: register_int_counter_with_registry!(
645                "post_processing_total_tx_indexed",
646                "Total number of txes indexed in post processing",
647                registry,
648            )
649            .unwrap(),
650            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651                "post_processing_total_tx_had_event_processed",
652                "Total number of txes finished event processing in post processing",
653                registry,
654            )
655            .unwrap(),
656            post_processing_total_failures: register_int_counter_with_registry!(
657                "post_processing_total_failures",
658                "Total number of failure in post processing",
659                registry,
660            )
661            .unwrap(),
662            consensus_handler_processed: register_int_counter_vec_with_registry!(
663                "consensus_handler_processed",
664                "Number of transactions processed by consensus handler",
665                &["class"],
666                registry
667            ).unwrap(),
668            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669                "consensus_handler_transaction_sizes",
670                "Sizes of each type of transactions processed by consensus handler",
671                &["class"],
672                POSITIVE_INT_BUCKETS.to_vec(),
673                registry
674            ).unwrap(),
675            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676                "consensus_handler_num_low_scoring_authorities",
677                "Number of low scoring authorities based on reputation scores from consensus",
678                registry
679            ).unwrap(),
680            consensus_handler_scores: register_int_gauge_vec_with_registry!(
681                "consensus_handler_scores",
682                "scores from consensus for each authority",
683                &["authority"],
684                registry,
685            ).unwrap(),
686            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687                "consensus_handler_deferred_transactions",
688                "Number of transactions deferred by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_congested_transactions: register_int_counter_with_registry!(
692                "consensus_handler_congested_transactions",
693                "Number of transactions deferred by consensus handler due to congestion",
694                registry,
695            ).unwrap(),
696            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697                "consensus_handler_cancelled_transactions",
698                "Number of transactions cancelled by consensus handler",
699                registry,
700            ).unwrap(),
701            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702                "consensus_handler_max_congestion_control_object_costs",
703                "Max object costs for congestion control in the current consensus commit",
704                &["commit_type"],
705                registry,
706            ).unwrap(),
707            consensus_committed_subdags: register_int_counter_vec_with_registry!(
708                "consensus_committed_subdags",
709                "Number of committed subdags, sliced by leader",
710                &["authority"],
711                registry,
712            ).unwrap(),
713            consensus_committed_messages: register_int_gauge_vec_with_registry!(
714                "consensus_committed_messages",
715                "Total number of committed consensus messages, sliced by author",
716                &["authority"],
717                registry,
718            ).unwrap(),
719            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720                "consensus_committed_user_transactions",
721                "Number of committed user transactions, sliced by submitter",
722                &["authority"],
723                registry,
724            ).unwrap(),
725            consensus_handler_leader_round: register_int_gauge_with_registry!(
726                "consensus_handler_leader_round",
727                "The leader round of the current consensus output being processed in the consensus handler",
728                registry,
729            ).unwrap(),
730            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732            zklogin_sig_count: register_int_counter_with_registry!(
733                "zklogin_sig_count",
734                "Count of zkLogin signatures",
735                registry,
736            )
737            .unwrap(),
738            multisig_sig_count: register_int_counter_with_registry!(
739                "multisig_sig_count",
740                "Count of zkLogin signatures",
741                registry,
742            )
743            .unwrap(),
744            consensus_calculated_throughput: register_int_gauge_with_registry!(
745                "consensus_calculated_throughput",
746                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747                registry,
748            ).unwrap(),
749            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750                "consensus_calculated_throughput_profile",
751                "The current active calculated throughput profile",
752                registry
753            ).unwrap(),
754            execution_queueing_latency: LatencyObserver::new(),
755            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757        }
758    }
759
760    /// Reset metrics that contain `hostname` as one of the labels. This is
761    /// needed to avoid retaining metrics for long-gone committee members and
762    /// only exposing metrics for the committee in the current epoch.
763    pub fn reset_on_reconfigure(&self) {
764        self.consensus_committed_messages.reset();
765        self.consensus_handler_scores.reset();
766        self.consensus_committed_user_transactions.reset();
767    }
768}
769
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy
///   private keys).
/// - `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with `Arc::pin(keypair)` where keypair is a
/// `KeyPair` (the alias wraps the signer in `Pin<Arc<..>>`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
777
/// Top-level state of a single authority: its identity, stores, caches,
/// per-epoch state and execution infrastructure.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// The database
    // Loads transaction input objects for signing and execution.
    input_loader: TransactionInputLoader,
    // Trait-object handles to the execution caches (object store, transaction
    // cache, package store, ...).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    // Per-epoch store; the `ArcSwap` allows it to be replaced atomically when
    // the epoch changes.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and revert all transactions from previous epoch
    /// that are executed but did not make into checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional secondary index store (e.g. for serving queries); absent when
    /// indexing is disabled.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional index store backing the REST API.
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Publishes transaction events to subscribers.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Store of checkpoints and per-epoch checkpoint commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    // Store of committees across epochs.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // Background pruners; held only to keep them alive for the lifetime of
    // this state.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// Full node configuration this authority was started with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// Tracks locally-signed transactions to drive them to finality; used
    /// when this authority is a committee validator (see
    /// `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    // Congestion-control accounting; shared with consensus handling.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
833
834/// The authority state encapsulates all state, drives execution, and ensures
835/// safety.
836///
837/// Note the authority operations can be accessed through a read ref (&) and do
838/// not require &mut. Internally a database is synchronized through a mutex
839/// lock.
840///
841/// Repeating valid commands should produce no changes and return no error.
842impl AuthorityState {
843    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844        epoch_store.committee().authority_exists(&self.name)
845    }
846
847    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848        epoch_store
849            .active_validators()
850            .iter()
851            .any(|a| AuthorityName::from(a) == self.name)
852    }
853
854    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
855        !self.is_committee_validator(epoch_store)
856    }
857
    /// Returns a reference to the shared store of committees across epochs.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
861
862    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863        self.committee_store.clone()
864    }
865
    /// Returns the authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
869
    /// Returns the checkpoint state commitments recorded for `epoch` in the
    /// checkpoint store, or `None` if none are stored for that epoch.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
876
    /// This is a private method and should be kept that way. It doesn't check
    /// whether the provided transaction is a system transaction, and hence
    /// can only be called internally.
    ///
    /// Validates `transaction` for signing (deny checks, input-object loading
    /// and checking, gas check, and — when a sender `MoveAuthenticator`
    /// signature is present — execution of the Move authenticator), then
    /// signs the transaction with this authority's key and acquires locks on
    /// its owned input objects.
    ///
    /// Returns the signed transaction, or an error if any check fails or an
    /// owned object is already locked by a conflicting transaction.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Ensure that validator cannot reconfigure while we are signing the tx
        let _execution_lock = self.execution_lock_for_signing();

        let protocol_config = epoch_store.protocol_config();
        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch = epoch_store.epoch();

        let tx_data = transaction.data().transaction_data();

        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &transaction.input_objects()?,
            &tx_data.receiving_objects(),
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load all transaction-related input objects.
        // Authenticator input objects and the account object are loaded in the same
        // call if there is a sender `MoveAuthenticator` signature present in the
        // transaction.
        let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
            self.read_objects_for_signing(&transaction, epoch)?;

        // Get the sender `MoveAuthenticator`, if any.
        // Only one `MoveAuthenticator` signature is possible, since it is not
        // implemented for the sponsor at the moment.
        let move_authenticator = transaction.sender_move_authenticator();

        // Check the inputs for signing.
        // If there is a sender `MoveAuthenticator` signature, its input objects and the
        // account object are also checked and must be provided.
        // It is also checked if there is enough gas to execute the transaction and its
        // authenticators.
        let (
            gas_status,
            tx_checked_input_objects,
            auth_checked_input_objects,
            authenticator_function_ref,
        ) = self.check_transaction_inputs_for_signing(
            protocol_config,
            reference_gas_price,
            tx_data,
            tx_input_objects,
            &tx_receiving_objects,
            move_authenticator,
            auth_input_objects,
            account_object,
        )?;

        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &tx_checked_input_objects,
            &tx_receiving_objects,
            &auth_checked_input_objects,
            &self.get_object_store(),
        )?;

        if let Some(move_authenticator) = move_authenticator {
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            // Both values are produced by `check_transaction_inputs_for_signing`
            // whenever a `MoveAuthenticator` is present, so these expects encode
            // an internal invariant rather than a user-input condition.
            let auth_checked_input_objects = auth_checked_input_objects
                .expect("MoveAuthenticator input objects must be provided");
            let authenticator_function_ref = authenticator_function_ref
                .expect("AuthenticatorFunctionRefV1 object must be provided");

            let (kind, signer, _) = tx_data.execution_parts();

            // Execute the Move authenticator.
            let validation_result = epoch_store.executor().authenticate_transaction(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                gas_status,
                move_authenticator.to_owned(),
                authenticator_function_ref,
                auth_checked_input_objects,
                kind,
                signer,
                transaction.digest().to_owned(),
                &mut None,
            );

            // Surface authenticator failures as a dedicated error variant.
            if let Err(validation_error) = validation_result {
                return Err(IotaError::MoveAuthenticatorExecutionFailure {
                    error: validation_error.to_string(),
                });
            }
        }

        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

        // Sign the transaction with this authority's key for the current epoch.
        let signed_transaction =
            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

        // Check and write locks, to signed transaction, into the database
        // The call to self.set_transaction_lock checks the lock is not conflicting,
        // and returns ConflictingTransaction error in case there is a lock on a
        // different existing transaction.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
1004
    /// Initiate a new transaction.
    ///
    /// The response is idempotent: if the transaction already has a recorded
    /// status (signed or executed), that status is returned unchanged.
    /// Otherwise the transaction is validated and signed via
    /// `handle_transaction_impl`. If signing fails but the transaction turns
    /// out to have been executed concurrently, the executed status is
    /// returned instead of the signing error.
    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
    pub async fn handle_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> IotaResult<HandleTransactionResponse> {
        let tx_digest = *transaction.digest();
        debug!("handle_transaction");

        // Ensure an idempotent answer.
        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
            return Ok(HandleTransactionResponse { status });
        }

        let _metrics_guard = self
            .metrics
            .authority_state_handle_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
        match signed {
            Ok(s) => {
                if self.is_committee_validator(epoch_store) {
                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
                        let tx = s.clone();
                        let validator_tx_finalizer = validator_tx_finalizer.clone();
                        let cache_reader = self.get_transaction_cache_reader().clone();
                        let epoch_store = epoch_store.clone();
                        // Track the signed tx in the background; `within_alive_epoch`
                        // cancels the task if the epoch ends first.
                        spawn_monitored_task!(epoch_store.within_alive_epoch(
                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
                        ));
                    }
                }
                Ok(HandleTransactionResponse {
                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
                })
            }
            // It happens frequently that while we are checking the validity of the transaction, it
            // has just been executed.
            // In that case, we could still return Ok to avoid showing confusing errors.
            Err(err) => Ok(HandleTransactionResponse {
                status: self
                    .get_transaction_status(&tx_digest, epoch_store)?
                    .ok_or(err)?
                    .1,
            }),
        }
    }
1055
    /// Returns whether the system overload check is configured to run at
    /// transaction signing time.
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1061
    /// Returns whether the system overload check is configured to run at
    /// transaction execution time.
    pub fn check_system_overload_at_execution(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_execution
    }
1067
    /// Checks whether this validator is overloaded and should reject
    /// `tx_data`.
    ///
    /// Runs, in order: the authority overload check (only when
    /// `do_authority_overload_check` is set), the transaction manager's
    /// execution overload check, the consensus adapter's overload check, and
    /// finally a backpressure check on the approximate number of pending
    /// (uncommitted) transactions. Each failed check increments the
    /// corresponding `transaction_overload_sources` metric; the backpressure
    /// case returns `ValidatorOverloadedRetryAfter`.
    pub(crate) fn check_system_overload(
        &self,
        consensus_adapter: &Arc<ConsensusAdapter>,
        tx_data: &SenderSignedData,
        do_authority_overload_check: bool,
    ) -> IotaResult {
        if do_authority_overload_check {
            self.check_authority_overload(tx_data).tap_err(|_| {
                self.update_overload_metrics("execution_queue");
            })?;
        }
        self.transaction_manager
            .check_execution_overload(self.overload_config(), tx_data)
            .tap_err(|_| {
                self.update_overload_metrics("execution_pending");
            })?;
        consensus_adapter.check_consensus_overload().tap_err(|_| {
            self.update_overload_metrics("consensus");
        })?;

        // Backpressure: too many executed-but-uncommitted transactions means
        // the commit pipeline is behind; ask the client to retry later.
        let pending_tx_count = self
            .get_cache_commit()
            .approximate_pending_transaction_count();
        if pending_tx_count
            > self
                .config
                .execution_cache_config
                .writeback_cache
                .backpressure_threshold_for_rpc()
        {
            return Err(IotaError::ValidatorOverloadedRetryAfter {
                retry_after_secs: 10,
            });
        }

        Ok(())
    }
1105
1106    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108            return Ok(());
1109        }
1110
1111        let load_shedding_percentage = self
1112            .overload_info
1113            .load_shedding_percentage
1114            .load(Ordering::Relaxed);
1115        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116    }
1117
1118    fn update_overload_metrics(&self, source: &str) {
1119        self.metrics
1120            .transaction_overload_sources
1121            .with_label_values(&[source])
1122            .inc();
1123    }
1124
    /// Executes a certificate for its effects.
    ///
    /// Owned-object certificates are enqueued for execution directly; shared
    /// object certificates are expected to be sequenced by consensus first
    /// (see `AuthorityPerEpochStore::handle_consensus_transaction`). Waits
    /// for the effects within the current epoch and returns
    /// `IotaError::EpochEnded` if the epoch ends before execution completes.
    #[instrument(level = "trace", skip_all)]
    pub async fn execute_certificate(
        &self,
        certificate: &VerifiedCertificate,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<TransactionEffects> {
        // Separate latency histograms for shared-object vs single-writer txs.
        let _metrics_guard = if certificate.contains_shared_object() {
            self.metrics
                .execute_certificate_latency_shared_object
                .start_timer()
        } else {
            self.metrics
                .execute_certificate_latency_single_writer
                .start_timer()
        };
        trace!("execute_certificate");

        self.metrics.total_cert_attempts.inc();

        if !certificate.contains_shared_object() {
            // Shared object transactions need to be sequenced by the consensus before
            // enqueueing for execution, done in
            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
            // object transactions, they can be enqueued for execution immediately.
            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
        }

        // tx could be reverted when epoch ends, so we must be careful not to return a
        // result here after the epoch ends.
        epoch_store
            .within_alive_epoch(self.notify_read_effects(certificate))
            .await
            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
            .and_then(|r| r)
    }
1161
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic. i.e. outputs are only written
    ///   to storage,
    /// on successful execution; crashed execution has no observable effect and
    /// can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and
    /// locks are set. If this cannot be satisfied by the caller,
    /// execute_certificate() should be called instead.
    ///
    /// Should only be called within iota-core.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
    pub fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // Acquire a lock to prevent concurrent executions of the same transaction.
        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;

        // The cert could have been processed by a concurrent attempt of the same cert,
        // so check if the effects have already been written.
        if let Some(effects) = self
            .get_transaction_cache_reader()
            .try_get_executed_effects(tx_digest)?
        {
            if let Some(expected_effects_digest_inner) = expected_effects_digest {
                // If the caller pinned an effects digest, a mismatch would mean
                // a fork; fail loudly rather than return divergent effects.
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest_inner,
                    "Unexpected effects digest for transaction {tx_digest:?}"
                );
            }
            tx_guard.release();
            return Ok((effects, None));
        }

        let (tx_input_objects, authenticator_input_objects, account_object) =
            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

        // If no expected_effects_digest was provided, try to get it from storage.
        // We could be re-executing a previously executed but uncommitted transaction,
        // perhaps after restarting with a new binary. In this situation, if
        // we have published an effects signature, we must be sure not to
        // equivocate. TODO: read from cache instead of DB
        let expected_effects_digest =
            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);

        // `process_certificate` takes ownership of `tx_guard` and is
        // responsible for releasing/committing it on all paths.
        self.process_certificate(
            tx_guard,
            certificate,
            tx_input_objects,
            account_object,
            authenticator_input_objects,
            expected_effects_digest,
            epoch_store,
        )
        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
        .tap_ok(
            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
        )
    }
1233
    /// Loads the input objects needed to execute `certificate` and splits
    /// them into (transaction inputs, optional authenticator inputs, optional
    /// account object). `tx_lock` must be the guard obtained for this
    /// certificate, proving exclusive execution of the transaction.
    pub fn read_objects_for_execution(
        &self,
        tx_lock: &CertLockGuard,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
        let _scope = monitored_scope("Execution::load_input_objects");
        let _metrics_guard = self
            .metrics
            .execution_load_input_objects_latency
            .start_timer();

        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;

        let input_objects = self.input_loader.read_objects_for_execution(
            epoch_store,
            &certificate.key(),
            tx_lock,
            &input_objects,
            epoch_store.epoch(),
        )?;

        certificate.split_input_objects_into_groups_for_reading(input_objects)
    }
1258
1259    /// Test only wrapper for `try_execute_immediately()` above, useful for
1260    /// checking errors if the pre-conditions are not satisfied, and
1261    /// executing change epoch transactions.
1262    pub fn try_execute_for_test(
1263        &self,
1264        certificate: &VerifiedCertificate,
1265    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266        let epoch_store = self.epoch_store_for_testing();
1267        let (effects, execution_error_opt) = self.try_execute_immediately(
1268            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269            None,
1270            &epoch_store,
1271        )?;
1272        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273        Ok((signed_effects, execution_error_opt))
1274    }
1275
1276    /// Non-fallible version of `try_execute_for_test()`.
1277    pub fn execute_for_test(
1278        &self,
1279        certificate: &VerifiedCertificate,
1280    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281        self.try_execute_for_test(certificate)
1282            .expect("try_execute_for_test should not fail")
1283    }
1284
1285    pub async fn notify_read_effects(
1286        &self,
1287        certificate: &VerifiedCertificate,
1288    ) -> IotaResult<TransactionEffects> {
1289        self.get_transaction_cache_reader()
1290            .try_notify_read_executed_effects(&[*certificate.digest()])
1291            .await
1292            .map(|mut r| r.pop().expect("must return correct number of effects"))
1293    }
1294
    /// Checks that each of the given owned object references is still live,
    /// delegating to the object cache reader's liveness check.
    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        self.get_object_cache_reader()
            .try_check_owned_objects_are_live(owned_object_refs)
    }
1299
1300    /// This function captures the required state to debug a forked transaction.
1301    /// The dump is written to a file in dir `path`, with name prefixed by the
1302    /// transaction digest. NOTE: Since this info escapes the validator
1303    /// context, make sure not to leak any private info here
1304    pub(crate) fn debug_dump_transaction_state(
1305        &self,
1306        tx_digest: &TransactionDigest,
1307        effects: &TransactionEffects,
1308        expected_effects_digest: TransactionEffectsDigest,
1309        inner_temporary_store: &InnerTemporaryStore,
1310        certificate: &VerifiedExecutableTransaction,
1311        debug_dump_config: &StateDebugDumpConfig,
1312    ) -> IotaResult<PathBuf> {
1313        let dump_dir = debug_dump_config
1314            .dump_file_directory
1315            .as_ref()
1316            .cloned()
1317            .unwrap_or(std::env::temp_dir());
1318        let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320        NodeStateDump::new(
1321            tx_digest,
1322            effects,
1323            expected_effects_digest,
1324            self.get_object_store().as_ref(),
1325            &epoch_store,
1326            inner_temporary_store,
1327            certificate,
1328        )?
1329        .write_to_file(&dump_dir)
1330        .map_err(|e| IotaError::FileIO(e.to_string()))
1331    }
1332
1333    #[instrument(level = "trace", skip_all)]
1334    pub(crate) fn process_certificate(
1335        &self,
1336        tx_guard: CertTxGuard,
1337        certificate: &VerifiedExecutableTransaction,
1338        tx_input_objects: InputObjects,
1339        account_object: Option<ObjectReadResult>,
1340        authenticator_input_objects: Option<InputObjects>,
1341        expected_effects_digest: Option<TransactionEffectsDigest>,
1342        epoch_store: &Arc<AuthorityPerEpochStore>,
1343    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1344        let process_certificate_start_time = tokio::time::Instant::now();
1345        let digest = *certificate.digest();
1346
1347        fail_point_if!("correlated-crash-process-certificate", || {
1348            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1349                iota_simulator::task::kill_current_node(None);
1350            }
1351        });
1352
1353        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1354        // Any caller that verifies the signatures on the certificate will have already
1355        // checked the epoch. But paths that don't verify sigs (e.g. execution
1356        // from checkpoint, reading from db) present the possibility of an epoch
1357        // mismatch. If this cert is not finalzied in previous epoch, then it's
1358        // invalid.
1359        let execution_guard = match execution_guard {
1360            Ok(execution_guard) => execution_guard,
1361            Err(err) => {
1362                tx_guard.release();
1363                return Err(err);
1364            }
1365        };
1366        // Since we obtain a reference to the epoch store before taking the execution
1367        // lock, it's possible that reconfiguration has happened and they no
1368        // longer match.
1369        if *execution_guard != epoch_store.epoch() {
1370            tx_guard.release();
1371            info!("The epoch of the execution_guard doesn't match the epoch store");
1372            return Err(IotaError::WrongEpoch {
1373                expected_epoch: epoch_store.epoch(),
1374                actual_epoch: *execution_guard,
1375            });
1376        }
1377
1378        // Errors originating from prepare_certificate may be transient (failure to read
1379        // locks) or non-transient (transaction input is invalid, move vm
1380        // errors). However, all errors from this function occur before we have
1381        // written anything to the db, so we commit the tx guard and rely on the
1382        // client to retry the tx (if it was transient).
1383        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1384            &execution_guard,
1385            certificate,
1386            tx_input_objects,
1387            account_object,
1388            authenticator_input_objects,
1389            epoch_store,
1390        ) {
1391            Err(e) => {
1392                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1393                tx_guard.release();
1394                return Err(e);
1395            }
1396            Ok(res) => res,
1397        };
1398
1399        if let Some(expected_effects_digest) = expected_effects_digest {
1400            if effects.digest() != expected_effects_digest {
1401                // We dont want to mask the original error, so we log it and continue.
1402                match self.debug_dump_transaction_state(
1403                    &digest,
1404                    &effects,
1405                    expected_effects_digest,
1406                    &inner_temporary_store,
1407                    certificate,
1408                    &self.config.state_debug_dump_config,
1409                ) {
1410                    Ok(out_path) => {
1411                        info!(
1412                            "Dumped node state for transaction {} to {}",
1413                            digest,
1414                            out_path.as_path().display().to_string()
1415                        );
1416                    }
1417                    Err(e) => {
1418                        error!("Error dumping state for transaction {}: {e}", digest);
1419                    }
1420                }
1421                error!(
1422                    tx_digest = ?digest,
1423                    ?expected_effects_digest,
1424                    actual_effects = ?effects,
1425                    "fork detected!"
1426                );
1427                panic!(
1428                    "Transaction {} is expected to have effects digest {}, but got {}!",
1429                    digest,
1430                    expected_effects_digest,
1431                    effects.digest(),
1432                );
1433            }
1434        }
1435
1436        fail_point!("crash");
1437
1438        self.commit_certificate(
1439            certificate,
1440            inner_temporary_store,
1441            &effects,
1442            tx_guard,
1443            execution_guard,
1444            epoch_store,
1445        )?;
1446
1447        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1448            certificate.data().transaction_data().kind()
1449        {
1450            if let Some(err) = &execution_error_opt {
1451                debug_fatal!("Authenticator state update failed: {:?}", err);
1452            }
1453            epoch_store.update_authenticator_state(auth_state);
1454
1455            // double check that the signature verifier always matches the authenticator
1456            // state
1457            if cfg!(debug_assertions) {
1458                let authenticator_state = get_authenticator_state(self.get_object_store())
1459                    .expect("Read cannot fail")
1460                    .expect("Authenticator state must exist");
1461
1462                let mut sys_jwks: Vec<_> = authenticator_state
1463                    .active_jwks
1464                    .into_iter()
1465                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1466                    .collect();
1467                let mut active_jwks: Vec<_> = epoch_store
1468                    .signature_verifier
1469                    .get_jwks()
1470                    .into_iter()
1471                    .collect();
1472                sys_jwks.sort();
1473                active_jwks.sort();
1474
1475                assert_eq!(sys_jwks, active_jwks);
1476            }
1477        }
1478
1479        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1480        if elapsed > 0.0 {
1481            self.metrics
1482                .execution_gas_latency_ratio
1483                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1484        };
1485        Ok((effects, execution_error_opt))
1486    }
1487
    /// Commits the outputs of an already-executed certificate to storage and
    /// releases the transaction guard.
    ///
    /// Persists the tx key/digest mapping in the per-epoch store, writes the
    /// transaction outputs through the cache writer, commits `tx_guard`, and
    /// notifies the transaction manager so dependent transactions can run.
    /// `_execution_guard` is held (but not used directly) for the duration of
    /// the commit; presumably it fences against reconfiguration — confirm
    /// against the lock's writer side.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        // Captured now because `inner_temporary_store` is consumed further down.
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        // Needed later to tell the transaction manager which objects were produced.
        let output_keys = inner_temporary_store.get_output_keys(effects);

        // index certificate
        // Post-processing failures are counted and logged but deliberately do not
        // abort the commit (note the discarded Result).
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // commit_certificate finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1553
1554    fn update_metrics(
1555        &self,
1556        certificate: &VerifiedExecutableTransaction,
1557        input_object_count: usize,
1558        shared_object_count: usize,
1559    ) {
1560        // count signature by scheme, for zklogin and multisig
1561        if certificate.has_zklogin_sig() {
1562            self.metrics.zklogin_sig_count.inc();
1563        } else if certificate.has_upgraded_multisig() {
1564            self.metrics.multisig_sig_count.inc();
1565        }
1566
1567        self.metrics.total_effects.inc();
1568        self.metrics.total_certs.inc();
1569
1570        if shared_object_count > 0 {
1571            self.metrics.shared_obj_tx.inc();
1572        }
1573
1574        if certificate.is_sponsored_tx() {
1575            self.metrics.sponsored_tx.inc();
1576        }
1577
1578        self.metrics
1579            .num_input_objs
1580            .observe(input_object_count as f64);
1581        self.metrics
1582            .num_shared_objects
1583            .observe(shared_object_count as f64);
1584        self.metrics.batch_size.observe(
1585            certificate
1586                .data()
1587                .intent_message()
1588                .value
1589                .kind()
1590                .num_commands() as f64,
1591        );
1592    }
1593
1594    /// prepare_certificate validates the transaction input, and executes the
1595    /// certificate, returning effects, output objects, events, etc.
1596    ///
1597    /// It reads state from the db (both owned and shared locks), but it has no
1598    /// side effects.
1599    ///
1600    /// It can be generally understood that a failure of prepare_certificate
1601    /// indicates a non-transient error, e.g. the transaction input is
1602    /// somehow invalid, the correct locks are not held, etc. However, this
1603    /// is not entirely true, as a transient db read error may also cause
1604    /// this function to fail.
1605    #[instrument(level = "trace", skip_all)]
1606    fn prepare_certificate(
1607        &self,
1608        _execution_guard: &ExecutionLockReadGuard<'_>,
1609        certificate: &VerifiedExecutableTransaction,
1610        tx_input_objects: InputObjects,
1611        account_object: Option<ObjectReadResult>,
1612        move_authenticator_input_objects: Option<InputObjects>,
1613        epoch_store: &Arc<AuthorityPerEpochStore>,
1614    ) -> IotaResult<(
1615        InnerTemporaryStore,
1616        TransactionEffects,
1617        Option<ExecutionError>,
1618    )> {
1619        let _scope = monitored_scope("Execution::prepare_certificate");
1620        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1621        let prepare_certificate_start_time = tokio::time::Instant::now();
1622
1623        let protocol_config = epoch_store.protocol_config();
1624
1625        let reference_gas_price = epoch_store.reference_gas_price();
1626
1627        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1628        let epoch_start_timestamp = epoch_store
1629            .epoch_start_config()
1630            .epoch_data()
1631            .epoch_start_timestamp();
1632
1633        let backing_store = self.get_backing_store().as_ref();
1634
1635        let tx_digest = *certificate.digest();
1636
1637        // TODO: We need to move this to a more appropriate place to avoid redundant
1638        // checks.
1639        let tx_data = certificate.data().transaction_data();
1640        tx_data.validity_check(protocol_config)?;
1641
1642        let (kind, signer, gas) = tx_data.execution_parts();
1643
1644        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1645        let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
1646            move_authenticator,
1647        ) =
1648            certificate.sender_move_authenticator()
1649        {
1650            // Check if the sender needs to be authenticated in Move and, if so, execute the
1651            // authentication.
1652            // It is supposed that `MoveAuthenticator` availability is checked in
1653            // `SenderSignedData::validity_check`.
1654
1655            // Check basic `object_to_authenticate` preconditions and get its components.
1656            let (
1657                auth_account_object_id,
1658                auth_account_object_seq_number,
1659                auth_account_object_digest,
1660            ) = move_authenticator.object_to_authenticate_components()?;
1661
1662            // Since the `object_to_authenticate` components are provided, it is supposed
1663            // that the account object is loaded.
1664            let account_object = account_object.expect("Account object must be provided");
1665
1666            let authenticator_function_ref_for_execution = self.check_move_account(
1667                auth_account_object_id,
1668                auth_account_object_seq_number,
1669                auth_account_object_digest,
1670                account_object,
1671                &signer,
1672            )?;
1673
1674            let move_authenticator_input_objects = move_authenticator_input_objects.expect(
1675                    "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
1676                );
1677
1678            // Check the `MoveAuthenticator` input objects.
1679            // The `MoveAuthenticator` receiving objects are checked on the signing step.
1680            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
1681            // not a part of the transaction data.
1682            let authenticator_gas_budget = protocol_config.max_auth_gas();
1683            let (
1684                gas_status,
1685                authenticator_checked_input_objects,
1686                authenticator_and_tx_checked_input_objects,
1687            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1688                certificate,
1689                tx_input_objects,
1690                move_authenticator_input_objects,
1691                authenticator_gas_budget,
1692                protocol_config,
1693                reference_gas_price,
1694            )?;
1695
1696            let owned_object_refs = authenticator_and_tx_checked_input_objects
1697                .inner()
1698                .filter_owned_objects();
1699            self.check_owned_locks(&owned_object_refs)?;
1700
1701            epoch_store
1702                .executor()
1703                .authenticate_then_execute_transaction_to_effects(
1704                    backing_store,
1705                    protocol_config,
1706                    self.metrics.limits_metrics.clone(),
1707                    self.config
1708                        .expensive_safety_check_config
1709                        .enable_deep_per_tx_iota_conservation_check(),
1710                    self.config.certificate_deny_config.certificate_deny_set(),
1711                    &epoch_id,
1712                    epoch_start_timestamp,
1713                    gas_status,
1714                    gas,
1715                    move_authenticator.to_owned(),
1716                    authenticator_function_ref_for_execution,
1717                    authenticator_checked_input_objects,
1718                    authenticator_and_tx_checked_input_objects,
1719                    kind,
1720                    signer,
1721                    tx_digest,
1722                    &mut None,
1723                )
1724        } else {
1725            // No Move authentication required, proceed to execute the transaction directly.
1726
1727            // The cost of partially re-auditing a transaction before execution is
1728            // tolerated.
1729            let (tx_gas_status, tx_checked_input_objects) =
1730                iota_transaction_checks::check_certificate_input(
1731                    certificate,
1732                    tx_input_objects,
1733                    protocol_config,
1734                    reference_gas_price,
1735                )?;
1736
1737            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1738            self.check_owned_locks(&owned_object_refs)?;
1739            epoch_store.executor().execute_transaction_to_effects(
1740                backing_store,
1741                protocol_config,
1742                self.metrics.limits_metrics.clone(),
1743                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1744                // cyclic dependency w/ iota-adapter
1745                self.config
1746                    .expensive_safety_check_config
1747                    .enable_deep_per_tx_iota_conservation_check(),
1748                self.config.certificate_deny_config.certificate_deny_set(),
1749                &epoch_id,
1750                epoch_start_timestamp,
1751                tx_checked_input_objects,
1752                gas,
1753                tx_gas_status,
1754                kind,
1755                signer,
1756                tx_digest,
1757                &mut None,
1758            )
1759        };
1760
1761        fail_point_if!("cp_execution_nondeterminism", || {
1762            #[cfg(msim)]
1763            self.create_fail_state(certificate, epoch_store, &mut effects);
1764        });
1765
1766        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1767        if elapsed > 0.0 {
1768            self.metrics
1769                .prepare_cert_gas_latency_ratio
1770                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1771        }
1772
1773        Ok((inner_temp_store, effects, execution_error_opt.err()))
1774    }
1775
1776    pub fn prepare_certificate_for_benchmark(
1777        &self,
1778        certificate: &VerifiedExecutableTransaction,
1779        input_objects: InputObjects,
1780        epoch_store: &Arc<AuthorityPerEpochStore>,
1781    ) -> IotaResult<(
1782        InnerTemporaryStore,
1783        TransactionEffects,
1784        Option<ExecutionError>,
1785    )> {
1786        let lock = RwLock::new(epoch_store.epoch());
1787        let execution_guard = lock.try_read().unwrap();
1788
1789        self.prepare_certificate(
1790            &execution_guard,
1791            certificate,
1792            input_objects,
1793            None,
1794            None,
1795            epoch_store,
1796        )
1797    }
1798
1799    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1800    /// `VmChecks::Enabled` instead.
1801    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802    #[allow(clippy::type_complexity)]
1803    pub fn dry_exec_transaction(
1804        &self,
1805        transaction: TransactionData,
1806        transaction_digest: TransactionDigest,
1807    ) -> IotaResult<(
1808        DryRunTransactionBlockResponse,
1809        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810        TransactionEffects,
1811        Option<ObjectID>,
1812    )> {
1813        let epoch_store = self.load_epoch_store_one_call_per_task();
1814        if !self.is_fullnode(&epoch_store) {
1815            return Err(IotaError::UnsupportedFeature {
1816                error: "dry-exec is only supported on fullnodes".to_string(),
1817            });
1818        }
1819
1820        if transaction.kind().is_system_tx() {
1821            return Err(IotaError::UnsupportedFeature {
1822                error: "dry-exec does not support system transactions".to_string(),
1823            });
1824        }
1825
1826        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827    }
1828
1829    #[allow(clippy::type_complexity)]
1830    pub fn dry_exec_transaction_for_benchmark(
1831        &self,
1832        transaction: TransactionData,
1833        transaction_digest: TransactionDigest,
1834    ) -> IotaResult<(
1835        DryRunTransactionBlockResponse,
1836        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837        TransactionEffects,
1838        Option<ObjectID>,
1839    )> {
1840        let epoch_store = self.load_epoch_store_one_call_per_task();
1841        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842    }
1843
    /// Shared implementation of dry execution.
    ///
    /// Validates and loads the transaction's inputs, mocks a gas coin when the
    /// transaction carries no gas payment, executes the transaction without
    /// committing any state, and assembles a `DryRunTransactionBlockResponse`.
    ///
    /// Returns the response, the written objects keyed by ID together with
    /// their `WriteKind`, the raw `TransactionEffects`, and the ID of the
    /// mocked gas object if one was created (`None` otherwise).
    #[instrument(level = "trace", skip_all)]
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> IotaResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply the same deny-list checks a validator would apply at signing time.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut transaction = transaction;
        let mut gas_object_refs = transaction.gas().to_vec();
        let reference_gas_price = epoch_store.reference_gas_price();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            gas_object_refs = vec![gas_object_ref];
            // Add gas object to transaction gas payment
            transaction.gas_data_mut().payment = gas_object_refs.clone();
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // `MoveAuthenticator`s are not supported in dry runs, so we set the
            // `authenticator_gas_budget` to 0.
            let authenticator_gas_budget = 0;

            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, _) = transaction.execution_parts();

        // A fresh executor is created per dry run; `silent` suppresses VM output.
        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        let (inner_temp_store, _, effects, execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_object_refs,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        let tx_digest = *effects.transaction_digest();

        // Resolvers that see the dry run's uncommitted writes first, falling back
        // to the node's stores.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Every created/unwrapped/mutated object ref in the effects has a
        // corresponding entry in `inner_temp_store.written`, hence the unwrap.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                // to avoid cloning `transaction`, fields are populated in this order
                suggested_gas_price: self
                    .congestion_tracker
                    .get_prediction_suggested_gas_price(&transaction),
                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
                    .map_err(|e| IotaError::TransactionSerialization {
                        error: format!(
                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
                        ),
                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
                effects: effects.clone().try_into()?,
                events: IotaTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2035
    /// Simulates a transaction on a fullnode without committing any state.
    ///
    /// With `checks` enabled this behaves like a dry run (full transaction
    /// input checks); with checks disabled it behaves like dev-inspect
    /// (relaxed Move VM checks via `dev_inspect_transaction`). If the
    /// transaction carries no gas payment, a mock gas coin with
    /// `ObjectID::MAX` is injected and its ID is reported in the result.
    /// System transactions are rejected.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: VmChecks,
    ) -> IotaResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate does not support system transactions".to_string(),
            });
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate is only supported on fullnodes".to_string(),
            });
        }

        // Cheap validity checks for a transaction, including input size limits.
        // This does not check if gas objects are missing since we may create a
        // mock gas object. It checks for other transaction input validity.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Since we need to simulate a validator signing the transaction, the first step
        // is to check if some transaction elements are denied.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load input and receiving objects
        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create a mock gas object if one was not provided
        let mock_gas_id = if transaction.gas().is_empty() {
            let mock_gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
            // The transaction is mutated here so the mock coin becomes its gas payment.
            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
            Some(mock_gas_object.id())
        } else {
            None
        };

        let protocol_config = epoch_store.protocol_config();

        // `MoveAuthenticator`s are not supported in simulation, so we set the
        // `authenticator_gas_budget` to 0.
        let authenticator_gas_budget = 0;

        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
        let (gas_status, checked_input_objects) = if checks.enabled() {
            iota_transaction_checks::check_transaction_input(
                protocol_config,
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?
        } else {
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                transaction.kind(),
                input_objects,
                receiving_objects,
            )?;
            let gas_status = IotaGasStatus::new(
                transaction.gas_budget(),
                transaction.gas_price(),
                epoch_store.reference_gas_price(),
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        };

        // Create a new executor for the simulation
        let executor = iota_execution::executor(
            protocol_config,
            true, // silent
            None,
        )
        .expect("Creating an executor should not fail here");

        // Execute the simulation
        let (kind, signer, gas_coins) = transaction.execution_parts();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false, // expensive_checks
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_coins,
            gas_status,
            kind,
            signer,
            transaction.digest(),
            checks.disabled(),
        );

        // In the case of a dev inspect, the execution_result could be filled with some
        // values. Else, execution_result is empty in the case of a dry run.
        Ok(SimulateTransactionResult {
            input_objects: inner_temp_store.input_objects,
            output_objects: inner_temp_store.written,
            // Events are reported only when the effects reference an events digest.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            mock_gas_id,
        })
    }
2181
    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
    /// `VmChecks::DISABLED` instead.
    /// The object ID for gas can be any
    /// object ID, even for an uncreated object
    ///
    /// Speculatively executes `transaction_kind` and returns the would-be
    /// effects, events and Move return values without committing anything to
    /// storage.
    ///
    /// Defaults applied when an argument is `None`: `gas_price` -> reference
    /// gas price, `gas_budget` -> protocol `max_tx_gas`, `gas_sponsor` ->
    /// `sender`, `gas_objects` -> empty payment (a dummy gas coin is
    /// synthesized), `show_raw_txn_data_and_effects` -> `false`,
    /// `skip_checks` -> `true`.
    ///
    /// Errors if this node is not a fullnode or if `transaction_kind` is a
    /// system transaction.
    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        // dev-inspect executes unsigned transaction kinds, so it is
        // restricted to fullnodes.
        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        // Resolve optional arguments to their documented defaults.
        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        // Wrap the bare kind into full transaction data so the generic
        // validity/deny checks below can be applied to it.
        let transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // Only BCS-serialize the transaction when the caller asked for the
        // raw bytes in the response.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Gas checks are deliberately skipped here: the gas payment may be
        // empty or backed by the synthetic dummy gas object below.
        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create and use a dummy gas object if there is no gas object provided.
        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
            SIMULATION_GAS_COIN_VALUE,
            transaction.gas_owner(),
        );

        let gas_objects = if transaction.gas().is_empty() {
            let gas_object_ref = dummy_gas_object.compute_object_reference();
            vec![gas_object_ref]
        } else {
            transaction.gas().to_vec()
        };

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // Note: the gas status is built with the protocol maximum rather
            // than the requested budget, since checks are being skipped.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
                // `authenticator_gas_budget` to 0.
                let authenticator_gas_budget = 0;

                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        // There is no signed envelope to take a digest from, so compute the
        // digest over the intent-wrapped transaction data instead.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: IntentAppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_objects,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        // As with raw_txn_data, raw effects bytes are produced on demand only.
        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Resolve type layouts against packages written by this execution
        // first, then fall back to the node's backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2394
2395    // Only used for testing because of how epoch store is loaded.
2396    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2397        let epoch_store = self.epoch_store_for_testing();
2398        Ok(epoch_store.reference_gas_price())
2399    }
2400
2401    #[instrument(level = "trace", skip_all)]
2402    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2403        self.get_transaction_cache_reader()
2404            .try_is_tx_already_executed(digest)
2405    }
2406
2407    /// Non-fallible version of `try_is_tx_already_executed`.
2408    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2409        self.try_is_tx_already_executed(digest)
2410            .expect("storage access failed")
2411    }
2412
2413    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2414    #[instrument(level = "debug", skip_all, err)]
2415    fn index_tx(
2416        &self,
2417        indexes: &IndexStore,
2418        digest: &TransactionDigest,
2419        // TODO: index_tx really just need the transaction data here.
2420        cert: &VerifiedExecutableTransaction,
2421        effects: &TransactionEffects,
2422        events: &TransactionEvents,
2423        timestamp_ms: u64,
2424        tx_coins: Option<TxCoins>,
2425        written: &WrittenObjects,
2426        inner_temporary_store: &InnerTemporaryStore,
2427    ) -> IotaResult<u64> {
2428        let changes = self
2429            .process_object_index(effects, written, inner_temporary_store)
2430            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2431
2432        indexes.index_tx(
2433            cert.data().intent_message().value.sender(),
2434            cert.data()
2435                .intent_message()
2436                .value
2437                .input_objects()?
2438                .iter()
2439                .map(|o| o.object_id()),
2440            effects
2441                .all_changed_objects()
2442                .into_iter()
2443                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2444            cert.data()
2445                .intent_message()
2446                .value
2447                .move_calls()
2448                .into_iter()
2449                .map(|(package, module, function)| {
2450                    (*package, module.to_owned(), function.to_owned())
2451                }),
2452            events,
2453            changes,
2454            digest,
2455            timestamp_ms,
2456            tx_coins,
2457        )
2458    }
2459
    /// Simulation-only (msim) fault injection: perturbs the effects of
    /// non-system transactions on a subset of validators whose combined
    /// stake is kept below the validity threshold, so downstream
    /// fork/divergence handling can be exercised in tests.
    #[cfg(msim)]
    fn create_fail_state(
        &self,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        effects: &mut TransactionEffects,
    ) {
        use std::cell::RefCell;
        thread_local! {
            // (accumulated failing stake, set of validators chosen to fail).
            // Thread-local because each simulated validator runs on its own
            // thread in msim.
            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
        }
        if !certificate.data().intent_message().value.is_system_tx() {
            let committee = epoch_store.committee();
            let cur_stake = (**committee).weight(&self.name);
            if cur_stake > 0 {
                FAIL_STATE.with_borrow_mut(|fail_state| {
                    // Keep admitting validators into the failing set until
                    // their combined stake reaches the validity threshold.
                    if fail_state.0 < committee.validity_threshold() {
                        fail_state.0 += cur_stake;
                        fail_state.1.insert(self.name);
                    }

                    // Once selected, this validator corrupts every effect it
                    // produces by bumping the reported computation cost.
                    if fail_state.1.contains(&self.name) {
                        info!("cp_exec failing tx");
                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
                    }
                });
            }
        }
    }
2490
    /// Derives the owner-index and dynamic-field-index deltas implied by a
    /// transaction's effects, for consumption by `IndexStore::index_tx`.
    ///
    /// For deleted, wrapped and owner-changed objects the *previous* object
    /// version is read back to discover the prior owner; this relies on
    /// running before the new versions replace them, and a missing old
    /// version is treated as a fatal invariant violation (panic).
    fn process_object_index(
        &self,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
    ) -> IotaResult<ObjectIndexChanges> {
        let epoch_store = self.load_epoch_store_one_call_per_task();
        // Layout resolution prefers packages written by this transaction,
        // falling back to the node's backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    inner_temporary_store,
                    self.get_backing_package_store(),
                )));

        // Map: object id -> version it had before this transaction.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects leave the indexes; classify each by its
        // previous owner.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
            ) {
                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                // Other ownership kinds are not tracked by these indexes.
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a
            // owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = self
                    .get_object_store()
                    .try_get_object_by_key(id, *old_version)?
                else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
                    );
                };
                // Ownership transfer: remove the entry for the old owner;
                // the new entry is added in the match below.
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr) => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            match owner {
                Owner::AddressOwner(addr) => {
                    // TODO: We can remove the object fetching after we added ObjectType to
                    // TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    // Sanity check: the written object must match the version
                    // reported in the effects.
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    // Same sanity check as the address-owner branch above.
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    // Failure to build the dynamic field info is logged but
                    // not fatal: the object is simply not indexed.
                    let Some(df_info) = self
                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
                        .unwrap_or_else(|e| {
                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                            None
                        }
                    )
                        else {
                            // Skip indexing for non dynamic field objects.
                            continue;
                        };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
2637
    /// Builds the `DynamicFieldInfo` index entry for object `o`, consulting
    /// `written` (objects produced by the same transaction) before falling
    /// back to the object store for dynamic-object-field children.
    ///
    /// Returns `Ok(None)` when `o` is not a Move object or not of a
    /// dynamic-field type — only dynamic fields are indexed.
    fn try_create_dynamic_field_info(
        &self,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> IotaResult<Option<DynamicFieldInfo>> {
        // Skip if not a move object
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // We only index dynamic field objects
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        // Resolve the annotated layout so the raw BCS contents can be
        // deserialized field-by-field.
        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        // Decode the field's name into a JSON-friendly value for the index.
        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: IotaMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            IotaError::ObjectDeserialization {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            // Plain dynamic field: the value lives inside the wrapper itself,
            // so the wrapper's id/version/digest identify it.
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(/* with_prefix */ true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                // Find the actual object from storage using the object id obtained from the
                // wrapper.

                // Try to find the object in the written objects first.
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    (
                        object.version(),
                        object.digest(),
                        object.data.type_().unwrap().clone(),
                    )
                } else {
                    // If not found, try to find it in the database.
                    // NOTE(review): this looks up the child at the *wrapper's*
                    // version (`o.version()`) — assumes the child object was
                    // last written at the same version as its `Field`
                    // wrapper; TODO confirm this invariant.
                    let object = self
                        .get_object_store()
                        .try_get_object_by_key(&object_id, o.version())?
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
2738
2739    #[instrument(level = "trace", skip_all, err)]
2740    fn post_process_one_tx(
2741        &self,
2742        certificate: &VerifiedExecutableTransaction,
2743        effects: &TransactionEffects,
2744        inner_temporary_store: &InnerTemporaryStore,
2745        epoch_store: &Arc<AuthorityPerEpochStore>,
2746    ) -> IotaResult {
2747        if self.indexes.is_none() {
2748            return Ok(());
2749        }
2750
2751        let tx_digest = certificate.digest();
2752        let timestamp_ms = Self::unixtime_now_ms();
2753        let events = &inner_temporary_store.events;
2754        let written = &inner_temporary_store.written;
2755        let tx_coins =
2756            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2757
2758        // Index tx
2759        if let Some(indexes) = &self.indexes {
2760            let _ = self
2761                .index_tx(
2762                    indexes.as_ref(),
2763                    tx_digest,
2764                    certificate,
2765                    effects,
2766                    events,
2767                    timestamp_ms,
2768                    tx_coins,
2769                    written,
2770                    inner_temporary_store,
2771                )
2772                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2773                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2774                .expect("Indexing tx should not fail");
2775
2776            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2777            let events = self.make_transaction_block_events(
2778                events.clone(),
2779                *tx_digest,
2780                timestamp_ms,
2781                epoch_store,
2782                inner_temporary_store,
2783            )?;
2784            // Emit events
2785            self.subscription_handler
2786                .process_tx(certificate.data().transaction_data(), &effects, &events)
2787                .tap_ok(|_| {
2788                    self.metrics
2789                        .post_processing_total_tx_had_event_processed
2790                        .inc()
2791                })
2792                .tap_err(|e| {
2793                    warn!(
2794                        ?tx_digest,
2795                        "Post processing - Couldn't process events for tx: {}", e
2796                    )
2797                })?;
2798
2799            self.metrics
2800                .post_processing_total_events_emitted
2801                .inc_by(events.data.len() as u64);
2802        };
2803        Ok(())
2804    }
2805
2806    fn make_transaction_block_events(
2807        &self,
2808        transaction_events: TransactionEvents,
2809        digest: TransactionDigest,
2810        timestamp_ms: u64,
2811        epoch_store: &Arc<AuthorityPerEpochStore>,
2812        inner_temporary_store: &InnerTemporaryStore,
2813    ) -> IotaResult<IotaTransactionBlockEvents> {
2814        let mut layout_resolver =
2815            epoch_store
2816                .executor()
2817                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2818                    inner_temporary_store,
2819                    self.get_backing_package_store(),
2820                )));
2821        IotaTransactionBlockEvents::try_from(
2822            transaction_events,
2823            digest,
2824            Some(timestamp_ms),
2825            layout_resolver.as_mut(),
2826        )
2827    }
2828
2829    pub fn unixtime_now_ms() -> u64 {
2830        let now = SystemTime::now()
2831            .duration_since(UNIX_EPOCH)
2832            .expect("Time went backwards")
2833            .as_millis();
2834        u64::try_from(now).expect("Travelling in time machine")
2835    }
2836
2837    #[instrument(level = "trace", skip_all)]
2838    pub async fn handle_transaction_info_request(
2839        &self,
2840        request: TransactionInfoRequest,
2841    ) -> IotaResult<TransactionInfoResponse> {
2842        let epoch_store = self.load_epoch_store_one_call_per_task();
2843        let (transaction, status) = self
2844            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2845            .ok_or(IotaError::TransactionNotFound {
2846                digest: request.transaction_digest,
2847            })?;
2848        Ok(TransactionInfoResponse {
2849            transaction,
2850            status,
2851        })
2852    }
2853
    /// Serves an `ObjectInfoRequest`: returns the object at the requested
    /// version (or the latest), optionally its annotated Move struct layout,
    /// and — for address-owned objects only — the current transaction lock
    /// for debugging.
    #[instrument(level = "trace", skip_all)]
    pub async fn handle_object_info_request(
        &self,
        request: ObjectInfoRequest,
    ) -> IotaResult<ObjectInfoResponse> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        // Resolve which sequence number (version) is being requested.
        let requested_object_seq = match request.request_kind {
            ObjectInfoRequestKind::LatestObjectInfo => {
                let (_, seq, _) = self
                    .try_get_object_or_tombstone(request.object_id)
                    .await?
                    .ok_or_else(|| {
                        IotaError::from(UserInputError::ObjectNotFound {
                            object_id: request.object_id,
                            version: None,
                        })
                    })?;
                seq
            }
            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
        };

        let object = self
            .get_object_store()
            .try_get_object_by_key(&request.object_id, requested_object_seq)?
            .ok_or_else(|| {
                IotaError::from(UserInputError::ObjectNotFound {
                    object_id: request.object_id,
                    version: Some(requested_object_seq),
                })
            })?;

        // Layouts are generated only on request and only for Move objects
        // (packages have no struct layout).
        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
            (request.generate_layout, object.data.try_as_move())
        {
            Some(into_struct_layout(
                epoch_store
                    .executor()
                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                    .get_annotated_layout(&move_obj.type_().clone().into())?,
            )?)
        } else {
            None
        };

        let lock = if !object.is_address_owned() {
            // Only address owned objects have locks.
            None
        } else {
            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
                .await?
                .map(|s| s.into_inner())
        };

        Ok(ObjectInfoResponse {
            object,
            layout,
            lock_for_debugging: lock,
        })
    }
2915
2916    #[instrument(level = "trace", skip_all)]
2917    pub fn handle_checkpoint_request(
2918        &self,
2919        request: &CheckpointRequest,
2920    ) -> IotaResult<CheckpointResponse> {
2921        let summary = if request.certified {
2922            let summary = match request.sequence_number {
2923                Some(seq) => self
2924                    .checkpoint_store
2925                    .get_checkpoint_by_sequence_number(seq)?,
2926                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2927            }
2928            .map(|v| v.into_inner());
2929            summary.map(CheckpointSummaryResponse::Certified)
2930        } else {
2931            let summary = match request.sequence_number {
2932                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2933                None => self
2934                    .checkpoint_store
2935                    .get_latest_locally_computed_checkpoint()?,
2936            };
2937            summary.map(CheckpointSummaryResponse::Pending)
2938        };
2939        let contents = match &summary {
2940            Some(s) => self
2941                .checkpoint_store
2942                .get_checkpoint_contents(&s.content_digest())?,
2943            None => None,
2944        };
2945        Ok(CheckpointResponse {
2946            checkpoint: summary,
2947            contents,
2948        })
2949    }
2950
2951    fn check_protocol_version(
2952        supported_protocol_versions: SupportedProtocolVersions,
2953        current_version: ProtocolVersion,
2954    ) {
2955        info!("current protocol version is now {:?}", current_version);
2956        info!("supported versions are: {:?}", supported_protocol_versions);
2957        if !supported_protocol_versions.is_version_supported(current_version) {
2958            let msg = format!(
2959                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2960            );
2961
2962            error!("{}", msg);
2963            eprintln!("{msg}");
2964
2965            #[cfg(not(msim))]
2966            std::process::exit(1);
2967
2968            #[cfg(msim)]
2969            iota_simulator::task::shutdown_current_node();
2970        }
2971    }
2972
    /// Constructs a new `AuthorityState` and starts its background machinery.
    ///
    /// Wires together the stores, execution caches, transaction manager and
    /// pruners; spawns the certificate execution loop; and indexes the
    /// genesis objects if the owner index is still empty. Terminates the
    /// process (via `check_protocol_version`) if the epoch's protocol version
    /// is not supported by this build.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rest_index: Option<Arc<RestIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
    ) -> Arc<Self> {
        // Exits the process if the current protocol version is unsupported.
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel feeding ready-to-execute certificates to the execution task
        // spawned at the bottom of this constructor.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        // Used by `Drop`/shutdown paths to stop the execution task.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Pruner handles are held (underscore-prefixed fields) so their
        // background tasks live as long as the state.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rest_index.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            rest_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // Uses a Weak reference so the task does not keep the state alive.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
3069
    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Read access to the object cache.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }

    /// Read access to the transaction cache.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }

    /// Write access to the execution cache.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }

    /// The backing store used during transaction execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }

    /// The store used to resolve Move packages.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }

    /// Generic object store access.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }

    /// API used during end-of-epoch reconfiguration.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }

    /// Store of state accumulators (root state hashes).
    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
        &self.execution_cache_trait_pointers.accumulator_store
    }

    /// Checkpoint-related cache access.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }

    /// Store used by state sync.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }

    /// API for committing executed transactions from the cache.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }

    /// Direct access to the underlying authority store. Testing only.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
3120
    /// Runs one pass of checkpoint pruning for eligible epochs using the
    /// pruning configuration from `config` and a fixed testing epoch
    /// duration. Testing only.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.rest_index.as_deref(),
            None,
            config.authority_store_pruning_config,
            metrics,
            archive_readers,
            EPOCH_DURATION_MS_FOR_TESTING,
        )
        .await
    }
3140
    /// Returns the shared [`TransactionManager`].
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
3144
    /// Adds transactions / certificates to transaction manager for ordered
    /// execution.
    pub fn enqueue_transactions_for_execution(
        &self,
        txns: Vec<VerifiedExecutableTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager.enqueue(txns, epoch_store)
    }

    /// Adds certificates to transaction manager for ordered execution.
    pub fn enqueue_certificates_for_execution(
        &self,
        certs: Vec<VerifiedCertificate>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager
            .enqueue_certificates(certs, epoch_store)
    }

    /// Adds executable transactions to the transaction manager along with the
    /// effects digest each is expected to produce (presumably for paths where
    /// effects are already known, e.g. checkpoint execution — confirm with
    /// callers).
    pub fn enqueue_with_expected_effects_digest(
        &self,
        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        self.transaction_manager
            .enqueue_with_expected_effects_digest(certs, epoch_store)
    }
3173
3174    fn create_owner_index_if_empty(
3175        &self,
3176        genesis_objects: &[Object],
3177        epoch_store: &Arc<AuthorityPerEpochStore>,
3178    ) -> IotaResult {
3179        let Some(index_store) = &self.indexes else {
3180            return Ok(());
3181        };
3182        if !index_store.is_empty() {
3183            return Ok(());
3184        }
3185
3186        let mut new_owners = vec![];
3187        let mut new_dynamic_fields = vec![];
3188        let mut layout_resolver = epoch_store
3189            .executor()
3190            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3191        for o in genesis_objects.iter() {
3192            match o.owner {
3193                Owner::AddressOwner(addr) => new_owners.push((
3194                    (addr, o.id()),
3195                    ObjectInfo::new(&o.compute_object_reference(), o),
3196                )),
3197                Owner::ObjectOwner(object_id) => {
3198                    let id = o.id();
3199                    let info = match self.try_create_dynamic_field_info(
3200                        o,
3201                        &BTreeMap::new(),
3202                        layout_resolver.as_mut(),
3203                    ) {
3204                        Ok(Some(info)) => info,
3205                        Ok(None) => continue,
3206                        Err(IotaError::UserInput {
3207                            error:
3208                                UserInputError::ObjectNotFound {
3209                                    object_id: not_found_id,
3210                                    version,
3211                                },
3212                        }) => {
3213                            warn!(
3214                                ?not_found_id,
3215                                ?version,
3216                                object_owner=?object_id,
3217                                field=?id,
3218                                "Skipping dynamic field: referenced genesis object not found"
3219                            );
3220                            continue;
3221                        }
3222                        Err(e) => return Err(e),
3223                    };
3224                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3225                }
3226                _ => {}
3227            }
3228        }
3229
3230        index_store.insert_genesis_objects(ObjectIndexChanges {
3231            deleted_owners: vec![],
3232            deleted_dynamic_fields: vec![],
3233            new_owners,
3234            new_dynamic_fields,
3235        })
3236    }
3237
3238    /// Attempts to acquire execution lock for an executable transaction.
3239    /// Returns the lock if the transaction is matching current executed epoch
3240    /// Returns None otherwise
3241    pub fn execution_lock_for_executable_transaction(
3242        &self,
3243        transaction: &VerifiedExecutableTransaction,
3244    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3245        let lock = self
3246            .execution_lock
3247            .try_read()
3248            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3249        if *lock == transaction.auth_sig().epoch() {
3250            Ok(lock)
3251        } else {
3252            Err(IotaError::WrongEpoch {
3253                expected_epoch: *lock,
3254                actual_epoch: transaction.auth_sig().epoch(),
3255            })
3256        }
3257    }
3258
    /// Acquires the execution lock for the duration of a transaction signing
    /// request. This prevents reconfiguration from starting until we are
    /// finished handling the signing request. Otherwise, in-memory lock
    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
    /// while we are attempting to acquire locks for the transaction.
    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
        self.execution_lock
            .try_read()
            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
    }

    /// Takes the execution lock exclusively; awaits until all in-flight
    /// readers (executing/signing transactions) have released it.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3273
    /// Performs end-of-epoch reconfiguration: waits for in-flight execution
    /// to finish, terminates epoch-scoped tasks, clears end-of-epoch state,
    /// runs configured consistency checks, optionally snapshots the
    /// databases, and reopens the epoch store for the new committee's epoch.
    /// Returns the newly opened epoch store.
    ///
    /// NOTE: statement order here is load-bearing — do not reorder casually.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        // Aborts the process if the next epoch's protocol version is not
        // supported by this binary.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optionally snapshot all databases at the epoch boundary.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3385
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    pub async fn reconfigure_for_testing(&self) {
        // Block new transaction execution while we swap epoch state.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        // Publish the new epoch store, then terminate the old epoch's tasks.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        *execution_lock = new_epoch;
    }
3419
    /// Runs the configured end-of-epoch safety checks: IOTA supply
    /// conservation, optional root-state-hash vs. live-object-set
    /// consistency, and optional secondary index verification.
    #[instrument(level = "error", skip_all)]
    fn check_system_consistency(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
    ) -> IotaResult<()> {
        info!(
            "Performing iota conservation consistency check for epoch {}",
            cur_epoch_store.epoch()
        );

        // Debug builds additionally verify that every executed transaction
        // landed in a checkpoint.
        if cfg!(debug_assertions) {
            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
        }

        self.get_reconfig_api()
            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;

        // check for root state hash consistency with live object set
        if expensive_safety_check_config.enable_state_consistency_check() {
            info!(
                "Performing state consistency check for epoch {}",
                cur_epoch_store.epoch()
            );
            self.expensive_check_is_consistent_state(
                accumulator,
                cur_epoch_store,
                cfg!(debug_assertions), // panic in debug mode only
            );
        }

        // Secondary index verification panics on inconsistency.
        if expensive_safety_check_config.enable_secondary_index_checks() {
            if let Some(indexes) = self.indexes.clone() {
                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
                    .expect("secondary indexes are inconsistent");
            }
        }

        Ok(())
    }
3462
3463    fn expensive_check_is_consistent_state(
3464        &self,
3465        accumulator: Arc<StateAccumulator>,
3466        cur_epoch_store: &AuthorityPerEpochStore,
3467        panic: bool,
3468    ) {
3469        let live_object_set_hash = accumulator.digest_live_object_set();
3470
3471        let root_state_hash: ECMHLiveObjectSetDigest = self
3472            .get_accumulator_store()
3473            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3474            .expect("Retrieving root state hash cannot fail")
3475            .expect("Root state hash for epoch must exist")
3476            .1
3477            .digest()
3478            .into();
3479
3480        let is_inconsistent = root_state_hash != live_object_set_hash;
3481        if is_inconsistent {
3482            if panic {
3483                panic!(
3484                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3485                );
3486            } else {
3487                error!(
3488                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3489                    root_state_hash, live_object_set_hash
3490                );
3491            }
3492        } else {
3493            info!("State consistency check passed");
3494        }
3495
3496        if !panic {
3497            accumulator.set_inconsistent_state(is_inconsistent);
3498        }
3499    }
3500
    /// Returns the current epoch number. Testing only.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.epoch_store_for_testing().epoch()
    }
3504
    /// Creates an on-disk snapshot of all databases under `checkpoint_path`
    /// for the current epoch, optionally including the index databases.
    /// Writes into a `.tmp` directory first and renames it into place so a
    /// partially written snapshot is never observed; no-op if the target
    /// directory already exists.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> IotaResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // Remove leftovers from a previously interrupted attempt.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| IotaError::FileIO(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;

        // NOTE: Do not change the order of invoking these checkpoint calls
        // We want to snapshot checkpoint db first to not race with state sync
        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        if checkpoint_indexes {
            if let Some(indexes) = self.indexes.as_ref() {
                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
            }
            if let Some(rest_index) = self.rest_index.as_ref() {
                rest_index.checkpoint_db(&checkpoint_path_tmp.join("grpc_indexes"))?;
            }
        }

        // Atomically publish the completed snapshot.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| IotaError::FileIO(e.to_string()))?;
        Ok(())
    }
3555
    /// Load the current epoch store. This can change during reconfiguration. To
    /// ensure that we never end up accessing different epoch stores in a
    /// single task, we need to make sure that this is called once per task.
    /// Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites.
    /// Only call it when there is no way to obtain it from somewhere else.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }

    // Load the epoch store, should be used in tests only.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }

    /// Returns a deep copy of the current committee. Testing only.
    pub fn clone_committee_for_testing(&self) -> Committee {
        Committee::clone(self.epoch_store_for_testing().committee())
    }
3574
3575    #[instrument(level = "trace", skip_all)]
3576    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3577        self.get_object_store()
3578            .try_get_object(object_id)
3579            .map_err(Into::into)
3580    }
3581
    /// Non-fallible version of `try_get_object`.
    ///
    /// # Panics
    /// Panics if the underlying storage read fails.
    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.try_get_object(object_id)
            .await
            .expect("storage access failed")
    }
3588
3589    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3590        Ok(self
3591            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3592            .await?
3593            .expect("framework object should always exist")
3594            .compute_object_reference())
3595    }
3596
    /// Loads the latest IOTA system state object. This function is only used
    /// for testing.
    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
        self.get_object_cache_reader()
            .try_get_iota_system_state_object_unsafe()
    }
3602
3603    #[instrument(level = "trace", skip_all)]
3604    pub fn get_checkpoint_by_sequence_number(
3605        &self,
3606        sequence_number: CheckpointSequenceNumber,
3607    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3608        Ok(self
3609            .checkpoint_store
3610            .get_checkpoint_by_sequence_number(sequence_number)?)
3611    }
3612
3613    #[instrument(level = "trace", skip_all)]
3614    pub fn get_transaction_checkpoint_for_tests(
3615        &self,
3616        digest: &TransactionDigest,
3617        epoch_store: &AuthorityPerEpochStore,
3618    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3619        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3620        let Some(checkpoint) = checkpoint else {
3621            return Ok(None);
3622        };
3623        let checkpoint = self
3624            .checkpoint_store
3625            .get_checkpoint_by_sequence_number(checkpoint)?;
3626        Ok(checkpoint)
3627    }
3628
3629    #[instrument(level = "trace", skip_all)]
3630    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3631        Ok(
3632            match self
3633                .get_object_cache_reader()
3634                .try_get_latest_object_or_tombstone(*object_id)?
3635            {
3636                Some((_, ObjectOrTombstone::Object(object))) => {
3637                    let layout = self.get_object_layout(&object)?;
3638                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3639                }
3640                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3641                None => ObjectRead::NotExists(*object_id),
3642            },
3643        )
3644    }
3645
    /// Chain Identifier is the digest of the genesis checkpoint.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3650
3651    #[instrument(level = "trace", skip_all)]
3652    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3653    where
3654        T: DeserializeOwned,
3655    {
3656        let o = self.get_object_read(object_id)?.into_object()?;
3657        if let Some(move_object) = o.data.try_as_move() {
3658            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3659                IotaError::ObjectDeserialization {
3660                    error: format!("{e}"),
3661                }
3662            })?)
3663        } else {
3664            Err(IotaError::ObjectDeserialization {
3665                error: format!("Provided object : [{object_id}] is not a Move object."),
3666            })
3667        }
3668    }
3669
3670    /// This function aims to serve rpc reads on past objects and
3671    /// we don't expect it to be called for other purposes.
3672    /// Depending on the object pruning policies that will be enforced in the
3673    /// future there is no software-level guarantee/SLA to retrieve an object
3674    /// with an old version even if it exists/existed.
3675    #[instrument(level = "trace", skip_all)]
3676    pub fn get_past_object_read(
3677        &self,
3678        object_id: &ObjectID,
3679        version: SequenceNumber,
3680    ) -> IotaResult<PastObjectRead> {
3681        // Firstly we see if the object ever existed by getting its latest data
3682        let Some(obj_ref) = self
3683            .get_object_cache_reader()
3684            .try_get_latest_object_ref_or_tombstone(*object_id)?
3685        else {
3686            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3687        };
3688
3689        if version > obj_ref.1 {
3690            return Ok(PastObjectRead::VersionTooHigh {
3691                object_id: *object_id,
3692                asked_version: version,
3693                latest_version: obj_ref.1,
3694            });
3695        }
3696
3697        if version < obj_ref.1 {
3698            // Read past objects
3699            return Ok(match self.read_object_at_version(object_id, version)? {
3700                Some((object, layout)) => {
3701                    let obj_ref = object.compute_object_reference();
3702                    PastObjectRead::VersionFound(obj_ref, object, layout)
3703                }
3704
3705                None => PastObjectRead::VersionNotFound(*object_id, version),
3706            });
3707        }
3708
3709        if !obj_ref.2.is_alive() {
3710            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3711        }
3712
3713        match self.read_object_at_version(object_id, obj_ref.1)? {
3714            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3715            None => {
3716                error!(
3717                    "Object with in parent_entry is missing from object store, datastore is \
3718                     inconsistent",
3719                );
3720                Err(UserInputError::ObjectNotFound {
3721                    object_id: *object_id,
3722                    version: Some(obj_ref.1),
3723                }
3724                .into())
3725            }
3726        }
3727    }
3728
3729    #[instrument(level = "trace", skip_all)]
3730    fn read_object_at_version(
3731        &self,
3732        object_id: &ObjectID,
3733        version: SequenceNumber,
3734    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3735        let Some(object) = self
3736            .get_object_cache_reader()
3737            .try_get_object_by_key(object_id, version)?
3738        else {
3739            return Ok(None);
3740        };
3741
3742        let layout = self.get_object_layout(&object)?;
3743        Ok(Some((object, layout)))
3744    }
3745
3746    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3747        let layout = object
3748            .data
3749            .try_as_move()
3750            .map(|object| {
3751                into_struct_layout(
3752                    self.load_epoch_store_one_call_per_task()
3753                        .executor()
3754                        // TODO(cache) - must read through cache
3755                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3756                        .get_annotated_layout(&object.type_().clone().into())?,
3757                )
3758            })
3759            .transpose()?;
3760        Ok(layout)
3761    }
3762
3763    fn get_owner_at_version(
3764        &self,
3765        object_id: &ObjectID,
3766        version: SequenceNumber,
3767    ) -> IotaResult<Owner> {
3768        self.get_object_store()
3769            .try_get_object_by_key(object_id, version)?
3770            .ok_or_else(|| {
3771                IotaError::from(UserInputError::ObjectNotFound {
3772                    object_id: *object_id,
3773                    version: Some(version),
3774                })
3775            })
3776            .map(|o| o.owner)
3777    }
3778
3779    #[instrument(level = "trace", skip_all)]
3780    pub fn get_owner_objects(
3781        &self,
3782        owner: IotaAddress,
3783        // If `Some`, the query will start from the next item after the specified cursor
3784        cursor: Option<ObjectID>,
3785        limit: usize,
3786        filter: Option<IotaObjectDataFilter>,
3787    ) -> IotaResult<Vec<ObjectInfo>> {
3788        if let Some(indexes) = &self.indexes {
3789            indexes.get_owner_objects(owner, cursor, limit, filter)
3790        } else {
3791            Err(IotaError::IndexStoreNotAvailable)
3792        }
3793    }
3794
3795    #[instrument(level = "trace", skip_all)]
3796    pub fn get_owned_coins_iterator_with_cursor(
3797        &self,
3798        owner: IotaAddress,
3799        // If `Some`, the query will start from the next item after the specified cursor
3800        cursor: (String, ObjectID),
3801        limit: usize,
3802        one_coin_type_only: bool,
3803    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3804        if let Some(indexes) = &self.indexes {
3805            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3806        } else {
3807            Err(IotaError::IndexStoreNotAvailable)
3808        }
3809    }
3810
3811    #[instrument(level = "trace", skip_all)]
3812    pub fn get_owner_objects_iterator(
3813        &self,
3814        owner: IotaAddress,
3815        // If `Some`, the query will start from the next item after the specified cursor
3816        cursor: Option<ObjectID>,
3817        filter: Option<IotaObjectDataFilter>,
3818    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3819        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3820        if let Some(indexes) = &self.indexes {
3821            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3822        } else {
3823            Err(IotaError::IndexStoreNotAvailable)
3824        }
3825    }
3826
3827    #[instrument(level = "trace", skip_all)]
3828    pub async fn get_move_objects<T>(
3829        &self,
3830        owner: IotaAddress,
3831        type_: MoveObjectType,
3832    ) -> IotaResult<Vec<T>>
3833    where
3834        T: DeserializeOwned,
3835    {
3836        let object_ids = self
3837            .get_owner_objects_iterator(owner, None, None)?
3838            .filter(|o| match &o.type_ {
3839                ObjectType::Struct(s) => &type_ == s,
3840                ObjectType::Package => false,
3841            })
3842            .map(|info| ObjectKey(info.object_id, info.version))
3843            .collect::<Vec<_>>();
3844        let mut move_objects = vec![];
3845
3846        let objects = self
3847            .get_object_store()
3848            .try_multi_get_objects_by_key(&object_ids)?;
3849
3850        for (o, id) in objects.into_iter().zip(object_ids) {
3851            let object = o.ok_or_else(|| {
3852                IotaError::from(UserInputError::ObjectNotFound {
3853                    object_id: id.0,
3854                    version: Some(id.1),
3855                })
3856            })?;
3857            let move_object = object.data.try_as_move().ok_or_else(|| {
3858                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3859            })?;
3860            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3861                IotaError::ObjectDeserialization {
3862                    error: format!("{e}"),
3863                }
3864            })?);
3865        }
3866        Ok(move_objects)
3867    }
3868
3869    #[instrument(level = "trace", skip_all)]
3870    pub fn get_dynamic_fields(
3871        &self,
3872        owner: ObjectID,
3873        // If `Some`, the query will start from the next item after the specified cursor
3874        cursor: Option<ObjectID>,
3875        limit: usize,
3876    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3877        Ok(self
3878            .get_dynamic_fields_iterator(owner, cursor)?
3879            .take(limit)
3880            .collect::<Result<Vec<_>, _>>()?)
3881    }
3882
3883    fn get_dynamic_fields_iterator(
3884        &self,
3885        owner: ObjectID,
3886        // If `Some`, the query will start from the next item after the specified cursor
3887        cursor: Option<ObjectID>,
3888    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3889    {
3890        if let Some(indexes) = &self.indexes {
3891            indexes.get_dynamic_fields_iterator(owner, cursor)
3892        } else {
3893            Err(IotaError::IndexStoreNotAvailable)
3894        }
3895    }
3896
3897    #[instrument(level = "trace", skip_all)]
3898    pub fn get_dynamic_field_object_id(
3899        &self,
3900        owner: ObjectID,
3901        name_type: TypeTag,
3902        name_bcs_bytes: &[u8],
3903    ) -> IotaResult<Option<ObjectID>> {
3904        if let Some(indexes) = &self.indexes {
3905            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3906        } else {
3907            Err(IotaError::IndexStoreNotAvailable)
3908        }
3909    }
3910
    /// Returns the total number of transaction blocks known to the index
    /// store (its next sequence number).
    ///
    /// Errors when indexing is not enabled on this node.
    #[instrument(level = "trace", skip_all)]
    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
        Ok(self.get_indexes()?.next_sequence_number())
    }
3915
    /// Fetches a transaction and its effects from the key-value store by
    /// digest.
    #[instrument(level = "trace", skip_all)]
    pub async fn get_executed_transaction_and_effects(
        &self,
        digest: TransactionDigest,
        kv_store: Arc<TransactionKeyValueStore>,
    ) -> IotaResult<(Transaction, TransactionEffects)> {
        // Two independent KV lookups; either may fail on its own.
        let transaction = kv_store.get_tx(digest).await?;
        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
        Ok((transaction, effects))
    }
3926
3927    #[instrument(level = "trace", skip_all)]
3928    pub fn multi_get_checkpoint_by_sequence_number(
3929        &self,
3930        sequence_numbers: &[CheckpointSequenceNumber],
3931    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3932        Ok(self
3933            .checkpoint_store
3934            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3935    }
3936
3937    #[instrument(level = "trace", skip_all)]
3938    pub fn get_transaction_events(
3939        &self,
3940        digest: &TransactionEventsDigest,
3941    ) -> IotaResult<TransactionEvents> {
3942        self.get_transaction_cache_reader()
3943            .try_get_events(digest)?
3944            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3945    }
3946
3947    pub fn get_transaction_input_objects(
3948        &self,
3949        effects: &TransactionEffects,
3950    ) -> anyhow::Result<Vec<Object>> {
3951        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3952            .map_err(Into::into)
3953    }
3954
3955    pub fn get_transaction_output_objects(
3956        &self,
3957        effects: &TransactionEffects,
3958    ) -> anyhow::Result<Vec<Object>> {
3959        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3960            .map_err(Into::into)
3961    }
3962
3963    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3964        match &self.indexes {
3965            Some(i) => Ok(i.clone()),
3966            None => Err(IotaError::UnsupportedFeature {
3967                error: "extended object indexing is not enabled on this server".into(),
3968            }),
3969        }
3970    }
3971
3972    pub async fn get_transactions_for_tests(
3973        self: &Arc<Self>,
3974        filter: Option<TransactionFilter>,
3975        cursor: Option<TransactionDigest>,
3976        limit: Option<usize>,
3977        reverse: bool,
3978    ) -> IotaResult<Vec<TransactionDigest>> {
3979        let metrics = KeyValueStoreMetrics::new_for_tests();
3980        let kv_store = Arc::new(TransactionKeyValueStore::new(
3981            "rocksdb",
3982            metrics,
3983            self.clone(),
3984        ));
3985        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3986            .await
3987    }
3988
3989    #[instrument(level = "trace", skip_all)]
3990    pub async fn get_transactions(
3991        &self,
3992        kv_store: &Arc<TransactionKeyValueStore>,
3993        filter: Option<TransactionFilter>,
3994        // If `Some`, the query will start from the next item after the specified cursor
3995        cursor: Option<TransactionDigest>,
3996        limit: Option<usize>,
3997        reverse: bool,
3998    ) -> IotaResult<Vec<TransactionDigest>> {
3999        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4000            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4001            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4002            if reverse {
4003                let iter = iter
4004                    .rev()
4005                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4006                    .skip(usize::from(cursor.is_some()));
4007                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4008            } else {
4009                let iter = iter
4010                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4011                    .skip(usize::from(cursor.is_some()));
4012                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4013            }
4014        }
4015        self.get_indexes()?
4016            .get_transactions(filter, cursor, limit, reverse)
4017    }
4018
    /// Borrows the shared checkpoint store handle.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4022
4023    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4024        self.get_checkpoint_store()
4025            .get_highest_executed_checkpoint_seq_number()?
4026            .ok_or(IotaError::UserInput {
4027                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4028            })
4029    }
4030
    /// Simulator-only test helper: reads the highest pruned checkpoint
    /// sequence number from the perpetual tables, defaulting to 0 when
    /// nothing has been pruned yet.
    #[cfg(msim)]
    pub fn get_highest_pruned_checkpoint_for_testing(
        &self,
    ) -> IotaResult<CheckpointSequenceNumber> {
        self.database_for_testing()
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .map(|c| c.unwrap_or(0))
            .map_err(Into::into)
    }
4041
4042    #[instrument(level = "trace", skip_all)]
4043    pub fn get_checkpoint_summary_by_sequence_number(
4044        &self,
4045        sequence_number: CheckpointSequenceNumber,
4046    ) -> IotaResult<CheckpointSummary> {
4047        let verified_checkpoint = self
4048            .get_checkpoint_store()
4049            .get_checkpoint_by_sequence_number(sequence_number)?;
4050        match verified_checkpoint {
4051            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4052            None => Err(IotaError::UserInput {
4053                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4054            }),
4055        }
4056    }
4057
4058    #[instrument(level = "trace", skip_all)]
4059    pub fn get_checkpoint_summary_by_digest(
4060        &self,
4061        digest: CheckpointDigest,
4062    ) -> IotaResult<CheckpointSummary> {
4063        let verified_checkpoint = self
4064            .get_checkpoint_store()
4065            .get_checkpoint_by_digest(&digest)?;
4066        match verified_checkpoint {
4067            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4068            None => Err(IotaError::UserInput {
4069                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4070            }),
4071        }
4072    }
4073
4074    #[instrument(level = "trace", skip_all)]
4075    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4076        if is_system_package(package_id) {
4077            return self.find_genesis_txn_digest();
4078        }
4079        Ok(self
4080            .get_object_read(&package_id)?
4081            .into_object()?
4082            .previous_transaction)
4083    }
4084
4085    #[instrument(level = "trace", skip_all)]
4086    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4087        let summary = self
4088            .get_verified_checkpoint_by_sequence_number(0)?
4089            .into_message();
4090        let content = self.get_checkpoint_contents(summary.content_digest)?;
4091        let genesis_transaction = content.enumerate_transactions(&summary).next();
4092        Ok(genesis_transaction
4093            .ok_or(IotaError::UserInput {
4094                error: UserInputError::GenesisTransactionNotFound,
4095            })?
4096            .1
4097            .transaction)
4098    }
4099
4100    #[instrument(level = "trace", skip_all)]
4101    pub fn get_verified_checkpoint_by_sequence_number(
4102        &self,
4103        sequence_number: CheckpointSequenceNumber,
4104    ) -> IotaResult<VerifiedCheckpoint> {
4105        let verified_checkpoint = self
4106            .get_checkpoint_store()
4107            .get_checkpoint_by_sequence_number(sequence_number)?;
4108        match verified_checkpoint {
4109            Some(verified_checkpoint) => Ok(verified_checkpoint),
4110            None => Err(IotaError::UserInput {
4111                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4112            }),
4113        }
4114    }
4115
4116    #[instrument(level = "trace", skip_all)]
4117    pub fn get_verified_checkpoint_summary_by_digest(
4118        &self,
4119        digest: CheckpointDigest,
4120    ) -> IotaResult<VerifiedCheckpoint> {
4121        let verified_checkpoint = self
4122            .get_checkpoint_store()
4123            .get_checkpoint_by_digest(&digest)?;
4124        match verified_checkpoint {
4125            Some(verified_checkpoint) => Ok(verified_checkpoint),
4126            None => Err(IotaError::UserInput {
4127                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4128            }),
4129        }
4130    }
4131
4132    #[instrument(level = "trace", skip_all)]
4133    pub fn get_checkpoint_contents(
4134        &self,
4135        digest: CheckpointContentsDigest,
4136    ) -> IotaResult<CheckpointContents> {
4137        self.get_checkpoint_store()
4138            .get_checkpoint_contents(&digest)?
4139            .ok_or(IotaError::UserInput {
4140                error: UserInputError::CheckpointContentsNotFound(digest),
4141            })
4142    }
4143
4144    #[instrument(level = "trace", skip_all)]
4145    pub fn get_checkpoint_contents_by_sequence_number(
4146        &self,
4147        sequence_number: CheckpointSequenceNumber,
4148    ) -> IotaResult<CheckpointContents> {
4149        let verified_checkpoint = self
4150            .get_checkpoint_store()
4151            .get_checkpoint_by_sequence_number(sequence_number)?;
4152        match verified_checkpoint {
4153            Some(verified_checkpoint) => {
4154                let content_digest = verified_checkpoint.into_inner().content_digest;
4155                self.get_checkpoint_contents(content_digest)
4156            }
4157            None => Err(IotaError::UserInput {
4158                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4159            }),
4160        }
4161    }
4162
    /// Queries events matching `query`, paginated by an exclusive `cursor`,
    /// capped at `limit`, in ascending or descending order.
    ///
    /// Requires the index store; errors with `UnsupportedFeature` otherwise.
    /// Unsupported filter variants yield `UserInputError::Unsupported`.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> IotaResult<Vec<IotaEvent>> {
        let index_store = self.get_indexes()?;

        // Get the tx_num from tx_digest
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                IotaError::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            // No cursor: start from the extreme end matching the scan direction.
            (u64::MAX, usize::MAX)
        } else {
            (0, 0)
        };

        // Over-fetch by one so the cursor entry itself can be dropped below
        // while still returning a full page.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All(filters) => {
                if filters.is_empty() {
                    index_store.all_events(tx_num, event_num, limit, descending)?
                } else {
                    return Err(IotaError::UserInput {
                        error: UserInputError::Unsupported(
                            "This query type does not currently support filter combinations"
                                .to_string(),
                        ),
                    });
                }
            }
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Package(_)
            | EventFilter::MoveEventField { .. }
            | EventFilter::Any(_)
            | EventFilter::And(_, _)
            | EventFilter::Or(_, _) => {
                return Err(IotaError::UserInput {
                    error: UserInputError::Unsupported(
                        "This query type is not supported by the full node.".to_string(),
                    ),
                });
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        // Batch-fetch all events for those transactions from the KV store.
        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        let events_map: HashMap<_, _> =
            transaction_digests.iter().zip(events.into_iter()).collect();

        // Re-associate each event key with its concrete event payload; a key
        // whose event is missing yields TransactionEventsNotFound.
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        // Safe: events_map was built from exactly these digests.
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
                event
                    .map(|e| (e, tx_digest, event_seq, timestamp))
                    .ok_or(IotaError::TransactionEventsNotFound { digest })
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Resolve Move type layouts so raw event bytes can be rendered as
        // structured IotaEvent values.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(IotaEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
4309
    /// Inserts a single genesis object through the reconfiguration API.
    ///
    /// Panics on failure: genesis setup cannot proceed without the object.
    pub async fn insert_genesis_object(&self, object: Object) {
        self.get_reconfig_api()
            .try_insert_genesis_object(object)
            .expect("Cannot insert genesis object")
    }
4315
4316    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4317        futures::future::join_all(
4318            objects
4319                .iter()
4320                .map(|o| self.insert_genesis_object(o.clone())),
4321        )
4322        .await;
4323    }
4324
    /// Make a status response for a transaction
    ///
    /// Returns `Executed` (with possibly re-signed effects, cert signature,
    /// and events) when effects and the transaction are both present,
    /// `Signed` when only a signed transaction exists in the epoch store, and
    /// `None` when the transaction is unknown.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .try_get_transaction_block(transaction_digest)?
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Effects without an events digest mean the transaction emitted no events.
                let events = if let Some(digest) = effects.events_digest() {
                    self.get_transaction_events(digest)?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible
                // that we reverted the transaction (during epoch change) in
                // between the above two reads, and we end up having effects but
                // not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
4366
4367    /// Get the signed effects of the given transaction. If the effects was
4368    /// signed in a previous epoch, re-sign it so that the caller is able to
4369    /// form a cert of the effects in the current epoch.
4370    #[instrument(level = "trace", skip_all)]
4371    pub fn get_signed_effects_and_maybe_resign(
4372        &self,
4373        transaction_digest: &TransactionDigest,
4374        epoch_store: &Arc<AuthorityPerEpochStore>,
4375    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4376        let effects = self
4377            .get_transaction_cache_reader()
4378            .try_get_executed_effects(transaction_digest)?;
4379        match effects {
4380            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4381            None => Ok(None),
4382        }
4383    }
4384
    /// Wraps `effects` in a signature from this validator for the current
    /// epoch: reuses the stored signature when it was produced in the current
    /// epoch, otherwise signs afresh and records the new digest + signature
    /// in the epoch store.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn sign_effects(
        &self,
        effects: TransactionEffects,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransactionEffects> {
        let tx_digest = *effects.transaction_digest();
        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
            // Cached signature is from this epoch — reuse it directly.
            Some(sig) if sig.epoch == epoch_store.epoch() => {
                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
            }
            _ => {
                // If the transaction was executed in previous epochs, the validator will
                // re-sign the effects with new current epoch so that a client is always able to
                // obtain an effects certificate at the current epoch.
                //
                // Why is this necessary? Consider the following case:
                // - assume there are 4 validators
                // - Quorum driver gets 2 signed effects before reconfig halt
                // - The tx makes it into final checkpoint.
                // - 2 validators go away and are replaced in the new epoch.
                // - The new epoch begins.
                // - The quorum driver cannot complete the partial effects cert from the
                //   previous epoch, because it may not be able to reach either of the 2 former
                //   validators.
                // - But, if the 2 validators that stayed are willing to re-sign the effects in
                //   the new epoch, the QD can make a new effects cert and return it to the
                //   client.
                //
                // This is a considered a short-term workaround. Eventually, Quorum Driver
                // should be able to return either an effects certificate, -or-
                // a proof of inclusion in a checkpoint. In the case above, the
                // Quorum Driver would return a proof of inclusion in the final
                // checkpoint, and this code would no longer be necessary.
                debug!(
                    ?tx_digest,
                    epoch=?epoch_store.epoch(),
                    "Re-signing the effects with the current epoch"
                );

                let sig = AuthoritySignInfo::new(
                    epoch_store.epoch(),
                    &effects,
                    Intent::iota_app(IntentScope::TransactionEffects),
                    self.name,
                    &*self.secret,
                );

                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());

                // Persist the fresh signature so subsequent reads take the fast path.
                epoch_store.insert_effects_digest_and_signature(
                    &tx_digest,
                    effects.digest(),
                    &sig,
                )?;

                effects
            }
        };

        Ok(VerifiedSignedTransactionEffects::new_unchecked(
            signed_effects,
        ))
    }
4449
4450    // Returns coin objects for indexing for fullnode if indexing is enabled.
4451    #[instrument(level = "trace", skip_all)]
4452    fn fullnode_only_get_tx_coins_for_indexing(
4453        &self,
4454        inner_temporary_store: &InnerTemporaryStore,
4455        epoch_store: &Arc<AuthorityPerEpochStore>,
4456    ) -> Option<TxCoins> {
4457        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4458            return None;
4459        }
4460        let written_coin_objects = inner_temporary_store
4461            .written
4462            .iter()
4463            .filter_map(|(k, v)| {
4464                if v.is_coin() {
4465                    Some((*k, v.clone()))
4466                } else {
4467                    None
4468                }
4469            })
4470            .collect();
4471        let input_coin_objects = inner_temporary_store
4472            .input_objects
4473            .iter()
4474            .filter_map(|(k, v)| {
4475                if v.is_coin() {
4476                    Some((*k, v.clone()))
4477                } else {
4478                    None
4479                }
4480            })
4481            .collect::<ObjectMap>();
4482        Some((input_coin_objects, written_coin_objects))
4483    }
4484
4485    /// Get the TransactionEnvelope that currently locks the given object, if
4486    /// any. Since object locks are only valid for one epoch, we also need
4487    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4488    /// no lock records for the given object can be found.
4489    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4490    /// object record is at a different version.
4491    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4492    /// certain transaction. Returns None if the a lock record is
4493    /// initialized for the given ObjectRef but not yet locked by any
4494    /// transaction,     or cannot find the transaction in transaction
4495    /// table, because of data race etc.
4496    #[instrument(level = "trace", skip_all)]
4497    pub async fn get_transaction_lock(
4498        &self,
4499        object_ref: &ObjectRef,
4500        epoch_store: &AuthorityPerEpochStore,
4501    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4502        let lock_info = self
4503            .get_object_cache_reader()
4504            .try_get_lock(*object_ref, epoch_store)?;
4505        let lock_info = match lock_info {
4506            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4507                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4508                    provided_obj_ref: *object_ref,
4509                    current_version: locked_ref.1,
4510                }
4511                .into());
4512            }
4513            ObjectLockStatus::Initialized => {
4514                return Ok(None);
4515            }
4516            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4517        };
4518
4519        epoch_store.get_signed_transaction(&lock_info)
4520    }
4521
    /// Fetches the objects with the given IDs via the object cache reader.
    /// The result vector is parallel to `objects`; see the cache reader for
    /// the semantics of `None` entries.
    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
        self.get_object_cache_reader().try_get_objects(objects)
    }
4525
    /// Non-fallible version of `try_get_objects`.
    ///
    /// # Panics
    ///
    /// Panics with "storage access failed" if the underlying lookup returns
    /// an error.
    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
        self.try_get_objects(objects)
            .await
            .expect("storage access failed")
    }
4532
    /// Returns the latest object reference (or tombstone) for the given
    /// object ID, as reported by the object cache reader, or `None` if no
    /// record exists.
    pub async fn try_get_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> IotaResult<Option<ObjectRef>> {
        self.get_object_cache_reader()
            .try_get_latest_object_ref_or_tombstone(object_id)
    }
4540
    /// Non-fallible version of `try_get_object_or_tombstone`.
    ///
    /// # Panics
    ///
    /// Panics with "storage access failed" if the underlying lookup returns
    /// an error.
    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
        self.try_get_object_or_tombstone(object_id)
            .await
            .expect("storage access failed")
    }
4547
4548    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4549    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4550    /// upgrade.
4551    ///
4552    /// This method can be used to dynamic adjust the amount of buffer. If set
4553    /// to 0, the upgrade will go through with only 2f+1 votes.
4554    ///
4555    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4556    /// should have the same value), or you risk halting the chain.
4557    pub fn set_override_protocol_upgrade_buffer_stake(
4558        &self,
4559        expected_epoch: EpochId,
4560        buffer_stake_bps: u64,
4561    ) -> IotaResult {
4562        let epoch_store = self.load_epoch_store_one_call_per_task();
4563        let actual_epoch = epoch_store.epoch();
4564        if actual_epoch != expected_epoch {
4565            return Err(IotaError::WrongEpoch {
4566                expected_epoch,
4567                actual_epoch,
4568            });
4569        }
4570
4571        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4572    }
4573
4574    pub fn clear_override_protocol_upgrade_buffer_stake(
4575        &self,
4576        expected_epoch: EpochId,
4577    ) -> IotaResult {
4578        let epoch_store = self.load_epoch_store_one_call_per_task();
4579        let actual_epoch = epoch_store.epoch();
4580        if actual_epoch != expected_epoch {
4581            return Err(IotaError::WrongEpoch {
4582                expected_epoch,
4583                actual_epoch,
4584            });
4585        }
4586
4587        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4588    }
4589
    /// Get the set of system packages that are compiled in to this build, if
    /// those packages are compatible with the current versions of those
    /// packages on-chain.
    ///
    /// Returns an empty vector as soon as any built-in package is found to be
    /// incompatible with its on-chain counterpart.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        // NOTE(review): the identity `.map(|p| p)` appears to exist only to
        // unify the iterator types ahead of `.chain` — confirm before removing.
        #[cfg(msim)]
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            #[cfg(msim)]
            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
                .unwrap_or(modules);

            // Compare the compiled-in package against the on-chain version;
            // `None` from the comparison means the package is not compatible.
            let Some(obj_ref) = iota_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                // One incompatible package invalidates the whole result.
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
4630
    /// Return the new versions, module bytes, and dependencies for the packages
    /// that have been committed to for a framework upgrade, in
    /// `system_packages`.  Loads the module contents from the binary, and
    /// performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case
    ///   no upgrade is required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest
    ///   matches the input, meaning the framework will be upgraded, and this
    ///   authority can satisfy that upgrade, in which case the contents are
    ///   included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does
    /// not contain the bytes for that framework ID, or the resulting
    /// package fails the digest check, `None` is returned indicating that
    /// this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        // Fetch the current on-chain versions of all requested packages in one
        // batch; `objects` is parallel to `system_packages`.
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                Some(cur_object) => cur_object.previous_transaction,
                // No on-chain object yet: use the genesis marker as the
                // previous transaction for the newly built package object.
                None => TransactionDigest::genesis_marker(),
            };

            // In simtests the compiled-in package may be replaced by an
            // injected override.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // These bytes ship with this binary, so a deserialization failure
            // here indicates a build problem, not a runtime condition.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Rebuild the package object locally so its reference can be
            // checked against the one the network voted on.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                // This binary cannot produce the exact package the network
                // expects — signal that by returning None.
                error!(
                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                );
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
4710
    /// Returns the new protocol version and system packages that the network
    /// has voted to upgrade to. If the proposed protocol version is not
    /// supported, None is returned.
    ///
    /// A proposal is supported when the stake behind a single
    /// (digest, packages) combination reaches the committee's effective
    /// threshold, i.e. the quorum threshold raised by `buffer_stake_bps`.
    fn is_protocol_version_supported_v1(
        proposed_protocol_version: ProtocolVersion,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV1>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
        // 10000 bps == 100%; larger values are meaningless, so clamp.
        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // For each validator, gather the protocol version and system packages that it
        // would like to upgrade to in the next epoch.
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator that lists no packages is voting against any change at all.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so that identical package sets compare equal when
                // grouped below.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                // A validator that only supports the current protocol version is also voting
                // against any change, because framework upgrades always require a protocol
                // version bump.
                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // There can only be one set of votes that have a majority, find one if it
        // exists.
        //
        // Sorting first is required: `chunk_by` only groups *consecutive*
        // elements with equal keys.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                // should have been filtered out earlier.
                assert!(!packages.is_empty());

                // Sum the stake of every validator voting for this exact
                // (digest, packages) combination.
                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, digest, packages))
            })
    }
4790
4791    /// Selects the highest supported protocol version and system packages that
4792    /// the network has voted to upgrade to. If no upgrade is supported,
4793    /// returns the current protocol version and system packages.
4794    fn choose_protocol_version_and_system_packages_v1(
4795        current_protocol_version: ProtocolVersion,
4796        current_protocol_digest: Digest,
4797        committee: &Committee,
4798        capabilities: Vec<AuthorityCapabilitiesV1>,
4799        buffer_stake_bps: u64,
4800    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4801        let mut next_protocol_version = current_protocol_version;
4802        let mut system_packages = vec![];
4803        let mut protocol_version_digest = current_protocol_digest;
4804
4805        // Finds the highest supported protocol version and system packages by
4806        // incrementing the proposed protocol version by one until no further
4807        // upgrades are supported.
4808        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4809            next_protocol_version + 1,
4810            committee,
4811            capabilities.clone(),
4812            buffer_stake_bps,
4813        ) {
4814            next_protocol_version = version;
4815            protocol_version_digest = digest;
4816            system_packages = packages;
4817        }
4818
4819        (
4820            next_protocol_version,
4821            protocol_version_digest,
4822            system_packages,
4823        )
4824    }
4825
4826    /// Returns the indices of validators that support the given protocol
4827    /// version and digest. This includes both committee and non-committee
4828    /// validators based on their capabilities. Uses active validators
4829    /// instead of committee indices.
4830    fn get_validators_supporting_protocol_version(
4831        target_protocol_version: ProtocolVersion,
4832        target_digest: Digest,
4833        active_validators: &[AuthorityPublicKey],
4834        capabilities: &[AuthorityCapabilitiesV1],
4835    ) -> Vec<u64> {
4836        let mut eligible_validators = Vec::new();
4837
4838        for capability in capabilities {
4839            // Check if this validator supports the target protocol version and digest
4840            if let Some(digest) = capability
4841                .supported_protocol_versions
4842                .get_version_digest(target_protocol_version)
4843            {
4844                if digest == target_digest {
4845                    // Find the validator's index in the active validators list
4846                    if let Some(index) = active_validators
4847                        .iter()
4848                        .position(|name| AuthorityName::from(name) == capability.authority)
4849                    {
4850                        eligible_validators.push(index as u64);
4851                    }
4852                }
4853            }
4854        }
4855
4856        // Sort indices for deterministic behavior
4857        eligible_validators.sort();
4858        eligible_validators
4859    }
4860
4861    /// Calculates the sum of weights for eligible validators that are part of
4862    /// the committee. Takes the indices from
4863    /// get_validators_supporting_protocol_version and maps them back
4864    /// to committee members to get their weights.
4865    fn calculate_eligible_validators_weight(
4866        eligible_validator_indices: &[u64],
4867        active_validators: &[AuthorityPublicKey],
4868        committee: &Committee,
4869    ) -> u64 {
4870        let mut total_weight = 0u64;
4871
4872        for &index in eligible_validator_indices {
4873            let authority_pubkey = &active_validators[index as usize];
4874            // Check if this validator is in the committee and get their weight
4875            if let Some((_, weight)) = committee
4876                .members()
4877                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4878            {
4879                total_weight += weight;
4880            }
4881        }
4882
4883        total_weight
4884    }
4885
4886    #[instrument(level = "debug", skip_all)]
4887    fn create_authenticator_state_tx(
4888        &self,
4889        epoch_store: &Arc<AuthorityPerEpochStore>,
4890    ) -> Option<EndOfEpochTransactionKind> {
4891        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4892            info!("authenticator state transactions not enabled");
4893            return None;
4894        }
4895
4896        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4897        let tx = if authenticator_state_exists {
4898            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4899            let min_epoch =
4900                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4901            let authenticator_obj_initial_shared_version = epoch_store
4902                .epoch_start_config()
4903                .authenticator_obj_initial_shared_version()
4904                .expect("initial version must exist");
4905
4906            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4907                min_epoch,
4908                authenticator_obj_initial_shared_version,
4909            );
4910
4911            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4912
4913            tx
4914        } else {
4915            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4916            info!("Creating AuthenticatorStateCreate tx");
4917            tx
4918        };
4919        Some(tx)
4920    }
4921
4922    /// Creates and execute the advance epoch transaction to effects without
4923    /// committing it to the database. The effects of the change epoch tx
4924    /// are only written to the database after a certified checkpoint has been
4925    /// formed and executed by CheckpointExecutor.
4926    ///
4927    /// When a framework upgraded has been decided on, but the validator does
4928    /// not have the new versions of the packages locally, the validator
4929    /// cannot form the ChangeEpochTx. In this case it returns Err,
4930    /// indicating that the checkpoint builder should give up trying to make the
4931    /// final checkpoint. As long as the network is able to create a certified
4932    /// checkpoint (which should be ensured by the capabilities vote), it
4933    /// will arrive via state sync and be executed by CheckpointExecutor.
4934    #[instrument(level = "error", skip_all)]
4935    pub async fn create_and_execute_advance_epoch_tx(
4936        &self,
4937        epoch_store: &Arc<AuthorityPerEpochStore>,
4938        gas_cost_summary: &GasCostSummary,
4939        checkpoint: CheckpointSequenceNumber,
4940        epoch_start_timestamp_ms: CheckpointTimestamp,
4941        scores: Vec<u64>,
4942    ) -> anyhow::Result<(
4943        IotaSystemState,
4944        Option<SystemEpochInfoEvent>,
4945        TransactionEffects,
4946    )> {
4947        let mut txns = Vec::new();
4948
4949        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4950            txns.push(tx);
4951        }
4952
4953        let next_epoch = epoch_store.epoch() + 1;
4954
4955        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4956        let authority_capabilities = epoch_store
4957            .get_capabilities_v1()
4958            .expect("read capabilities from db cannot fail");
4959        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4960            Self::choose_protocol_version_and_system_packages_v1(
4961                epoch_store.protocol_version(),
4962                SupportedProtocolVersionsWithHashes::protocol_config_digest(
4963                    epoch_store.protocol_config(),
4964                ),
4965                epoch_store.committee(),
4966                authority_capabilities.clone(),
4967                buffer_stake_bps,
4968            );
4969
4970        // since system packages are created during the current epoch, they should abide
4971        // by the rules of the current epoch, including the current epoch's max
4972        // Move binary format version
4973        let config = epoch_store.protocol_config();
4974        let binary_config = to_binary_config(config);
4975        let Some(next_epoch_system_package_bytes) = self
4976            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4977            .await
4978        else {
4979            error!(
4980                "upgraded system packages {:?} are not locally available, cannot create \
4981                ChangeEpochTx. validator binary must be upgraded to the correct version!",
4982                next_epoch_system_packages
4983            );
4984            // the checkpoint builder will keep retrying forever when it hits this error.
4985            // Eventually, one of two things will happen:
4986            // - The operator will upgrade this binary to one that has the new packages
4987            //   locally, and this function will succeed.
4988            // - The final checkpoint will be certified by other validators, we will receive
4989            //   it via state sync, and execute it. This will upgrade the framework
4990            //   packages, reconfigure, and most likely shut down in the new epoch (this
4991            //   validator likely doesn't support the new protocol version, or else it
4992            //   should have had the packages.)
4993            bail!("missing system packages: cannot form ChangeEpochTx");
4994        };
4995
4996        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
4997        // ChangeEpochV2 requirements are met
4998        if config.select_committee_from_eligible_validators() {
4999            // Get the list of eligible validators that support the target protocol version
5000            let active_validators = epoch_store.epoch_start_state().get_active_validators();
5001
5002            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5003
5004            // Use validators supporting the target protocol version as eligible validators
5005            // in the next version if select_committee_supporting_next_epoch_version feature
5006            // flag is set to true.
5007            if config.select_committee_supporting_next_epoch_version() {
5008                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5009                    next_epoch_protocol_version,
5010                    next_epoch_protocol_digest,
5011                    &active_validators,
5012                    &authority_capabilities,
5013                );
5014
5015                // Calculate the total weight of eligible validators in the committee
5016                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5017                    &eligible_active_validators,
5018                    &active_validators,
5019                    epoch_store.committee(),
5020                );
5021
5022                // Safety check: ensure eligible validators have enough stake
5023                // Use the same effective threshold calculation that was used to decide the
5024                // protocol version
5025                let committee = epoch_store.committee();
5026                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5027
5028                if eligible_validators_weight < effective_threshold {
5029                    error!(
5030                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5031                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5032                    );
5033                    // Pass all active validator indices as eligible validators
5034                    // to perform selection among all of them.
5035                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5036                }
5037            }
5038
5039            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5040            // flag is enabled.
5041            if config.pass_validator_scores_to_advance_epoch() {
5042                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5043                    next_epoch,
5044                    next_epoch_protocol_version,
5045                    gas_cost_summary.storage_cost,
5046                    gas_cost_summary.computation_cost,
5047                    gas_cost_summary.computation_cost_burned,
5048                    gas_cost_summary.storage_rebate,
5049                    gas_cost_summary.non_refundable_storage_fee,
5050                    epoch_start_timestamp_ms,
5051                    next_epoch_system_package_bytes,
5052                    eligible_active_validators,
5053                    scores,
5054                    config.adjust_rewards_by_score(),
5055                ));
5056            } else {
5057                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5058                    next_epoch,
5059                    next_epoch_protocol_version,
5060                    gas_cost_summary.storage_cost,
5061                    gas_cost_summary.computation_cost,
5062                    gas_cost_summary.computation_cost_burned,
5063                    gas_cost_summary.storage_rebate,
5064                    gas_cost_summary.non_refundable_storage_fee,
5065                    epoch_start_timestamp_ms,
5066                    next_epoch_system_package_bytes,
5067                    eligible_active_validators,
5068                ));
5069            }
5070        } else if config.protocol_defined_base_fee()
5071            && config.max_committee_members_count_as_option().is_some()
5072        {
5073            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5074                next_epoch,
5075                next_epoch_protocol_version,
5076                gas_cost_summary.storage_cost,
5077                gas_cost_summary.computation_cost,
5078                gas_cost_summary.computation_cost_burned,
5079                gas_cost_summary.storage_rebate,
5080                gas_cost_summary.non_refundable_storage_fee,
5081                epoch_start_timestamp_ms,
5082                next_epoch_system_package_bytes,
5083            ));
5084        } else {
5085            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5086                next_epoch,
5087                next_epoch_protocol_version,
5088                gas_cost_summary.storage_cost,
5089                gas_cost_summary.computation_cost,
5090                gas_cost_summary.storage_rebate,
5091                gas_cost_summary.non_refundable_storage_fee,
5092                epoch_start_timestamp_ms,
5093                next_epoch_system_package_bytes,
5094            ));
5095        }
5096
5097        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5098
5099        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5100            tx.clone(),
5101            epoch_store.epoch(),
5102            checkpoint,
5103        );
5104
5105        let tx_digest = executable_tx.digest();
5106
5107        info!(
5108            ?next_epoch,
5109            ?next_epoch_protocol_version,
5110            ?next_epoch_system_packages,
5111            computation_cost=?gas_cost_summary.computation_cost,
5112            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5113            storage_cost=?gas_cost_summary.storage_cost,
5114            storage_rebate=?gas_cost_summary.storage_rebate,
5115            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5116            ?tx_digest,
5117            "Creating advance epoch transaction"
5118        );
5119
5120        fail_point_async!("change_epoch_tx_delay");
5121        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5122
5123        // The tx could have been executed by state sync already - if so simply return
5124        // an error. The checkpoint builder will shortly be terminated by
5125        // reconfiguration anyway.
5126        if self
5127            .get_transaction_cache_reader()
5128            .try_is_tx_already_executed(tx_digest)?
5129        {
5130            warn!("change epoch tx has already been executed via state sync");
5131            bail!("change epoch tx has already been executed via state sync",);
5132        }
5133
5134        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5135
5136        // We must manually assign the shared object versions to the transaction before
5137        // executing it. This is because we do not sequence end-of-epoch
5138        // transactions through consensus.
5139        epoch_store.assign_shared_object_versions_idempotent(
5140            self.get_object_cache_reader().as_ref(),
5141            std::slice::from_ref(&executable_tx),
5142        )?;
5143
5144        let (input_objects, _, _) =
5145            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5146
5147        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5148            &execution_guard,
5149            &executable_tx,
5150            input_objects,
5151            None,
5152            None,
5153            epoch_store,
5154        )?;
5155        let system_obj = get_iota_system_state(&temporary_store.written)
5156            .expect("change epoch tx must write to system object");
5157        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5158        let system_epoch_info_event = temporary_store
5159            .events
5160            .data
5161            .into_iter()
5162            .find(|event| event.is_system_epoch_info_event())
5163            .map(SystemEpochInfoEvent::from);
5164        // The system epoch info event can be `None` in case if the `advance_epoch`
5165        // Move function call failed and was executed in the safe mode.
5166        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5167
5168        // We must write tx and effects to the state sync tables so that state sync is
5169        // able to deliver to the transaction to CheckpointExecutor after it is
5170        // included in a certified checkpoint.
5171        self.get_state_sync_store()
5172            .try_insert_transaction_and_effects(&tx, &effects)
5173            .map_err(|err| {
5174                let err: anyhow::Error = err.into();
5175                err
5176            })?;
5177
5178        info!(
5179            "Effects summary of the change epoch transaction: {:?}",
5180            effects.summary_for_debug()
5181        );
5182        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5183        // The change epoch transaction cannot fail to execute.
5184        assert!(effects.status().is_ok());
5185        Ok((system_obj, system_epoch_info_event, effects))
5186    }
5187
5188    /// This function is called at the very end of the epoch.
5189    /// This step is required before updating new epoch in the db and calling
5190    /// reopen_epoch_db.
5191    #[instrument(level = "error", skip_all)]
5192    async fn revert_uncommitted_epoch_transactions(
5193        &self,
5194        epoch_store: &AuthorityPerEpochStore,
5195    ) -> IotaResult {
5196        {
5197            let state = epoch_store.get_reconfig_state_write_lock_guard();
5198            if state.should_accept_user_certs() {
5199                // Need to change this so that consensus adapter do not accept certificates from
5200                // user. This can happen if our local validator did not initiate
5201                // epoch change locally, but 2f+1 nodes already concluded the
5202                // epoch.
5203                //
5204                // This lock is essentially a barrier for
5205                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5206                // after this block
5207                epoch_store.close_user_certs(state);
5208            }
5209            // lock is dropped here
5210        }
5211        let pending_certificates = epoch_store.pending_consensus_certificates();
5212        info!(
5213            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5214            pending_certificates.len(),
5215            pending_certificates,
5216        );
5217        for digest in pending_certificates {
5218            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5219                info!(
5220                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5221                    digest
5222                );
5223                continue;
5224            }
5225            info!("Reverting {:?} at the end of epoch", digest);
5226            epoch_store.revert_executed_transaction(&digest)?;
5227            self.get_reconfig_api().try_revert_state_update(&digest)?;
5228        }
5229        info!("All uncommitted local transactions reverted");
5230        Ok(())
5231    }
5232
5233    #[instrument(level = "error", skip_all)]
5234    async fn reopen_epoch_db(
5235        &self,
5236        cur_epoch_store: &AuthorityPerEpochStore,
5237        new_committee: Committee,
5238        epoch_start_configuration: EpochStartConfiguration,
5239        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5240        epoch_last_checkpoint: CheckpointSequenceNumber,
5241    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5242        let new_epoch = new_committee.epoch;
5243        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5244        assert_eq!(
5245            epoch_start_configuration.epoch_start_state().epoch(),
5246            new_committee.epoch
5247        );
5248        fail_point!("before-open-new-epoch-store");
5249        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5250            self.name,
5251            new_committee,
5252            epoch_start_configuration,
5253            self.get_backing_package_store().clone(),
5254            self.get_object_store().clone(),
5255            expensive_safety_check_config,
5256            epoch_last_checkpoint,
5257        )?;
5258        self.epoch_store.store(new_epoch_store.clone());
5259        Ok(new_epoch_store)
5260    }
5261
    /// Checks if `authenticator` unlocks a valid Move account and returns the
    /// account-related `AuthenticatorFunctionRef` object.
    ///
    /// Validation steps, in order:
    /// 1. The account object must be readable (not deleted, not part of a
    ///    canceled transaction).
    /// 2. `signer` must equal the address derived from the account object id.
    /// 3. The account object must be shared or immutable.
    /// 4. Version/digest pins supplied by the authenticator (if any) must
    ///    match the object actually read.
    /// 5. The account must carry an `AuthenticatorFunctionRefV1` dynamic
    ///    field at or below the resolved version.
    fn check_move_account(
        &self,
        auth_account_object_id: ObjectID,
        auth_account_object_seq_number: Option<SequenceNumber>,
        auth_account_object_digest: Option<ObjectDigest>,
        account_object: ObjectReadResult,
        signer: &IotaAddress,
    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
        // Step 1: the account object must have been loadable.
        let account_object = match account_object.object {
            ObjectReadResultKind::Object(object) => Ok(object),
            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
                Err(UserInputError::AccountObjectDeleted {
                    account_id: account_object.id(),
                    account_version: version,
                    transaction_digest: digest,
                })
            }
            // It is impossible to check the account object because it is used in a canceled
            // transaction and is not loaded.
            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
                Err(UserInputError::AccountObjectInCanceledTransaction {
                    account_id: account_object.id(),
                    account_version: version,
                })
            }
        }?;

        // Step 2: the transaction sender must be the account's address.
        let account_object_addr = IotaAddress::from(auth_account_object_id);

        fp_ensure!(
            signer == &account_object_addr,
            UserInputError::IncorrectUserSignature {
                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
            }
            .into()
        );

        // Step 3: only shared or immutable account objects are supported.
        fp_ensure!(
            account_object.is_shared() || account_object.is_immutable(),
            UserInputError::AccountObjectNotSupported {
                object_id: auth_account_object_id
            }
            .into()
        );

        // Step 4a: if the authenticator pinned a version, it must match the
        // version we actually read; otherwise use the read version.
        let auth_account_object_seq_number =
            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
                let account_object_version = account_object.version();

                fp_ensure!(
                    account_object_version == auth_account_object_seq_number,
                    UserInputError::AccountObjectVersionMismatch {
                        object_id: auth_account_object_id,
                        expected_version: auth_account_object_seq_number,
                        actual_version: account_object_version,
                    }
                    .into()
                );

                auth_account_object_seq_number
            } else {
                account_object.version()
            };

        // Step 4b: if the authenticator pinned a digest, it must match too.
        if let Some(auth_account_object_digest) = auth_account_object_digest {
            let expected_digest = account_object.digest();
            fp_ensure!(
                expected_digest == auth_account_object_digest,
                UserInputError::InvalidAccountObjectDigest {
                    object_id: auth_account_object_id,
                    expected_digest,
                    actual_digest: auth_account_object_digest,
                }
                .into()
            );
        }

        // Step 5: derive the dynamic-field id where the account stores its
        // authenticator function reference.
        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
            auth_account_object_id,
            &AuthenticatorFunctionRefV1Key::tag().into(),
            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
        )
        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
            account_object_id: auth_account_object_id,
        })?;

        // Look the field up at a version consistent with the account object's
        // resolved version.
        let authenticator_function_ref_field = self
            .get_object_cache_reader()
            .try_find_object_lt_or_eq_version(
                authenticator_function_ref_field_id,
                auth_account_object_seq_number,
            )?;

        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
            let field_move_object = authenticator_function_ref_field_obj
                .data
                .try_as_move()
                .expect("dynamic field should never be a package object");

            // Deserialize the dynamic field into its typed Rust form.
            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
                field_move_object.to_rust().ok_or(
                    UserInputError::InvalidAuthenticatorFunctionRefField {
                        account_object_id: auth_account_object_id,
                    },
                )?;

            Ok(AuthenticatorFunctionRefForExecution::new_v1(
                field.value,
                authenticator_function_ref_field_obj.compute_object_reference(),
                authenticator_function_ref_field_obj.owner,
                authenticator_function_ref_field_obj.storage_rebate,
                authenticator_function_ref_field_obj.previous_transaction,
            ))
        } else {
            Err(UserInputError::MoveAuthenticatorNotFound {
                authenticator_function_ref_id: authenticator_function_ref_field_id,
                account_object_id: auth_account_object_id,
                account_object_version: auth_account_object_seq_number,
            }
            .into())
        }
    }
5386
5387    fn read_objects_for_signing(
5388        &self,
5389        transaction: &VerifiedTransaction,
5390        epoch: u64,
5391    ) -> IotaResult<(
5392        InputObjects,
5393        ReceivingObjects,
5394        Option<InputObjects>,
5395        Option<ObjectReadResult>,
5396    )> {
5397        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5398            Some(transaction.digest()),
5399            &transaction.collect_all_input_object_kind_for_reading()?,
5400            &transaction.data().transaction_data().receiving_objects(),
5401            epoch,
5402        )?;
5403
5404        transaction
5405            .split_input_objects_into_groups_for_reading(input_objects)
5406            .map(|(tx_input_objects, auth_input_objects, account_object)| {
5407                (
5408                    tx_input_objects,
5409                    tx_receiving_objects,
5410                    auth_input_objects,
5411                    account_object,
5412                )
5413            })
5414    }
5415
5416    fn check_transaction_inputs_for_signing(
5417        &self,
5418        protocol_config: &ProtocolConfig,
5419        reference_gas_price: u64,
5420        tx_data: &TransactionData,
5421        tx_input_objects: InputObjects,
5422        tx_receiving_objects: &ReceivingObjects,
5423        move_authenticator: Option<&MoveAuthenticator>,
5424        auth_input_objects: Option<InputObjects>,
5425        account_object: Option<ObjectReadResult>,
5426    ) -> IotaResult<(
5427        IotaGasStatus,
5428        CheckedInputObjects,
5429        Option<CheckedInputObjects>,
5430        Option<AuthenticatorFunctionRef>,
5431    )> {
5432        let (
5433            auth_checked_input_objects_union,
5434            authenticator_function_ref,
5435            authenticator_gas_budget,
5436        ) = if let Some(move_authenticator) = move_authenticator {
5437            let auth_input_objects =
5438                auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5439            let account_object = account_object.expect("Move account object must be provided");
5440
5441            // Check basic `object_to_authenticate` preconditions and get its components.
5442            let (
5443                auth_account_object_id,
5444                auth_account_object_seq_number,
5445                auth_account_object_digest,
5446            ) = move_authenticator.object_to_authenticate_components()?;
5447
5448            // Make sure the sender is a Move account.
5449            let AuthenticatorFunctionRefForExecution {
5450                authenticator_function_ref,
5451                ..
5452            } = self.check_move_account(
5453                auth_account_object_id,
5454                auth_account_object_seq_number,
5455                auth_account_object_digest,
5456                account_object,
5457                &tx_data.sender(),
5458            )?;
5459
5460            // Check the MoveAuthenticator input objects.
5461            let auth_checked_input_objects =
5462                iota_transaction_checks::check_move_authenticator_input_for_signing(
5463                    auth_input_objects,
5464                )?;
5465
5466            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5467            // not a part of the transaction data.
5468            let authenticator_gas_budget = protocol_config.max_auth_gas();
5469
5470            (
5471                Some(auth_checked_input_objects),
5472                Some(authenticator_function_ref),
5473                authenticator_gas_budget,
5474            )
5475        } else {
5476            (None, None, 0)
5477        };
5478
5479        // Check the transaction inputs.
5480        let (gas_status, tx_checked_input_objects) =
5481            iota_transaction_checks::check_transaction_input(
5482                protocol_config,
5483                reference_gas_price,
5484                tx_data,
5485                tx_input_objects,
5486                tx_receiving_objects,
5487                &self.metrics.bytecode_verifier_metrics,
5488                &self.config.verifier_signing_config,
5489                authenticator_gas_budget,
5490            )?;
5491
5492        Ok((
5493            gas_status,
5494            tx_checked_input_objects,
5495            auth_checked_input_objects_union,
5496            authenticator_function_ref,
5497        ))
5498    }
5499
    /// Test-only: iterates the cached live object set held by the
    /// accumulator store. The iterator borrows `self` for its lifetime.
    #[cfg(test)]
    pub(crate) fn iter_live_object_set_for_testing(
        &self,
    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
        self.get_accumulator_store()
            .iter_cached_live_object_set_for_testing()
    }
5507
5508    #[cfg(test)]
5509    pub(crate) fn shutdown_execution_for_test(&self) {
5510        self.tx_execution_shutdown
5511            .lock()
5512            .take()
5513            .unwrap()
5514            .send(())
5515            .unwrap();
5516    }
5517
    /// NOTE: this function is only to be used for fuzzing and testing. Never
    /// use in prod
    ///
    /// Bulk-inserts `objects` as genesis objects, forces a reload of all
    /// built-in system packages in the object cache, then clears
    /// end-of-epoch state while holding the reconfiguration execution lock.
    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
        self.get_object_cache_reader()
            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
    }
5527}
5528
/// Receives completed randomness rounds over a channel and turns each into a
/// `RandomnessStateUpdate` transaction on the authority.
pub struct RandomnessRoundReceiver {
    authority_state: Arc<AuthorityState>,
    // Channel delivering (epoch, round, randomness bytes) tuples.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5533
impl RandomnessRoundReceiver {
    /// Spawns the receiver's event loop on a monitored task and returns its
    /// join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: handles randomness rounds until the sending side of the
    /// channel is dropped.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes);
                    } else {
                        // Channel closed - no more randomness will arrive.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds a `RandomnessStateUpdate` transaction for the given round,
    /// persists it, enqueues it for execution, and spawns a watchdog task
    /// waiting for its effects.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        // Stale randomness from a different epoch is dropped.
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version(),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must
        // immediately persist this transaction. If we crash before its outputs
        // are committed, this ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Send transaction to TransactionManager for execution.
        self.authority_state
            .transaction_manager()
            .enqueue(vec![transaction], &epoch_store);

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case
            // of out-of-order randomness generation. (Each
            // RandomnessStateUpdate depends on the output of the
            // RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason,
            // the validator will eventually crash rather than continuing in a
            // zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .try_notify_read_executed_effects(&[digest]),
            )
            .await;
            let result = match result {
                Ok(result) => result,
                Err(_) => {
                    if cfg!(debug_assertions) {
                        // Crash on randomness update execution timeout in debug builds.
                        panic!(
                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                        );
                    }
                    warn!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .try_notify_read_executed_effects(&[digest])
                        .await
                }
            };

            // The update must both produce effects and succeed; anything else
            // is unrecoverable for the validator.
            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
5653
#[async_trait]
impl TransactionKeyValueStoreTrait for AuthorityState {
    /// Fetches transactions and executed effects by digest from the local
    /// caches; empty key slices short-circuit to empty results.
    async fn multi_get(
        &self,
        transaction_keys: &[TransactionDigest],
        effects_keys: &[TransactionDigest],
    ) -> IotaResult<KVStoreTransactionData> {
        let txns = if !transaction_keys.is_empty() {
            self.get_transaction_cache_reader()
                .try_multi_get_transaction_blocks(transaction_keys)?
                .into_iter()
                .map(|t| t.map(|t| (*t).clone().into_inner()))
                .collect()
        } else {
            vec![]
        };

        let fx = if !effects_keys.is_empty() {
            self.get_transaction_cache_reader()
                .try_multi_get_executed_effects(effects_keys)?
        } else {
            vec![]
        };

        Ok((txns, fx))
    }

    /// Fetches checkpoint summaries (by sequence number), checkpoint contents
    /// (by sequence number), and summaries by digest, in one call.
    async fn multi_get_checkpoints(
        &self,
        checkpoint_summaries: &[CheckpointSequenceNumber],
        checkpoint_contents: &[CheckpointSequenceNumber],
        checkpoint_summaries_by_digest: &[CheckpointDigest],
    ) -> IotaResult<(
        Vec<Option<CertifiedCheckpointSummary>>,
        Vec<Option<CheckpointContents>>,
        Vec<Option<CertifiedCheckpointSummary>>,
    )> {
        // TODO: use multi-get methods if it ever becomes important (unlikely)
        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
        let store = self.get_checkpoint_store();
        for seq in checkpoint_summaries {
            let checkpoint = store
                .get_checkpoint_by_sequence_number(*seq)?
                .map(|c| c.into_inner());

            summaries.push(checkpoint);
        }

        // Contents are resolved via the summary's content digest; a missing
        // summary yields `None` for the contents as well.
        let mut contents = Vec::with_capacity(checkpoint_contents.len());
        for seq in checkpoint_contents {
            let checkpoint = store
                .get_checkpoint_by_sequence_number(*seq)?
                .and_then(|summary| {
                    store
                        .get_checkpoint_contents(&summary.content_digest)
                        .expect("db read cannot fail")
                });
            contents.push(checkpoint);
        }

        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
        for digest in checkpoint_summaries_by_digest {
            let checkpoint = store
                .get_checkpoint_by_digest(digest)?
                .map(|c| c.into_inner());
            summaries_by_digest.push(checkpoint);
        }

        Ok((summaries, contents, summaries_by_digest))
    }

    /// Returns the checkpoint sequence number a transaction was finalized in,
    /// if known (epoch component is discarded).
    async fn get_transaction_perpetual_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
        self.get_checkpoint_cache()
            .try_get_transaction_perpetual_checkpoint(&digest)
            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
    }

    /// Fetches an object at an exact (id, version) key.
    async fn get_object(
        &self,
        object_id: ObjectID,
        version: VersionNumber,
    ) -> IotaResult<Option<Object>> {
        self.get_object_cache_reader()
            .try_get_object_by_key(&object_id, version)
    }

    /// Batched variant of `get_transaction_perpetual_checkpoint`.
    async fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
        let res = self
            .get_checkpoint_cache()
            .try_multi_get_transactions_perpetual_checkpoints(digests)?;

        Ok(res
            .into_iter()
            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
            .collect())
    }

    /// Fetches the emitted events for each transaction digest, returning
    /// `None` where a transaction has no executed effects or no events.
    #[instrument(skip(self))]
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        if digests.is_empty() {
            return Ok(vec![]);
        }
        // Resolve each digest to its (optional) events digest via effects.
        let events_digests: Vec<_> = self
            .get_transaction_cache_reader()
            .try_multi_get_executed_effects(digests)?
            .into_iter()
            .map(|t| t.and_then(|t| t.events_digest().cloned()))
            .collect();
        // Only query events for digests that actually have one; `events` then
        // yields results in the same order as `non_empty_events`.
        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
        let mut events = self
            .get_transaction_cache_reader()
            .try_multi_get_events(&non_empty_events)?
            .into_iter();
        // Re-align: advance `events` once per Some(_) entry, keeping None
        // entries in place. Correct because both iterators share the same
        // ordering over the Some positions.
        Ok(events_digests
            .into_iter()
            .map(|ev| ev.and_then(|_| events.next()?))
            .collect())
    }
}
5782
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// An override is either the same framework for every validator, or a
    /// callback that may hand each validator a different framework.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own bytecode version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers the same override modules for `package_id` on every
    /// validator.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|config| {
            config
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Registers a per-validator override callback for `package_id`.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|config| {
            config
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Returns the serialized override modules for `package_id` on validator
    /// `name`, if any override is registered.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|config| {
            match config.borrow().get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(callback) => {
                    callback(name).map(|framework| compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules for `package_id` on validator `name`, if
    /// any override is registered.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|config| {
            match config.borrow().get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(callback) => callback(name),
            }
        })
    }

    /// Builds a `SystemPackage` from an override, inferring its dependencies.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if !is_system_package(*package_id) {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        } else {
            // An override of an existing system package keeps its declared
            // dependencies.
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a `SystemPackage` for every registered override that is not a
    /// built-in system package (i.e. entirely new injected packages).
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect the injected ids first so the thread-local borrow is
        // released before we look up their bytes.
        let injected: Vec<ObjectID> = OVERRIDE.with(|config| {
            config
                .borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        injected
            .into_iter()
            .map(|package| SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
5904
/// Serializable record of a single object for debug state dumps: the object
/// itself plus its reference triple (id, version, digest).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    // Object id, from the object reference.
    pub id: ObjectID,
    // Object version, from the object reference.
    pub version: VersionNumber,
    // Object digest, from the object reference.
    pub digest: ObjectDigest,
    // The full object contents.
    pub object: Object,
}
5912
5913impl ObjDumpFormat {
5914    fn new(object: Object) -> Self {
5915        let oref = object.compute_object_reference();
5916        Self {
5917            id: oref.0,
5918            version: oref.1,
5919            digest: oref.2,
5920            object,
5921        }
5922    }
5923}
5924
/// Serializable snapshot of all node state relevant to one executed
/// transaction, written to disk for offline debugging / replay.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    // Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    // The signed transaction data from the executed certificate.
    pub sender_signed_data: SenderSignedData,
    // Epoch in which the transaction executed.
    pub executed_epoch: u64,
    // Reference gas price of that epoch.
    pub reference_gas_price: u64,
    // Protocol version in effect at execution time.
    pub protocol_version: u64,
    // Start timestamp (ms) of the executing epoch.
    pub epoch_start_timestamp_ms: u64,
    // Effects computed locally by this node.
    pub computed_effects: TransactionEffects,
    // Effects digest the node expected (e.g. from certification).
    pub expected_effects_digest: TransactionEffectsDigest,
    // All built-in system packages present in the store at dump time.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    // Shared objects the transaction read or mutated.
    pub shared_objects: Vec<ObjDumpFormat>,
    // Child objects loaded at runtime (read but not necessarily mutated).
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    // Objects at the versions they were modified at.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    // Packages fetched from the DB at runtime (not among the inputs).
    pub runtime_reads: Vec<ObjDumpFormat>,
    // The transaction's input objects from the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
5942
5943impl NodeStateDump {
5944    pub fn new(
5945        tx_digest: &TransactionDigest,
5946        effects: &TransactionEffects,
5947        expected_effects_digest: TransactionEffectsDigest,
5948        object_store: &dyn ObjectStore,
5949        epoch_store: &Arc<AuthorityPerEpochStore>,
5950        inner_temporary_store: &InnerTemporaryStore,
5951        certificate: &VerifiedExecutableTransaction,
5952    ) -> IotaResult<Self> {
5953        // Epoch info
5954        let executed_epoch = epoch_store.epoch();
5955        let reference_gas_price = epoch_store.reference_gas_price();
5956        let epoch_start_config = epoch_store.epoch_start_config();
5957        let protocol_version = epoch_store.protocol_version().as_u64();
5958        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5959
5960        // Record all system packages at this version
5961        let mut relevant_system_packages = Vec::new();
5962        for sys_package_id in BuiltInFramework::all_package_ids() {
5963            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5964                relevant_system_packages.push(ObjDumpFormat::new(w))
5965            }
5966        }
5967
5968        // Record all the shared objects
5969        let mut shared_objects = Vec::new();
5970        for kind in effects.input_shared_objects() {
5971            match kind {
5972                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5973                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5974                        shared_objects.push(ObjDumpFormat::new(w))
5975                    }
5976                }
5977                InputSharedObject::ReadDeleted(..)
5978                | InputSharedObject::MutateDeleted(..)
5979                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5980                                                           * objects. */
5981            }
5982        }
5983
5984        // Record all loaded child objects
5985        // Child objects which are read but not mutated are not tracked anywhere else
5986        let mut loaded_child_objects = Vec::new();
5987        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5988            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5989                loaded_child_objects.push(ObjDumpFormat::new(w))
5990            }
5991        }
5992
5993        // Record all modified objects
5994        let mut modified_at_versions = Vec::new();
5995        for (id, ver) in effects.modified_at_versions() {
5996            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5997                modified_at_versions.push(ObjDumpFormat::new(w))
5998            }
5999        }
6000
6001        // Packages read at runtime, which were not previously loaded into the temoorary
6002        // store Some packages may be fetched at runtime and wont show up in
6003        // input objects
6004        let mut runtime_reads = Vec::new();
6005        for obj in inner_temporary_store
6006            .runtime_packages_loaded_from_db
6007            .values()
6008        {
6009            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6010        }
6011
6012        // All other input objects should already be in `inner_temporary_store.objects`
6013
6014        Ok(Self {
6015            tx_digest: *tx_digest,
6016            executed_epoch,
6017            reference_gas_price,
6018            epoch_start_timestamp_ms,
6019            protocol_version,
6020            relevant_system_packages,
6021            shared_objects,
6022            loaded_child_objects,
6023            modified_at_versions,
6024            runtime_reads,
6025            sender_signed_data: certificate.clone().into_message(),
6026            input_objects: inner_temporary_store
6027                .input_objects
6028                .values()
6029                .map(|o| ObjDumpFormat::new(o.clone()))
6030                .collect(),
6031            computed_effects: effects.clone(),
6032            expected_effects_digest,
6033        })
6034    }
6035
6036    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6037        let mut objects = Vec::new();
6038        objects.extend(self.relevant_system_packages.clone());
6039        objects.extend(self.shared_objects.clone());
6040        objects.extend(self.loaded_child_objects.clone());
6041        objects.extend(self.modified_at_versions.clone());
6042        objects.extend(self.runtime_reads.clone());
6043        objects.extend(self.input_objects.clone());
6044        objects
6045    }
6046
6047    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6048        let file_name = format!(
6049            "{}_{}_NODE_DUMP.json",
6050            self.tx_digest,
6051            AuthorityState::unixtime_now_ms()
6052        );
6053        let mut path = path.to_path_buf();
6054        path.push(&file_name);
6055        let mut file = File::create(path.clone())?;
6056        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6057        Ok(path)
6058    }
6059
6060    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6061        let file = File::open(path)?;
6062        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6063    }
6064}