iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    account_abstraction::{
59        account::AuthenticatorFunctionRefV1Key,
60        authenticator_function::{
61            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62            AuthenticatorFunctionRefV1,
63        },
64    },
65    authenticator_state::get_authenticator_state,
66    base_types::*,
67    committee::{Committee, EpochId, ProtocolVersion},
68    crypto::{
69        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70        default_hash,
71    },
72    deny_list_v1::check_coin_deny_list_v1_during_signing,
73    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75    effects::{
76        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77        TransactionEvents, VerifiedSignedTransactionEffects,
78    },
79    error::{ExecutionError, IotaError, IotaResult, UserInputError},
80    event::{Event, EventID, SystemEpochInfoEvent},
81    executable_transaction::VerifiedExecutableTransaction,
82    execution_config_utils::to_binary_config,
83    execution_status::ExecutionStatus,
84    fp_ensure,
85    gas::{GasCostSummary, IotaGasStatus},
86    gas_coin::NANOS_PER_IOTA,
87    inner_temporary_store::{
88        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89        WrittenObjects,
90    },
91    iota_system_state::{
92        IotaSystemState, IotaSystemStateTrait,
93        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94    },
95    is_system_package,
96    layout_resolver::{LayoutResolver, into_struct_layout},
97    message_envelope::Message,
98    messages_checkpoint::{
99        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103    },
104    messages_consensus::AuthorityCapabilitiesV1,
105    messages_grpc::{
106        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108        TransactionStatus,
109    },
110    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111    move_authenticator::MoveAuthenticator,
112    object::{
113        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114        bounded_visitor::BoundedVisitor,
115    },
116    storage::{
117        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118    },
119    supported_protocol_versions::{
120        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121    },
122    transaction::*,
123    transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoints::CheckpointStore,
162    congestion_tracker::CongestionTracker,
163    consensus_adapter::ConsensusAdapter,
164    epoch::committee_store::CommitteeStore,
165    execution_cache::{
166        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168        TransactionCacheRead,
169    },
170    execution_driver::execution_process,
171    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172    metrics::{LatencyObserver, RateTracker},
173    module_cache_metrics::ResolverMetrics,
174    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175    rest_index::RestIndexStore,
176    stake_aggregator::StakeAggregator,
177    state_accumulator::{AccumulatorStore, StateAccumulator},
178    subscription_handler::SubscriptionHandler,
179    transaction_input_loader::TransactionInputLoader,
180    transaction_manager::TransactionManager,
181    transaction_outputs::TransactionOutputs,
182    validator_tx_finalizer::ValidatorTxFinalizer,
183    verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223pub mod authority_store_pruner;
224pub mod authority_store_tables;
225pub mod authority_store_types;
226pub mod epoch_start_configuration;
227pub mod shared_object_congestion_tracker;
228pub mod shared_object_version_manager;
229pub mod suggested_gas_price_calculator;
230pub mod test_authority_builder;
231pub mod transaction_deferral;
232
233pub(crate) mod authority_store;
234pub mod backpressure;
235
236/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
237pub struct AuthorityMetrics {
238    tx_orders: IntCounter,
239    total_certs: IntCounter,
240    total_cert_attempts: IntCounter,
241    total_effects: IntCounter,
242    pub shared_obj_tx: IntCounter,
243    sponsored_tx: IntCounter,
244    tx_already_processed: IntCounter,
245    num_input_objs: Histogram,
246    num_shared_objects: Histogram,
247    batch_size: Histogram,
248
249    authority_state_handle_transaction_latency: Histogram,
250
251    execute_certificate_latency_single_writer: Histogram,
252    execute_certificate_latency_shared_object: Histogram,
253
254    internal_execution_latency: Histogram,
255    execution_load_input_objects_latency: Histogram,
256    prepare_certificate_latency: Histogram,
257    commit_certificate_latency: Histogram,
258    db_checkpoint_latency: Histogram,
259
260    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
261    pub(crate) transaction_manager_num_missing_objects: IntGauge,
262    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
263    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
264    pub(crate) transaction_manager_num_ready: IntGauge,
265    pub(crate) transaction_manager_object_cache_size: IntGauge,
266    pub(crate) transaction_manager_object_cache_hits: IntCounter,
267    pub(crate) transaction_manager_object_cache_misses: IntCounter,
268    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
269    pub(crate) transaction_manager_package_cache_size: IntGauge,
270    pub(crate) transaction_manager_package_cache_hits: IntCounter,
271    pub(crate) transaction_manager_package_cache_misses: IntCounter,
272    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
273    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
274
275    pub(crate) execution_driver_executed_transactions: IntCounter,
276    pub(crate) execution_driver_dispatch_queue: IntGauge,
277    pub(crate) execution_queueing_delay_s: Histogram,
278    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
279    pub(crate) execution_gas_latency_ratio: Histogram,
280
281    pub(crate) skipped_consensus_txns: IntCounter,
282    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
283
284    pub(crate) authority_overload_status: IntGauge,
285    pub(crate) authority_load_shedding_percentage: IntGauge,
286
287    pub(crate) transaction_overload_sources: IntCounterVec,
288
289    /// Post processing metrics
290    post_processing_total_events_emitted: IntCounter,
291    post_processing_total_tx_indexed: IntCounter,
292    post_processing_total_tx_had_event_processed: IntCounter,
293    post_processing_total_failures: IntCounter,
294
295    /// Consensus handler metrics
296    pub consensus_handler_processed: IntCounterVec,
297    pub consensus_handler_transaction_sizes: HistogramVec,
298    pub consensus_handler_num_low_scoring_authorities: IntGauge,
299    pub consensus_handler_scores: IntGaugeVec,
300    pub consensus_handler_deferred_transactions: IntCounter,
301    pub consensus_handler_congested_transactions: IntCounter,
302    pub consensus_handler_cancelled_transactions: IntCounter,
303    pub consensus_handler_max_object_costs: IntGaugeVec,
304    pub consensus_committed_subdags: IntCounterVec,
305    pub consensus_committed_messages: IntGaugeVec,
306    pub consensus_committed_user_transactions: IntGaugeVec,
307    pub consensus_handler_leader_round: IntGauge,
308    pub consensus_calculated_throughput: IntGauge,
309    pub consensus_calculated_throughput_profile: IntGauge,
310
311    pub limits_metrics: Arc<LimitsMetrics>,
312
313    /// bytecode verifier metrics for tracking timeouts
314    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
315
316    /// Count of zklogin signatures
317    pub zklogin_sig_count: IntCounter,
318    /// Count of multisig signatures
319    pub multisig_sig_count: IntCounter,
320
321    // Tracks recent average txn queueing delay between when it is ready for execution
322    // until it starts executing.
323    pub execution_queueing_latency: LatencyObserver,
324
325    // Tracks the rate of transactions become ready for execution in transaction manager.
326    // The need for the Mutex is that the tracker is updated in transaction manager and read
327    // in the overload_monitor. There should be low mutex contention because
328    // transaction manager is single threaded and the read rate in overload_monitor is
329    // low. In the case where transaction manager becomes multi-threaded, we can
330    // create one rate tracker per thread.
331    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333    // Tracks the rate of transactions starts execution in execution driver.
334    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
335    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
338// Override default Prom buckets for positive numbers in 0-10M range
339const POSITIVE_INT_BUCKETS: &[f64] = &[
340    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
341    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
342    7000000., 10000000.,
343];
344
345const LATENCY_SEC_BUCKETS: &[f64] = &[
346    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
347    10., 20., 30., 60., 90.,
348];
349
350// Buckets for low latency samples. Starts from 10us.
351const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
352    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
353    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
354];
355
356const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
357    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
358    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
359];
360
361/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
363
364impl AuthorityMetrics {
365    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366        let execute_certificate_latency = register_histogram_vec_with_registry!(
367            "authority_state_execute_certificate_latency",
368            "Latency of executing certificates, including waiting for inputs",
369            &["tx_type"],
370            LATENCY_SEC_BUCKETS.to_vec(),
371            registry,
372        )
373        .unwrap();
374
375        let execute_certificate_latency_single_writer =
376            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377        let execute_certificate_latency_shared_object =
378            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380        Self {
381            tx_orders: register_int_counter_with_registry!(
382                "total_transaction_orders",
383                "Total number of transaction orders",
384                registry,
385            )
386            .unwrap(),
387            total_certs: register_int_counter_with_registry!(
388                "total_transaction_certificates",
389                "Total number of transaction certificates handled",
390                registry,
391            )
392            .unwrap(),
393            total_cert_attempts: register_int_counter_with_registry!(
394                "total_handle_certificate_attempts",
395                "Number of calls to handle_certificate",
396                registry,
397            )
398            .unwrap(),
399            // total_effects == total transactions finished
400            total_effects: register_int_counter_with_registry!(
401                "total_transaction_effects",
402                "Total number of transaction effects produced",
403                registry,
404            )
405            .unwrap(),
406
407            shared_obj_tx: register_int_counter_with_registry!(
408                "num_shared_obj_tx",
409                "Number of transactions involving shared objects",
410                registry,
411            )
412            .unwrap(),
413
414            sponsored_tx: register_int_counter_with_registry!(
415                "num_sponsored_tx",
416                "Number of sponsored transactions",
417                registry,
418            )
419            .unwrap(),
420
421            tx_already_processed: register_int_counter_with_registry!(
422                "num_tx_already_processed",
423                "Number of transaction orders already processed previously",
424                registry,
425            )
426            .unwrap(),
427            num_input_objs: register_histogram_with_registry!(
428                "num_input_objects",
429                "Distribution of number of input TX objects per TX",
430                POSITIVE_INT_BUCKETS.to_vec(),
431                registry,
432            )
433            .unwrap(),
434            num_shared_objects: register_histogram_with_registry!(
435                "num_shared_objects",
436                "Number of shared input objects per TX",
437                POSITIVE_INT_BUCKETS.to_vec(),
438                registry,
439            )
440            .unwrap(),
441            batch_size: register_histogram_with_registry!(
442                "batch_size",
443                "Distribution of size of transaction batch",
444                POSITIVE_INT_BUCKETS.to_vec(),
445                registry,
446            )
447            .unwrap(),
448            authority_state_handle_transaction_latency: register_histogram_with_registry!(
449                "authority_state_handle_transaction_latency",
450                "Latency of handling transactions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execute_certificate_latency_single_writer,
456            execute_certificate_latency_shared_object,
457            internal_execution_latency: register_histogram_with_registry!(
458                "authority_state_internal_execution_latency",
459                "Latency of actual certificate executions",
460                LATENCY_SEC_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            execution_load_input_objects_latency: register_histogram_with_registry!(
465                "authority_state_execution_load_input_objects_latency",
466                "Latency of loading input objects for execution",
467                LOW_LATENCY_SEC_BUCKETS.to_vec(),
468                registry,
469            )
470            .unwrap(),
471            prepare_certificate_latency: register_histogram_with_registry!(
472                "authority_state_prepare_certificate_latency",
473                "Latency of executing certificates, before committing the results",
474                LATENCY_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            commit_certificate_latency: register_histogram_with_registry!(
479                "authority_state_commit_certificate_latency",
480                "Latency of committing certificate execution results",
481                LATENCY_SEC_BUCKETS.to_vec(),
482                registry,
483            )
484            .unwrap(),
485            db_checkpoint_latency: register_histogram_with_registry!(
486                "db_checkpoint_latency",
487                "Latency of checkpointing dbs",
488                LATENCY_SEC_BUCKETS.to_vec(),
489                registry,
490            ).unwrap(),
491            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492                "transaction_manager_num_enqueued_certificates",
493                "Current number of certificates enqueued to TransactionManager",
494                &["result"],
495                registry,
496            )
497            .unwrap(),
498            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499                "transaction_manager_num_missing_objects",
500                "Current number of missing objects in TransactionManager",
501                registry,
502            )
503            .unwrap(),
504            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505                "transaction_manager_num_pending_certificates",
506                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507                registry,
508            )
509            .unwrap(),
510            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511                "transaction_manager_num_executing_certificates",
512                "Number of executing certificates, including queued and actually running certificates",
513                registry,
514            )
515            .unwrap(),
516            transaction_manager_num_ready: register_int_gauge_with_registry!(
517                "transaction_manager_num_ready",
518                "Number of ready transactions in TransactionManager",
519                registry,
520            )
521            .unwrap(),
522            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523                "transaction_manager_object_cache_size",
524                "Current size of object-availability cache in TransactionManager",
525                registry,
526            )
527            .unwrap(),
528            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529                "transaction_manager_object_cache_hits",
530                "Number of object-availability cache hits in TransactionManager",
531                registry,
532            )
533            .unwrap(),
534            authority_overload_status: register_int_gauge_with_registry!(
535                "authority_overload_status",
536                "Whether authority is current experiencing overload and enters load shedding mode.",
537                registry)
538            .unwrap(),
539            authority_load_shedding_percentage: register_int_gauge_with_registry!(
540                "authority_load_shedding_percentage",
541                "The percentage of transactions is shed when the authority is in load shedding mode.",
542                registry)
543            .unwrap(),
544            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545                "transaction_manager_object_cache_misses",
546                "Number of object-availability cache misses in TransactionManager",
547                registry,
548            )
549            .unwrap(),
550            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551                "transaction_manager_object_cache_evictions",
552                "Number of object-availability cache evictions in TransactionManager",
553                registry,
554            )
555            .unwrap(),
556            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557                "transaction_manager_package_cache_size",
558                "Current size of package-availability cache in TransactionManager",
559                registry,
560            )
561            .unwrap(),
562            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563                "transaction_manager_package_cache_hits",
564                "Number of package-availability cache hits in TransactionManager",
565                registry,
566            )
567            .unwrap(),
568            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569                "transaction_manager_package_cache_misses",
570                "Number of package-availability cache misses in TransactionManager",
571                registry,
572            )
573            .unwrap(),
574            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575                "transaction_manager_package_cache_evictions",
576                "Number of package-availability cache evictions in TransactionManager",
577                registry,
578            )
579            .unwrap(),
580            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581                "transaction_manager_transaction_queue_age_s",
582                "Time spent in waiting for transaction in the queue",
583                LATENCY_SEC_BUCKETS.to_vec(),
584                registry,
585            )
586            .unwrap(),
587            transaction_overload_sources: register_int_counter_vec_with_registry!(
588                "transaction_overload_sources",
589                "Number of times each source indicates transaction overload.",
590                &["source"],
591                registry)
592            .unwrap(),
593            execution_driver_executed_transactions: register_int_counter_with_registry!(
594                "execution_driver_executed_transactions",
595                "Cumulative number of transaction executed by execution driver",
596                registry,
597            )
598            .unwrap(),
599            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600                "execution_driver_dispatch_queue",
601                "Number of transaction pending in execution driver dispatch queue",
602                registry,
603            )
604            .unwrap(),
605            execution_queueing_delay_s: register_histogram_with_registry!(
606                "execution_queueing_delay_s",
607                "Queueing delay between a transaction is ready for execution until it starts executing.",
608                LATENCY_SEC_BUCKETS.to_vec(),
609                registry
610            )
611            .unwrap(),
612            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613                "prepare_cert_gas_latency_ratio",
614                "The ratio of computation gas divided by VM execution latency.",
615                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616                registry
617            )
618            .unwrap(),
619            execution_gas_latency_ratio: register_histogram_with_registry!(
620                "execution_gas_latency_ratio",
621                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623                registry
624            )
625            .unwrap(),
626            skipped_consensus_txns: register_int_counter_with_registry!(
627                "skipped_consensus_txns",
628                "Total number of consensus transactions skipped",
629                registry,
630            )
631            .unwrap(),
632            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633                "skipped_consensus_txns_cache_hit",
634                "Total number of consensus transactions skipped because of local cache hit",
635                registry,
636            )
637            .unwrap(),
638            post_processing_total_events_emitted: register_int_counter_with_registry!(
639                "post_processing_total_events_emitted",
640                "Total number of events emitted in post processing",
641                registry,
642            )
643            .unwrap(),
644            post_processing_total_tx_indexed: register_int_counter_with_registry!(
645                "post_processing_total_tx_indexed",
646                "Total number of txes indexed in post processing",
647                registry,
648            )
649            .unwrap(),
650            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651                "post_processing_total_tx_had_event_processed",
652                "Total number of txes finished event processing in post processing",
653                registry,
654            )
655            .unwrap(),
656            post_processing_total_failures: register_int_counter_with_registry!(
657                "post_processing_total_failures",
658                "Total number of failure in post processing",
659                registry,
660            )
661            .unwrap(),
662            consensus_handler_processed: register_int_counter_vec_with_registry!(
663                "consensus_handler_processed",
664                "Number of transactions processed by consensus handler",
665                &["class"],
666                registry
667            ).unwrap(),
668            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669                "consensus_handler_transaction_sizes",
670                "Sizes of each type of transactions processed by consensus handler",
671                &["class"],
672                POSITIVE_INT_BUCKETS.to_vec(),
673                registry
674            ).unwrap(),
675            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676                "consensus_handler_num_low_scoring_authorities",
677                "Number of low scoring authorities based on reputation scores from consensus",
678                registry
679            ).unwrap(),
680            consensus_handler_scores: register_int_gauge_vec_with_registry!(
681                "consensus_handler_scores",
682                "scores from consensus for each authority",
683                &["authority"],
684                registry,
685            ).unwrap(),
686            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687                "consensus_handler_deferred_transactions",
688                "Number of transactions deferred by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_congested_transactions: register_int_counter_with_registry!(
692                "consensus_handler_congested_transactions",
693                "Number of transactions deferred by consensus handler due to congestion",
694                registry,
695            ).unwrap(),
696            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697                "consensus_handler_cancelled_transactions",
698                "Number of transactions cancelled by consensus handler",
699                registry,
700            ).unwrap(),
701            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702                "consensus_handler_max_congestion_control_object_costs",
703                "Max object costs for congestion control in the current consensus commit",
704                &["commit_type"],
705                registry,
706            ).unwrap(),
707            consensus_committed_subdags: register_int_counter_vec_with_registry!(
708                "consensus_committed_subdags",
709                "Number of committed subdags, sliced by leader",
710                &["authority"],
711                registry,
712            ).unwrap(),
713            consensus_committed_messages: register_int_gauge_vec_with_registry!(
714                "consensus_committed_messages",
715                "Total number of committed consensus messages, sliced by author",
716                &["authority"],
717                registry,
718            ).unwrap(),
719            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720                "consensus_committed_user_transactions",
721                "Number of committed user transactions, sliced by submitter",
722                &["authority"],
723                registry,
724            ).unwrap(),
725            consensus_handler_leader_round: register_int_gauge_with_registry!(
726                "consensus_handler_leader_round",
727                "The leader round of the current consensus output being processed in the consensus handler",
728                registry,
729            ).unwrap(),
730            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732            zklogin_sig_count: register_int_counter_with_registry!(
733                "zklogin_sig_count",
734                "Count of zkLogin signatures",
735                registry,
736            )
737            .unwrap(),
738            multisig_sig_count: register_int_counter_with_registry!(
739                "multisig_sig_count",
740                "Count of zkLogin signatures",
741                registry,
742            )
743            .unwrap(),
744            consensus_calculated_throughput: register_int_gauge_with_registry!(
745                "consensus_calculated_throughput",
746                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747                registry,
748            ).unwrap(),
749            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750                "consensus_calculated_throughput_profile",
751                "The current active calculated throughput profile",
752                registry
753            ).unwrap(),
754            execution_queueing_latency: LatencyObserver::new(),
755            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757        }
758    }
759
760    /// Reset metrics that contain `hostname` as one of the labels. This is
761    /// needed to avoid retaining metrics for long-gone committee members and
762    /// only exposing metrics for the committee in the current epoch.
763    pub fn reset_on_reconfigure(&self) {
764        self.consensus_committed_messages.reset();
765        self.consensus_handler_scores.reset();
766        self.consensus_committed_user_transactions.reset();
767    }
768}
769
770/// a Trait object for `Signer` that is:
771/// - Pin, i.e. confined to one place in memory (we don't want to copy private
772///   keys).
773/// - Sync, i.e. can be safely shared between threads.
774///
775/// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair`
776pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
777
778pub struct AuthorityState {
779    // Fixed size, static, identity of the authority
780    /// The name of this authority.
781    pub name: AuthorityName,
782    /// The signature key of the authority.
783    pub secret: StableSyncAuthoritySigner,
784
785    /// The database
786    input_loader: TransactionInputLoader,
787    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
788
789    epoch_store: ArcSwap<AuthorityPerEpochStore>,
790
791    /// This lock denotes current 'execution epoch'.
792    /// Execution acquires read lock, checks certificate epoch and holds it
793    /// until all writes are complete. Reconfiguration acquires write lock,
794    /// changes the epoch and revert all transactions from previous epoch
795    /// that are executed but did not make into checkpoint.
796    execution_lock: RwLock<EpochId>,
797
798    pub indexes: Option<Arc<IndexStore>>,
799    pub rest_index: Option<Arc<RestIndexStore>>,
800
801    pub subscription_handler: Arc<SubscriptionHandler>,
802    pub checkpoint_store: Arc<CheckpointStore>,
803
804    committee_store: Arc<CommitteeStore>,
805
806    /// Manages pending certificates and their missing input objects.
807    transaction_manager: Arc<TransactionManager>,
808
809    /// Shuts down the execution task. Used only in testing.
810    #[cfg_attr(not(test), expect(unused))]
811    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
812
813    pub metrics: Arc<AuthorityMetrics>,
814    _pruner: AuthorityStorePruner,
815    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
816
817    /// Take db checkpoints of different dbs
818    db_checkpoint_config: DBCheckpointConfig,
819
820    pub config: NodeConfig,
821
822    /// Current overload status in this authority. Updated periodically.
823    pub overload_info: AuthorityOverloadInfo,
824
825    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
826
827    /// The chain identifier is derived from the digest of the genesis
828    /// checkpoint.
829    chain_identifier: ChainIdentifier,
830
831    pub(crate) congestion_tracker: Arc<CongestionTracker>,
832}
833
834/// The authority state encapsulates all state, drives execution, and ensures
835/// safety.
836///
837/// Note the authority operations can be accessed through a read ref (&) and do
838/// not require &mut. Internally a database is synchronized through a mutex
839/// lock.
840///
841/// Repeating valid commands should produce no changes and return no error.
842impl AuthorityState {
843    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844        epoch_store.committee().authority_exists(&self.name)
845    }
846
847    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848        epoch_store
849            .active_validators()
850            .iter()
851            .any(|a| AuthorityName::from(a) == self.name)
852    }
853
854    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
855        !self.is_committee_validator(epoch_store)
856    }
857
858    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
859        &self.committee_store
860    }
861
862    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863        self.committee_store.clone()
864    }
865
866    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
867        &self.config.authority_overload_config
868    }
869
870    pub fn get_epoch_state_commitments(
871        &self,
872        epoch: EpochId,
873    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
874        self.checkpoint_store.get_epoch_state_commitments(epoch)
875    }
876
877    /// This is a private method and should be kept that way. It doesn't check
878    /// whether the provided transaction is a system transaction, and hence
879    /// can only be called internally.
880    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
881    async fn handle_transaction_impl(
882        &self,
883        transaction: VerifiedTransaction,
884        epoch_store: &Arc<AuthorityPerEpochStore>,
885    ) -> IotaResult<VerifiedSignedTransaction> {
886        // Ensure that validator cannot reconfigure while we are signing the tx
887        let _execution_lock = self.execution_lock_for_signing();
888
889        let protocol_config = epoch_store.protocol_config();
890        let reference_gas_price = epoch_store.reference_gas_price();
891
892        let epoch = epoch_store.epoch();
893
894        let tx_data = transaction.data().transaction_data();
895
896        // Note: the deny checks may do redundant package loads but:
897        // - they only load packages when there is an active package deny map
898        // - the loads are cached anyway
899        iota_transaction_checks::deny::check_transaction_for_signing(
900            tx_data,
901            transaction.tx_signatures(),
902            &transaction.input_objects()?,
903            &tx_data.receiving_objects(),
904            &self.config.transaction_deny_config,
905            self.get_backing_package_store().as_ref(),
906        )?;
907
908        // Load all transaction-related input objects.
909        // Authenticator input objects and the account object are loaded in the same
910        // call if there is a sender `MoveAuthenticator` signature present in the
911        // transaction.
912        let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
913            self.read_objects_for_signing(&transaction, epoch)?;
914
915        // Get the sender `MoveAuthenticator`, if any.
916        // Only one `MoveAuthenticator` signature is possible, since it is not
917        // implemented for the sponsor at the moment.
918        let move_authenticator = transaction.sender_move_authenticator();
919
920        // Check the inputs for signing.
921        // If there is a sender `MoveAuthenticator` signature, its input objects and the
922        // account object are also checked and must be provided.
923        // It is also checked if there is enough gas to execute the transaction and its
924        // authenticators.
925        let (
926            gas_status,
927            tx_checked_input_objects,
928            auth_checked_input_objects,
929            authenticator_function_ref,
930        ) = self.check_transaction_inputs_for_signing(
931            protocol_config,
932            reference_gas_price,
933            tx_data,
934            tx_input_objects,
935            &tx_receiving_objects,
936            move_authenticator,
937            auth_input_objects,
938            account_object,
939        )?;
940
941        check_coin_deny_list_v1_during_signing(
942            tx_data.sender(),
943            &tx_checked_input_objects,
944            &tx_receiving_objects,
945            &auth_checked_input_objects,
946            &self.get_object_store(),
947        )?;
948
949        if let Some(move_authenticator) = move_authenticator {
950            // It is supposed that `MoveAuthenticator` availability is checked in
951            // `SenderSignedData::validity_check`.
952
953            let auth_checked_input_objects = auth_checked_input_objects
954                .expect("MoveAuthenticator input objects must be provided");
955            let authenticator_function_ref = authenticator_function_ref
956                .expect("AuthenticatorFunctionRefV1 object must be provided");
957
958            let (kind, signer, _) = tx_data.execution_parts();
959
960            // Execute the Move authenticator.
961            let validation_result = epoch_store.executor().authenticate_transaction(
962                self.get_backing_store().as_ref(),
963                protocol_config,
964                self.metrics.limits_metrics.clone(),
965                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
966                epoch_store
967                    .epoch_start_config()
968                    .epoch_data()
969                    .epoch_start_timestamp(),
970                gas_status,
971                move_authenticator.to_owned(),
972                authenticator_function_ref,
973                auth_checked_input_objects,
974                kind,
975                signer,
976                transaction.digest().to_owned(),
977                &mut None,
978            );
979
980            if let Err(validation_error) = validation_result {
981                return Err(IotaError::MoveAuthenticatorExecutionFailure {
982                    error: validation_error.to_string(),
983                });
984            }
985        }
986
987        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
988
989        let signed_transaction =
990            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
991
992        // Check and write locks, to signed transaction, into the database
993        // The call to self.set_transaction_lock checks the lock is not conflicting,
994        // and returns ConflictingTransaction error in case there is a lock on a
995        // different existing transaction.
996        self.get_cache_writer().try_acquire_transaction_locks(
997            epoch_store,
998            &owned_objects,
999            signed_transaction.clone(),
1000        )?;
1001
1002        Ok(signed_transaction)
1003    }
1004
1005    /// Initiate a new transaction.
1006    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
1007    pub async fn handle_transaction(
1008        &self,
1009        epoch_store: &Arc<AuthorityPerEpochStore>,
1010        transaction: VerifiedTransaction,
1011    ) -> IotaResult<HandleTransactionResponse> {
1012        let tx_digest = *transaction.digest();
1013        debug!("handle_transaction");
1014
1015        // Ensure an idempotent answer.
1016        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1017            return Ok(HandleTransactionResponse { status });
1018        }
1019
1020        let _metrics_guard = self
1021            .metrics
1022            .authority_state_handle_transaction_latency
1023            .start_timer();
1024        self.metrics.tx_orders.inc();
1025
1026        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1027        match signed {
1028            Ok(s) => {
1029                if self.is_committee_validator(epoch_store) {
1030                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1031                        let tx = s.clone();
1032                        let validator_tx_finalizer = validator_tx_finalizer.clone();
1033                        let cache_reader = self.get_transaction_cache_reader().clone();
1034                        let epoch_store = epoch_store.clone();
1035                        spawn_monitored_task!(epoch_store.within_alive_epoch(
1036                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1037                        ));
1038                    }
1039                }
1040                Ok(HandleTransactionResponse {
1041                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1042                })
1043            }
1044            // It happens frequently that while we are checking the validity of the transaction, it
1045            // has just been executed.
1046            // In that case, we could still return Ok to avoid showing confusing errors.
1047            Err(err) => Ok(HandleTransactionResponse {
1048                status: self
1049                    .get_transaction_status(&tx_digest, epoch_store)?
1050                    .ok_or(err)?
1051                    .1,
1052            }),
1053        }
1054    }
1055
1056    pub fn check_system_overload_at_signing(&self) -> bool {
1057        self.config
1058            .authority_overload_config
1059            .check_system_overload_at_signing
1060    }
1061
1062    pub fn check_system_overload_at_execution(&self) -> bool {
1063        self.config
1064            .authority_overload_config
1065            .check_system_overload_at_execution
1066    }
1067
1068    pub(crate) fn check_system_overload(
1069        &self,
1070        consensus_adapter: &Arc<ConsensusAdapter>,
1071        tx_data: &SenderSignedData,
1072        do_authority_overload_check: bool,
1073    ) -> IotaResult {
1074        if do_authority_overload_check {
1075            self.check_authority_overload(tx_data).tap_err(|_| {
1076                self.update_overload_metrics("execution_queue");
1077            })?;
1078        }
1079        self.transaction_manager
1080            .check_execution_overload(self.overload_config(), tx_data)
1081            .tap_err(|_| {
1082                self.update_overload_metrics("execution_pending");
1083            })?;
1084        consensus_adapter.check_consensus_overload().tap_err(|_| {
1085            self.update_overload_metrics("consensus");
1086        })?;
1087
1088        let pending_tx_count = self
1089            .get_cache_commit()
1090            .approximate_pending_transaction_count();
1091        if pending_tx_count
1092            > self
1093                .config
1094                .execution_cache_config
1095                .writeback_cache
1096                .backpressure_threshold_for_rpc()
1097        {
1098            return Err(IotaError::ValidatorOverloadedRetryAfter {
1099                retry_after_secs: 10,
1100            });
1101        }
1102
1103        Ok(())
1104    }
1105
1106    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108            return Ok(());
1109        }
1110
1111        let load_shedding_percentage = self
1112            .overload_info
1113            .load_shedding_percentage
1114            .load(Ordering::Relaxed);
1115        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116    }
1117
1118    fn update_overload_metrics(&self, source: &str) {
1119        self.metrics
1120            .transaction_overload_sources
1121            .with_label_values(&[source])
1122            .inc();
1123    }
1124
1125    /// Executes a certificate for its effects.
1126    #[instrument(level = "trace", skip_all)]
1127    pub async fn execute_certificate(
1128        &self,
1129        certificate: &VerifiedCertificate,
1130        epoch_store: &Arc<AuthorityPerEpochStore>,
1131    ) -> IotaResult<TransactionEffects> {
1132        let _metrics_guard = if certificate.contains_shared_object() {
1133            self.metrics
1134                .execute_certificate_latency_shared_object
1135                .start_timer()
1136        } else {
1137            self.metrics
1138                .execute_certificate_latency_single_writer
1139                .start_timer()
1140        };
1141        trace!("execute_certificate");
1142
1143        self.metrics.total_cert_attempts.inc();
1144
1145        if !certificate.contains_shared_object() {
1146            // Shared object transactions need to be sequenced by the consensus before
1147            // enqueueing for execution, done in
1148            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1149            // object transactions, they can be enqueued for execution immediately.
1150            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1151        }
1152
1153        // tx could be reverted when epoch ends, so we must be careful not to return a
1154        // result here after the epoch ends.
1155        epoch_store
1156            .within_alive_epoch(self.notify_read_effects(certificate))
1157            .await
1158            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1159            .and_then(|r| r)
1160    }
1161
1162    /// Internal logic to execute a certificate.
1163    ///
1164    /// Guarantees that
1165    /// - If input objects are available, return no permanent failure.
1166    /// - Execution and output commit are atomic. i.e. outputs are only written
1167    ///   to storage,
1168    /// on successful execution; crashed execution has no observable effect and
1169    /// can be retried.
1170    ///
1171    /// It is caller's responsibility to ensure input objects are available and
1172    /// locks are set. If this cannot be satisfied by the caller,
1173    /// execute_certificate() should be called instead.
1174    ///
1175    /// Should only be called within iota-core.
1176    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1177    pub fn try_execute_immediately(
1178        &self,
1179        certificate: &VerifiedExecutableTransaction,
1180        expected_effects_digest: Option<TransactionEffectsDigest>,
1181        epoch_store: &Arc<AuthorityPerEpochStore>,
1182    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1183        let _scope = monitored_scope("Execution::try_execute_immediately");
1184        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1185
1186        let tx_digest = certificate.digest();
1187
1188        // Acquire a lock to prevent concurrent executions of the same transaction.
1189        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1190
1191        // The cert could have been processed by a concurrent attempt of the same cert,
1192        // so check if the effects have already been written.
1193        if let Some(effects) = self
1194            .get_transaction_cache_reader()
1195            .try_get_executed_effects(tx_digest)?
1196        {
1197            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1198                assert_eq!(
1199                    effects.digest(),
1200                    expected_effects_digest_inner,
1201                    "Unexpected effects digest for transaction {tx_digest:?}"
1202                );
1203            }
1204            tx_guard.release();
1205            return Ok((effects, None));
1206        }
1207
1208        let (tx_input_objects, authenticator_input_objects, account_object) =
1209            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1210
1211        // If no expected_effects_digest was provided, try to get it from storage.
1212        // We could be re-executing a previously executed but uncommitted transaction,
1213        // perhaps after restarting with a new binary. In this situation, if
1214        // we have published an effects signature, we must be sure not to
1215        // equivocate. TODO: read from cache instead of DB
1216        let expected_effects_digest =
1217            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1218
1219        self.process_certificate(
1220            tx_guard,
1221            certificate,
1222            tx_input_objects,
1223            account_object,
1224            authenticator_input_objects,
1225            expected_effects_digest,
1226            epoch_store,
1227        )
1228        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1229        .tap_ok(
1230            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1231        )
1232    }
1233
1234    pub fn read_objects_for_execution(
1235        &self,
1236        tx_lock: &CertLockGuard,
1237        certificate: &VerifiedExecutableTransaction,
1238        epoch_store: &Arc<AuthorityPerEpochStore>,
1239    ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1240        let _scope = monitored_scope("Execution::load_input_objects");
1241        let _metrics_guard = self
1242            .metrics
1243            .execution_load_input_objects_latency
1244            .start_timer();
1245
1246        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1247
1248        let input_objects = self.input_loader.read_objects_for_execution(
1249            epoch_store,
1250            &certificate.key(),
1251            tx_lock,
1252            &input_objects,
1253            epoch_store.epoch(),
1254        )?;
1255
1256        certificate.split_input_objects_into_groups_for_reading(input_objects)
1257    }
1258
1259    /// Test only wrapper for `try_execute_immediately()` above, useful for
1260    /// checking errors if the pre-conditions are not satisfied, and
1261    /// executing change epoch transactions.
1262    pub fn try_execute_for_test(
1263        &self,
1264        certificate: &VerifiedCertificate,
1265    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266        let epoch_store = self.epoch_store_for_testing();
1267        let (effects, execution_error_opt) = self.try_execute_immediately(
1268            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269            None,
1270            &epoch_store,
1271        )?;
1272        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273        Ok((signed_effects, execution_error_opt))
1274    }
1275
1276    /// Non-fallible version of `try_execute_for_test()`.
1277    pub fn execute_for_test(
1278        &self,
1279        certificate: &VerifiedCertificate,
1280    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281        self.try_execute_for_test(certificate)
1282            .expect("try_execute_for_test should not fail")
1283    }
1284
1285    pub async fn notify_read_effects(
1286        &self,
1287        certificate: &VerifiedCertificate,
1288    ) -> IotaResult<TransactionEffects> {
1289        self.get_transaction_cache_reader()
1290            .try_notify_read_executed_effects(&[*certificate.digest()])
1291            .await
1292            .map(|mut r| r.pop().expect("must return correct number of effects"))
1293    }
1294
1295    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296        self.get_object_cache_reader()
1297            .try_check_owned_objects_are_live(owned_object_refs)
1298    }
1299
1300    /// This function captures the required state to debug a forked transaction.
1301    /// The dump is written to a file in the configured dump directory, with its
1302    /// name prefixed by the transaction digest. NOTE: Since this info escapes the
1303    /// validator context, make sure not to leak any private info here.
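    ///
    /// A minimal configuration sketch (hedged): only the `dump_file_directory`
    /// field used below is shown, and the directory path is hypothetical; when the
    /// field is unset the dump falls back to the OS temp dir.
    ///
    /// ```ignore
    /// let debug_dump_config = StateDebugDumpConfig {
    ///     // Hypothetical directory; any writable path works.
    ///     dump_file_directory: Some(PathBuf::from("/var/lib/iota/forked-tx-dumps")),
    /// };
    /// ```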
1304    pub(crate) fn debug_dump_transaction_state(
1305        &self,
1306        tx_digest: &TransactionDigest,
1307        effects: &TransactionEffects,
1308        expected_effects_digest: TransactionEffectsDigest,
1309        inner_temporary_store: &InnerTemporaryStore,
1310        certificate: &VerifiedExecutableTransaction,
1311        debug_dump_config: &StateDebugDumpConfig,
1312    ) -> IotaResult<PathBuf> {
1313        let dump_dir = debug_dump_config
1314            .dump_file_directory
1315            .as_ref()
1316            .cloned()
1317            .unwrap_or(std::env::temp_dir());
1318        let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320        NodeStateDump::new(
1321            tx_digest,
1322            effects,
1323            expected_effects_digest,
1324            self.get_object_store().as_ref(),
1325            &epoch_store,
1326            inner_temporary_store,
1327            certificate,
1328        )?
1329        .write_to_file(&dump_dir)
1330        .map_err(|e| IotaError::FileIO(e.to_string()))
1331    }
1332
1333    #[instrument(level = "trace", skip_all)]
1334    pub(crate) fn process_certificate(
1335        &self,
1336        tx_guard: CertTxGuard,
1337        certificate: &VerifiedExecutableTransaction,
1338        tx_input_objects: InputObjects,
1339        account_object: Option<ObjectReadResult>,
1340        authenticator_input_objects: Option<InputObjects>,
1341        expected_effects_digest: Option<TransactionEffectsDigest>,
1342        epoch_store: &Arc<AuthorityPerEpochStore>,
1343    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1344        let process_certificate_start_time = tokio::time::Instant::now();
1345        let digest = *certificate.digest();
1346
1347        fail_point_if!("correlated-crash-process-certificate", || {
1348            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1349                iota_simulator::task::kill_current_node(None);
1350            }
1351        });
1352
1353        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1354        // Any caller that verifies the signatures on the certificate will have already
1355        // checked the epoch. But paths that don't verify sigs (e.g. execution
1356        // from checkpoint, reading from db) present the possibility of an epoch
1357        // mismatch. If this cert was not finalized in the previous epoch, then it's
1358        // invalid.
1359        let execution_guard = match execution_guard {
1360            Ok(execution_guard) => execution_guard,
1361            Err(err) => {
1362                tx_guard.release();
1363                return Err(err);
1364            }
1365        };
1366        // Since we obtain a reference to the epoch store before taking the execution
1367        // lock, it's possible that reconfiguration has happened and they no
1368        // longer match.
1369        if *execution_guard != epoch_store.epoch() {
1370            tx_guard.release();
1371            info!("The epoch of the execution_guard doesn't match the epoch store");
1372            return Err(IotaError::WrongEpoch {
1373                expected_epoch: epoch_store.epoch(),
1374                actual_epoch: *execution_guard,
1375            });
1376        }
1377
1378        // Errors originating from prepare_certificate may be transient (failure to read
1379        // locks) or non-transient (transaction input is invalid, move vm
1380        // errors). However, all errors from this function occur before we have
1381        // written anything to the db, so we release the tx guard and rely on the
1382        // client to retry the tx (if it was transient).
1383        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1384            &execution_guard,
1385            certificate,
1386            tx_input_objects,
1387            account_object,
1388            authenticator_input_objects,
1389            epoch_store,
1390        ) {
1391            Err(e) => {
1392                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1393                tx_guard.release();
1394                return Err(e);
1395            }
1396            Ok(res) => res,
1397        };
1398
1399        if let Some(expected_effects_digest) = expected_effects_digest {
1400            if effects.digest() != expected_effects_digest {
1401                // We don't want to mask the original error, so we log it and continue.
1402                match self.debug_dump_transaction_state(
1403                    &digest,
1404                    &effects,
1405                    expected_effects_digest,
1406                    &inner_temporary_store,
1407                    certificate,
1408                    &self.config.state_debug_dump_config,
1409                ) {
1410                    Ok(out_path) => {
1411                        info!(
1412                            "Dumped node state for transaction {} to {}",
1413                            digest,
1414                            out_path.as_path().display().to_string()
1415                        );
1416                    }
1417                    Err(e) => {
1418                        error!("Error dumping state for transaction {}: {e}", digest);
1419                    }
1420                }
1421                error!(
1422                    tx_digest = ?digest,
1423                    ?expected_effects_digest,
1424                    actual_effects = ?effects,
1425                    "fork detected!"
1426                );
1427                panic!(
1428                    "Transaction {} is expected to have effects digest {}, but got {}!",
1429                    digest,
1430                    expected_effects_digest,
1431                    effects.digest(),
1432                );
1433            }
1434        }
1435
1436        fail_point!("crash");
1437
1438        self.commit_certificate(
1439            certificate,
1440            inner_temporary_store,
1441            &effects,
1442            tx_guard,
1443            execution_guard,
1444            epoch_store,
1445        )?;
1446
1447        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1448            certificate.data().transaction_data().kind()
1449        {
1450            if let Some(err) = &execution_error_opt {
1451                debug_fatal!("Authenticator state update failed: {:?}", err);
1452            }
1453            epoch_store.update_authenticator_state(auth_state);
1454
1455            // double check that the signature verifier always matches the authenticator
1456            // state
1457            if cfg!(debug_assertions) {
1458                let authenticator_state = get_authenticator_state(self.get_object_store())
1459                    .expect("Read cannot fail")
1460                    .expect("Authenticator state must exist");
1461
1462                let mut sys_jwks: Vec<_> = authenticator_state
1463                    .active_jwks
1464                    .into_iter()
1465                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1466                    .collect();
1467                let mut active_jwks: Vec<_> = epoch_store
1468                    .signature_verifier
1469                    .get_jwks()
1470                    .into_iter()
1471                    .collect();
1472                sys_jwks.sort();
1473                active_jwks.sort();
1474
1475                assert_eq!(sys_jwks, active_jwks);
1476            }
1477        }
1478
1479        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1480        if elapsed > 0.0 {
1481            self.metrics
1482                .execution_gas_latency_ratio
1483                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1484        };
1485        Ok((effects, execution_error_opt))
1486    }
1487
1488    #[instrument(level = "trace", skip_all)]
1489    fn commit_certificate(
1490        &self,
1491        certificate: &VerifiedExecutableTransaction,
1492        inner_temporary_store: InnerTemporaryStore,
1493        effects: &TransactionEffects,
1494        tx_guard: CertTxGuard,
1495        _execution_guard: ExecutionLockReadGuard<'_>,
1496        epoch_store: &Arc<AuthorityPerEpochStore>,
1497    ) -> IotaResult {
1498        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1499            monitored_scope("Execution::commit_certificate");
1500        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1501
1502        let tx_key = certificate.key();
1503        let tx_digest = certificate.digest();
1504        let input_object_count = inner_temporary_store.input_objects.len();
1505        let shared_object_count = effects.input_shared_objects().len();
1506
1507        let output_keys = inner_temporary_store.get_output_keys(effects);
1508
1509        // index certificate
1510        let _ = self
1511            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1512            .tap_err(|e| {
1513                self.metrics.post_processing_total_failures.inc();
1514                error!(?tx_digest, "tx post processing failed: {e}");
1515            });
1516
1517        // The insertion into the epoch_store is not atomic with the insertion into
1518        // the perpetual store. This is OK because we insert into the epoch store
1519        // first, and during lookups we always check the perpetual store first.
1520        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1521
1522        // Allow testing what happens if we crash here.
1523        fail_point!("crash");
1524
1525        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1526            certificate.clone().into_unsigned(),
1527            effects.clone(),
1528            inner_temporary_store,
1529        );
1530        self.get_cache_writer()
1531            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1532
1533        if certificate.transaction_data().is_end_of_epoch_tx() {
1534            // At the end of epoch, since system packages may have been upgraded, force
1535            // reload them in the cache.
1536            self.get_object_cache_reader()
1537                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1538        }
1539
1540        // commit_certificate finished; the tx is fully committed to the store.
1541        tx_guard.commit_tx();
1542
1543        // Notifies transaction manager about transaction and output objects committed.
1544        // This provides necessary information to transaction manager to start executing
1545        // additional ready transactions.
1546        self.transaction_manager
1547            .notify_commit(tx_digest, output_keys, epoch_store);
1548
1549        self.update_metrics(certificate, input_object_count, shared_object_count);
1550
1551        Ok(())
1552    }
1553
1554    fn update_metrics(
1555        &self,
1556        certificate: &VerifiedExecutableTransaction,
1557        input_object_count: usize,
1558        shared_object_count: usize,
1559    ) {
1560        // count signature by scheme, for zklogin and multisig
1561        if certificate.has_zklogin_sig() {
1562            self.metrics.zklogin_sig_count.inc();
1563        } else if certificate.has_upgraded_multisig() {
1564            self.metrics.multisig_sig_count.inc();
1565        }
1566
1567        self.metrics.total_effects.inc();
1568        self.metrics.total_certs.inc();
1569
1570        if shared_object_count > 0 {
1571            self.metrics.shared_obj_tx.inc();
1572        }
1573
1574        if certificate.is_sponsored_tx() {
1575            self.metrics.sponsored_tx.inc();
1576        }
1577
1578        self.metrics
1579            .num_input_objs
1580            .observe(input_object_count as f64);
1581        self.metrics
1582            .num_shared_objects
1583            .observe(shared_object_count as f64);
1584        self.metrics.batch_size.observe(
1585            certificate
1586                .data()
1587                .intent_message()
1588                .value
1589                .kind()
1590                .num_commands() as f64,
1591        );
1592    }
1593
1594    /// prepare_certificate validates the transaction input, and executes the
1595    /// certificate, returning effects, output objects, events, etc.
1596    ///
1597    /// It reads state from the db (both owned and shared locks), but it has no
1598    /// side effects.
1599    ///
1600    /// It can be generally understood that a failure of prepare_certificate
1601    /// indicates a non-transient error, e.g. the transaction input is
1602    /// somehow invalid, the correct locks are not held, etc. However, this
1603    /// is not entirely true, as a transient db read error may also cause
1604    /// this function to fail.
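    ///
    /// A hedged sketch of how the returned tuple is typically consumed (the public
    /// `prepare_certificate_for_benchmark` wrapper below shares the same return
    /// shape); `state`, a `VerifiedExecutableTransaction` `certificate`, its
    /// `InputObjects`, and `epoch_store` are assumed to be set up elsewhere.
    ///
    /// ```ignore
    /// let (inner_store, effects, exec_err) =
    ///     state.prepare_certificate_for_benchmark(&certificate, input_objects, &epoch_store)?;
    /// // Nothing has been written to the store yet; `effects` only describe the
    /// // would-be outcome.
    /// if let Some(err) = exec_err {
    ///     // The transaction executed but aborted; gas charges are still reflected
    ///     // in `effects`.
    ///     println!("execution error: {err:?}");
    /// }
    /// ```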
1605    #[instrument(level = "trace", skip_all)]
1606    fn prepare_certificate(
1607        &self,
1608        _execution_guard: &ExecutionLockReadGuard<'_>,
1609        certificate: &VerifiedExecutableTransaction,
1610        tx_input_objects: InputObjects,
1611        account_object: Option<ObjectReadResult>,
1612        move_authenticator_input_objects: Option<InputObjects>,
1613        epoch_store: &Arc<AuthorityPerEpochStore>,
1614    ) -> IotaResult<(
1615        InnerTemporaryStore,
1616        TransactionEffects,
1617        Option<ExecutionError>,
1618    )> {
1619        let _scope = monitored_scope("Execution::prepare_certificate");
1620        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1621        let prepare_certificate_start_time = tokio::time::Instant::now();
1622
1623        let protocol_config = epoch_store.protocol_config();
1624
1625        let reference_gas_price = epoch_store.reference_gas_price();
1626
1627        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1628        let epoch_start_timestamp = epoch_store
1629            .epoch_start_config()
1630            .epoch_data()
1631            .epoch_start_timestamp();
1632
1633        let backing_store = self.get_backing_store().as_ref();
1634
1635        let tx_digest = *certificate.digest();
1636
1637        // TODO: We need to move this to a more appropriate place to avoid redundant
1638        // checks.
1639        let tx_data = certificate.data().transaction_data();
1640        tx_data.validity_check(protocol_config)?;
1641
1642        let (kind, signer, gas) = tx_data.execution_parts();
1643
1644        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1645        let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
1646            move_authenticator,
1647        ) =
1648            certificate.sender_move_authenticator()
1649        {
1650            // Check if the sender needs to be authenticated in Move and, if so, execute the
1651            // authentication.
1652            // It is assumed that `MoveAuthenticator` availability is checked in
1653            // `SenderSignedData::validity_check`.
1654
1655            // Check basic `object_to_authenticate` preconditions and get its components.
1656            let (
1657                auth_account_object_id,
1658                auth_account_object_seq_number,
1659                auth_account_object_digest,
1660            ) = move_authenticator.object_to_authenticate_components()?;
1661
1662            // Since the `object_to_authenticate` components are provided, it is assumed
1663            // that the account object is loaded.
1664            let account_object = account_object.expect("Account object must be provided");
1665
1666            let authenticator_function_ref_for_execution = self.check_move_account(
1667                auth_account_object_id,
1668                auth_account_object_seq_number,
1669                auth_account_object_digest,
1670                account_object,
1671                &signer,
1672            )?;
1673
1674            let move_authenticator_input_objects = move_authenticator_input_objects.expect(
1675                    "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
1676                );
1677
1678            // Check the `MoveAuthenticator` input objects.
1679            // The `MoveAuthenticator` receiving objects are checked at the signing step.
1680            // `max_auth_gas` is used here as the Move authenticator gas budget for as
1681            // long as that budget is not part of the transaction data.
1682            let authenticator_gas_budget = protocol_config.max_auth_gas();
1683            let (
1684                gas_status,
1685                authenticator_checked_input_objects,
1686                authenticator_and_tx_checked_input_objects,
1687            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1688                certificate,
1689                tx_input_objects,
1690                move_authenticator_input_objects,
1691                authenticator_gas_budget,
1692                protocol_config,
1693                reference_gas_price,
1694            )?;
1695
1696            let owned_object_refs = authenticator_and_tx_checked_input_objects
1697                .inner()
1698                .filter_owned_objects();
1699            self.check_owned_locks(&owned_object_refs)?;
1700
1701            epoch_store
1702                .executor()
1703                .authenticate_then_execute_transaction_to_effects(
1704                    backing_store,
1705                    protocol_config,
1706                    self.metrics.limits_metrics.clone(),
1707                    self.config
1708                        .expensive_safety_check_config
1709                        .enable_deep_per_tx_iota_conservation_check(),
1710                    self.config.certificate_deny_config.certificate_deny_set(),
1711                    &epoch_id,
1712                    epoch_start_timestamp,
1713                    gas_status,
1714                    gas,
1715                    move_authenticator.to_owned(),
1716                    authenticator_function_ref_for_execution,
1717                    authenticator_checked_input_objects,
1718                    authenticator_and_tx_checked_input_objects,
1719                    kind,
1720                    signer,
1721                    tx_digest,
1722                    &mut None,
1723                )
1724        } else {
1725            // No Move authentication is required; execute the transaction directly.
1726
1727            // The cost of partially re-auditing a transaction before execution is
1728            // tolerated.
1729            let (tx_gas_status, tx_checked_input_objects) =
1730                iota_transaction_checks::check_certificate_input(
1731                    certificate,
1732                    tx_input_objects,
1733                    protocol_config,
1734                    reference_gas_price,
1735                )?;
1736
1737            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1738            self.check_owned_locks(&owned_object_refs)?;
1739            epoch_store.executor().execute_transaction_to_effects(
1740                backing_store,
1741                protocol_config,
1742                self.metrics.limits_metrics.clone(),
1743                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1744                // cyclic dependency w/ iota-adapter
1745                self.config
1746                    .expensive_safety_check_config
1747                    .enable_deep_per_tx_iota_conservation_check(),
1748                self.config.certificate_deny_config.certificate_deny_set(),
1749                &epoch_id,
1750                epoch_start_timestamp,
1751                tx_checked_input_objects,
1752                gas,
1753                tx_gas_status,
1754                kind,
1755                signer,
1756                tx_digest,
1757                &mut None,
1758            )
1759        };
1760
1761        fail_point_if!("cp_execution_nondeterminism", || {
1762            #[cfg(msim)]
1763            self.create_fail_state(certificate, epoch_store, &mut effects);
1764        });
1765
1766        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1767        if elapsed > 0.0 {
1768            self.metrics
1769                .prepare_cert_gas_latency_ratio
1770                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1771        }
1772
1773        Ok((inner_temp_store, effects, execution_error_opt.err()))
1774    }
1775
1776    pub fn prepare_certificate_for_benchmark(
1777        &self,
1778        certificate: &VerifiedExecutableTransaction,
1779        input_objects: InputObjects,
1780        epoch_store: &Arc<AuthorityPerEpochStore>,
1781    ) -> IotaResult<(
1782        InnerTemporaryStore,
1783        TransactionEffects,
1784        Option<ExecutionError>,
1785    )> {
1786        let lock = RwLock::new(epoch_store.epoch());
1787        let execution_guard = lock.try_read().unwrap();
1788
1789        self.prepare_certificate(
1790            &execution_guard,
1791            certificate,
1792            input_objects,
1793            None,
1794            None,
1795            epoch_store,
1796        )
1797    }
1798
1799    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1800    /// `VmChecks::Enabled` instead.
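    ///
    /// A hedged usage sketch (not a doctest); `state` is the authority state and
    /// `tx_data` a prepared `TransactionData`.
    ///
    /// ```ignore
    /// let digest = tx_data.digest();
    /// let (response, _written, effects, mock_gas) =
    ///     state.dry_exec_transaction(tx_data, digest)?;
    /// // `mock_gas` is `Some(object_id)` only when no gas payment was supplied.
    /// println!("dry-run status: {:?}", effects.status());
    /// ```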
1801    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802    #[allow(clippy::type_complexity)]
1803    pub fn dry_exec_transaction(
1804        &self,
1805        transaction: TransactionData,
1806        transaction_digest: TransactionDigest,
1807    ) -> IotaResult<(
1808        DryRunTransactionBlockResponse,
1809        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810        TransactionEffects,
1811        Option<ObjectID>,
1812    )> {
1813        let epoch_store = self.load_epoch_store_one_call_per_task();
1814        if !self.is_fullnode(&epoch_store) {
1815            return Err(IotaError::UnsupportedFeature {
1816                error: "dry-exec is only supported on fullnodes".to_string(),
1817            });
1818        }
1819
1820        if transaction.kind().is_system_tx() {
1821            return Err(IotaError::UnsupportedFeature {
1822                error: "dry-exec does not support system transactions".to_string(),
1823            });
1824        }
1825
1826        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827    }
1828
1829    #[allow(clippy::type_complexity)]
1830    pub fn dry_exec_transaction_for_benchmark(
1831        &self,
1832        transaction: TransactionData,
1833        transaction_digest: TransactionDigest,
1834    ) -> IotaResult<(
1835        DryRunTransactionBlockResponse,
1836        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837        TransactionEffects,
1838        Option<ObjectID>,
1839    )> {
1840        let epoch_store = self.load_epoch_store_one_call_per_task();
1841        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842    }
1843
1844    #[instrument(level = "trace", skip_all)]
1845    #[allow(clippy::type_complexity)]
1846    fn dry_exec_transaction_impl(
1847        &self,
1848        epoch_store: &AuthorityPerEpochStore,
1849        transaction: TransactionData,
1850        transaction_digest: TransactionDigest,
1851    ) -> IotaResult<(
1852        DryRunTransactionBlockResponse,
1853        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1854        TransactionEffects,
1855        Option<ObjectID>,
1856    )> {
1857        // Cheap validity checks for a transaction, including input size limits.
1858        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1859
1860        let input_object_kinds = transaction.input_objects()?;
1861        let receiving_object_refs = transaction.receiving_objects();
1862
1863        iota_transaction_checks::deny::check_transaction_for_signing(
1864            &transaction,
1865            &[],
1866            &input_object_kinds,
1867            &receiving_object_refs,
1868            &self.config.transaction_deny_config,
1869            self.get_backing_package_store().as_ref(),
1870        )?;
1871
1872        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1873            // We don't want to cache this transaction since it's a dry run.
1874            None,
1875            &input_object_kinds,
1876            &receiving_object_refs,
1877            epoch_store.epoch(),
1878        )?;
1879
1880        // make a gas object if one was not provided
1881        let mut transaction = transaction;
1882        let mut gas_object_refs = transaction.gas().to_vec();
1883        let reference_gas_price = epoch_store.reference_gas_price();
1884        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1885            let sender = transaction.gas_owner();
1886            let gas_object_id = ObjectID::random();
1887            let gas_object = Object::new_move(
1888                MoveObject::new_gas_coin(
1889                    OBJECT_START_VERSION,
1890                    gas_object_id,
1891                    SIMULATION_GAS_COIN_VALUE,
1892                ),
1893                Owner::AddressOwner(sender),
1894                TransactionDigest::genesis_marker(),
1895            );
1896            let gas_object_ref = gas_object.compute_object_reference();
1897            gas_object_refs = vec![gas_object_ref];
1898            // Add gas object to transaction gas payment
1899            transaction.gas_data_mut().payment = gas_object_refs.clone();
1900            (
1901                iota_transaction_checks::check_transaction_input_with_given_gas(
1902                    epoch_store.protocol_config(),
1903                    reference_gas_price,
1904                    &transaction,
1905                    input_objects,
1906                    receiving_objects,
1907                    gas_object,
1908                    &self.metrics.bytecode_verifier_metrics,
1909                    &self.config.verifier_signing_config,
1910                )?,
1911                Some(gas_object_id),
1912            )
1913        } else {
1914            // `MoveAuthenticator`s are not supported in dry runs, so we set the
1915            // `authenticator_gas_budget` to 0.
1916            let authenticator_gas_budget = 0;
1917
1918            (
1919                iota_transaction_checks::check_transaction_input(
1920                    epoch_store.protocol_config(),
1921                    reference_gas_price,
1922                    &transaction,
1923                    input_objects,
1924                    &receiving_objects,
1925                    &self.metrics.bytecode_verifier_metrics,
1926                    &self.config.verifier_signing_config,
1927                    authenticator_gas_budget,
1928                )?,
1929                None,
1930            )
1931        };
1932
1933        let protocol_config = epoch_store.protocol_config();
1934        let (kind, signer, _) = transaction.execution_parts();
1935
1936        let silent = true;
1937        let executor = iota_execution::executor(protocol_config, silent, None)
1938            .expect("Creating an executor should not fail here");
1939
1940        let expensive_checks = false;
1941        let (inner_temp_store, _, effects, execution_error) = executor
1942            .execute_transaction_to_effects(
1943                self.get_backing_store().as_ref(),
1944                protocol_config,
1945                self.metrics.limits_metrics.clone(),
1946                expensive_checks,
1947                self.config.certificate_deny_config.certificate_deny_set(),
1948                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1949                epoch_store
1950                    .epoch_start_config()
1951                    .epoch_data()
1952                    .epoch_start_timestamp(),
1953                checked_input_objects,
1954                gas_object_refs,
1955                gas_status,
1956                kind,
1957                signer,
1958                transaction_digest,
1959                &mut None,
1960            );
1961        let tx_digest = *effects.transaction_digest();
1962
1963        let module_cache =
1964            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1965
1966        let mut layout_resolver =
1967            epoch_store
1968                .executor()
1969                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1970                    &inner_temp_store,
1971                    self.get_backing_package_store(),
1972                )));
1973        // Returning empty vector here because we recalculate changes in the rpc layer.
1974        let object_changes = Vec::new();
1975
1976        // Returning empty vector here because we recalculate changes in the rpc layer.
1977        let balance_changes = Vec::new();
1978
1979        let written_with_kind = effects
1980            .created()
1981            .into_iter()
1982            .map(|(oref, _)| (oref, WriteKind::Create))
1983            .chain(
1984                effects
1985                    .unwrapped()
1986                    .into_iter()
1987                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1988            )
1989            .chain(
1990                effects
1991                    .mutated()
1992                    .into_iter()
1993                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
1994            )
1995            .map(|(oref, kind)| {
1996                let obj = inner_temp_store.written.get(&oref.0).unwrap();
1997                // TODO: Avoid clones.
1998                (oref.0, (oref, obj.clone(), kind))
1999            })
2000            .collect();
2001
2002        let execution_error_source = execution_error
2003            .as_ref()
2004            .err()
2005            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2006
2007        Ok((
2008            DryRunTransactionBlockResponse {
2009                // to avoid cloning `transaction`, fields are populated in this order
2010                suggested_gas_price: self
2011                    .congestion_tracker
2012                    .get_prediction_suggested_gas_price(&transaction),
2013                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
2014                    .map_err(|e| IotaError::TransactionSerialization {
2015                        error: format!(
2016                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
2017                        ),
2018                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
2019                effects: effects.clone().try_into()?,
2020                events: IotaTransactionBlockEvents::try_from(
2021                    inner_temp_store.events.clone(),
2022                    tx_digest,
2023                    None,
2024                    layout_resolver.as_mut(),
2025                )?,
2026                object_changes,
2027                balance_changes,
2028                execution_error_source,
2029            },
2030            written_with_kind,
2031            effects,
2032            mock_gas,
2033        ))
2034    }
2035
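    /// Simulates a transaction against the current state without committing any
    /// writes: with checks enabled it behaves like a dry run, with checks disabled
    /// like a dev inspect (see the two branches below).
    ///
    /// A hedged usage sketch (not a doctest); `state` is the authority state,
    /// `tx_data` a prepared `TransactionData`, and `VmChecks::Enabled` the value
    /// referenced by the deprecation notes on `dry_exec_transaction`.
    ///
    /// ```ignore
    /// let result = state.simulate_transaction(tx_data, VmChecks::Enabled)?;
    /// // Effects are computed in memory only; nothing is persisted.
    /// println!("simulated status: {:?}", result.effects.status());
    /// ```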
2036    pub fn simulate_transaction(
2037        &self,
2038        mut transaction: TransactionData,
2039        checks: VmChecks,
2040    ) -> IotaResult<SimulateTransactionResult> {
2041        if transaction.kind().is_system_tx() {
2042            return Err(IotaError::UnsupportedFeature {
2043                error: "simulate does not support system transactions".to_string(),
2044            });
2045        }
2046
2047        let epoch_store = self.load_epoch_store_one_call_per_task();
2048        if !self.is_fullnode(&epoch_store) {
2049            return Err(IotaError::UnsupportedFeature {
2050                error: "simulate is only supported on fullnodes".to_string(),
2051            });
2052        }
2053
2054        // Cheap validity checks for a transaction, including input size limits.
2055        // This does not check if gas objects are missing since we may create a
2056        // mock gas object. It checks for other transaction input validity.
2057        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2058
2059        let input_object_kinds = transaction.input_objects()?;
2060        let receiving_object_refs = transaction.receiving_objects();
2061
2062        // Since we need to simulate a validator signing the transaction, the first step
2063        // is to check if some transaction elements are denied.
2064        iota_transaction_checks::deny::check_transaction_for_signing(
2065            &transaction,
2066            &[],
2067            &input_object_kinds,
2068            &receiving_object_refs,
2069            &self.config.transaction_deny_config,
2070            self.get_backing_package_store().as_ref(),
2071        )?;
2072
2073        // Load input and receiving objects
2074        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2075            // We don't want to cache this transaction since it's a simulation.
2076            None,
2077            &input_object_kinds,
2078            &receiving_object_refs,
2079            epoch_store.epoch(),
2080        )?;
2081
2082        // Create a mock gas object if one was not provided
2083        let mock_gas_id = if transaction.gas().is_empty() {
2084            let mock_gas_object = Object::new_move(
2085                MoveObject::new_gas_coin(
2086                    OBJECT_START_VERSION,
2087                    ObjectID::MAX,
2088                    SIMULATION_GAS_COIN_VALUE,
2089                ),
2090                Owner::AddressOwner(transaction.gas_data().owner),
2091                TransactionDigest::genesis_marker(),
2092            );
2093            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2094            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2095            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2096            Some(mock_gas_object.id())
2097        } else {
2098            None
2099        };
2100
2101        let protocol_config = epoch_store.protocol_config();
2102
2103        // `MoveAuthenticator`s are not supported in simulation, so we set the
2104        // `authenticator_gas_budget` to 0.
2105        let authenticator_gas_budget = 0;
2106
2107        // Checks enabled -> DRY-RUN: we are simulating a real TX.
2108        // Checks disabled -> DEV-INSPECT: more relaxed Move VM checks.
2109        let (gas_status, checked_input_objects) = if checks.enabled() {
2110            iota_transaction_checks::check_transaction_input(
2111                protocol_config,
2112                epoch_store.reference_gas_price(),
2113                &transaction,
2114                input_objects,
2115                &receiving_objects,
2116                &self.metrics.bytecode_verifier_metrics,
2117                &self.config.verifier_signing_config,
2118                authenticator_gas_budget,
2119            )?
2120        } else {
2121            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2122                protocol_config,
2123                transaction.kind(),
2124                input_objects,
2125                receiving_objects,
2126            )?;
2127            let gas_status = IotaGasStatus::new(
2128                transaction.gas_budget(),
2129                transaction.gas_price(),
2130                epoch_store.reference_gas_price(),
2131                protocol_config,
2132            )?;
2133
2134            (gas_status, checked_input_objects)
2135        };
2136
2137        // Create a new executor for the simulation
2138        let executor = iota_execution::executor(
2139            protocol_config,
2140            true, // silent
2141            None,
2142        )
2143        .expect("Creating an executor should not fail here");
2144
2145        // Execute the simulation
2146        let (kind, signer, gas_coins) = transaction.execution_parts();
2147        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2148            self.get_backing_store().as_ref(),
2149            protocol_config,
2150            self.metrics.limits_metrics.clone(),
2151            false, // expensive_checks
2152            self.config.certificate_deny_config.certificate_deny_set(),
2153            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2154            epoch_store
2155                .epoch_start_config()
2156                .epoch_data()
2157                .epoch_start_timestamp(),
2158            checked_input_objects,
2159            gas_coins,
2160            gas_status,
2161            kind,
2162            signer,
2163            transaction.digest(),
2164            checks.disabled(),
2165        );
2166
2167        // In the case of a dev inspect, execution_result may be populated with return
2168        // values; in the case of a dry run, execution_result is empty.
2169        Ok(SimulateTransactionResult {
2170            input_objects: inner_temp_store.input_objects,
2171            output_objects: inner_temp_store.written,
2172            events: effects.events_digest().map(|_| inner_temp_store.events),
2173            effects,
2174            execution_result,
2175            mock_gas_id,
2176        })
2177    }
2178
2179    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
2180    /// `VmChecks::DISABLED` instead.
2181    /// The object ID for gas can be any
2182    /// object ID, even for an uncreated object.
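    ///
    /// A hedged usage sketch (not a doctest); `state` is the authority state,
    /// `sender` an `IotaAddress`, and `kind` a prepared `TransactionKind`; all
    /// optional gas and output parameters are left as `None`.
    ///
    /// ```ignore
    /// let results = state
    ///     .dev_inspect_transaction_block(sender, kind, None, None, None, None, None, None)
    ///     .await?;
    /// println!("dev-inspect effects: {:?}", results.effects);
    /// ```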
2183    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2184    pub async fn dev_inspect_transaction_block(
2185        &self,
2186        sender: IotaAddress,
2187        transaction_kind: TransactionKind,
2188        gas_price: Option<u64>,
2189        gas_budget: Option<u64>,
2190        gas_sponsor: Option<IotaAddress>,
2191        gas_objects: Option<Vec<ObjectRef>>,
2192        show_raw_txn_data_and_effects: Option<bool>,
2193        skip_checks: Option<bool>,
2194    ) -> IotaResult<DevInspectResults> {
2195        let epoch_store = self.load_epoch_store_one_call_per_task();
2196
2197        if !self.is_fullnode(&epoch_store) {
2198            return Err(IotaError::UnsupportedFeature {
2199                error: "dev-inspect is only supported on fullnodes".to_string(),
2200            });
2201        }
2202
2203        if transaction_kind.is_system_tx() {
2204            return Err(IotaError::UnsupportedFeature {
2205                error: "system transactions are not supported".to_string(),
2206            });
2207        }
2208
2209        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2210        let skip_checks = skip_checks.unwrap_or(true);
2211        let reference_gas_price = epoch_store.reference_gas_price();
2212        let protocol_config = epoch_store.protocol_config();
2213        let max_tx_gas = protocol_config.max_tx_gas();
2214
2215        let price = gas_price.unwrap_or(reference_gas_price);
2216        let budget = gas_budget.unwrap_or(max_tx_gas);
2217        let owner = gas_sponsor.unwrap_or(sender);
2218        // Payment might be empty here, but that's fine; we'll deal with it later,
2219        // after reading all the input objects.
2220        let payment = gas_objects.unwrap_or_default();
2221        let transaction = TransactionData::V1(TransactionDataV1 {
2222            kind: transaction_kind.clone(),
2223            sender,
2224            gas_data: GasData {
2225                payment,
2226                owner,
2227                price,
2228                budget,
2229            },
2230            expiration: TransactionExpiration::None,
2231        });
2232
2233        let raw_txn_data = if show_raw_txn_data_and_effects {
2234            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2235                error: "Failed to serialize transaction during dev inspect".to_string(),
2236            })?
2237        } else {
2238            vec![]
2239        };
2240
2241        transaction.validity_check_no_gas_check(protocol_config)?;
2242
2243        let input_object_kinds = transaction.input_objects()?;
2244        let receiving_object_refs = transaction.receiving_objects();
2245
2246        iota_transaction_checks::deny::check_transaction_for_signing(
2247            &transaction,
2248            &[],
2249            &input_object_kinds,
2250            &receiving_object_refs,
2251            &self.config.transaction_deny_config,
2252            self.get_backing_package_store().as_ref(),
2253        )?;
2254
2255        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2256            // We don't want to cache this transaction since it's a dev inspect.
2257            None,
2258            &input_object_kinds,
2259            &receiving_object_refs,
2260            epoch_store.epoch(),
2261        )?;
2262
2263        // Create and use a dummy gas object if there is no gas object provided.
2264        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2265            SIMULATION_GAS_COIN_VALUE,
2266            transaction.gas_owner(),
2267        );
2268
2269        let gas_objects = if transaction.gas().is_empty() {
2270            let gas_object_ref = dummy_gas_object.compute_object_reference();
2271            vec![gas_object_ref]
2272        } else {
2273            transaction.gas().to_vec()
2274        };
2275
2276        let (gas_status, checked_input_objects) = if skip_checks {
2277            // If we are skipping checks, we call the check_dev_inspect_input function,
2278            // which performs only lightweight checks on the transaction input.
2279            // If the gas field is empty, we will use the dummy gas object, so we need
2280            // to add it to the input objects vector.
2281            if transaction.gas().is_empty() {
2282                input_objects.push(ObjectReadResult::new(
2283                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2284                    dummy_gas_object.into(),
2285                ));
2286            }
2287            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2288                protocol_config,
2289                &transaction_kind,
2290                input_objects,
2291                receiving_objects,
2292            )?;
2293            let gas_status = IotaGasStatus::new(
2294                max_tx_gas,
2295                transaction.gas_price(),
2296                reference_gas_price,
2297                protocol_config,
2298            )?;
2299
2300            (gas_status, checked_input_objects)
2301        } else {
2302            // If we are not skipping checks, we call the check_transaction_input function
2303            // (or its dummy-gas variant when no gas payment is provided), which performs
2304            // full-fledged checks just like a real transaction execution.
2305            if transaction.gas().is_empty() {
2306                iota_transaction_checks::check_transaction_input_with_given_gas(
2307                    epoch_store.protocol_config(),
2308                    reference_gas_price,
2309                    &transaction,
2310                    input_objects,
2311                    receiving_objects,
2312                    dummy_gas_object,
2313                    &self.metrics.bytecode_verifier_metrics,
2314                    &self.config.verifier_signing_config,
2315                )?
2316            } else {
2317                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
2318                // `authenticator_gas_budget` to 0.
2319                let authenticator_gas_budget = 0;
2320
2321                iota_transaction_checks::check_transaction_input(
2322                    epoch_store.protocol_config(),
2323                    reference_gas_price,
2324                    &transaction,
2325                    input_objects,
2326                    &receiving_objects,
2327                    &self.metrics.bytecode_verifier_metrics,
2328                    &self.config.verifier_signing_config,
2329                    authenticator_gas_budget,
2330                )?
2331            }
2332        };
2333
2334        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2335            .expect("Creating an executor should not fail here");
2336        let intent_msg = IntentMessage::new(
2337            Intent {
2338                version: IntentVersion::V0,
2339                scope: IntentScope::TransactionData,
2340                app_id: IntentAppId::Iota,
2341            },
2342            transaction,
2343        );
2344        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2345        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2346            self.get_backing_store().as_ref(),
2347            protocol_config,
2348            self.metrics.limits_metrics.clone(),
2349            // expensive checks
2350            false,
2351            self.config.certificate_deny_config.certificate_deny_set(),
2352            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2353            epoch_store
2354                .epoch_start_config()
2355                .epoch_data()
2356                .epoch_start_timestamp(),
2357            checked_input_objects,
2358            gas_objects,
2359            gas_status,
2360            transaction_kind,
2361            sender,
2362            transaction_digest,
2363            skip_checks,
2364        );
2365
2366        let raw_effects = if show_raw_txn_data_and_effects {
2367            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2368                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2369            })?
2370        } else {
2371            vec![]
2372        };
2373
2374        let mut layout_resolver =
2375            epoch_store
2376                .executor()
2377                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2378                    &inner_temp_store,
2379                    self.get_backing_package_store(),
2380                )));
2381
2382        DevInspectResults::new(
2383            effects,
2384            inner_temp_store.events.clone(),
2385            execution_result,
2386            raw_txn_data,
2387            raw_effects,
2388            layout_resolver.as_mut(),
2389        )
2390    }
2391
2392    // Only used for testing because of how epoch store is loaded.
2393    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2394        let epoch_store = self.epoch_store_for_testing();
2395        Ok(epoch_store.reference_gas_price())
2396    }
2397
2398    #[instrument(level = "trace", skip_all)]
2399    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2400        self.get_transaction_cache_reader()
2401            .try_is_tx_already_executed(digest)
2402    }
2403
2404    /// Non-fallible version of `try_is_tx_already_executed`.
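    ///
    /// A hedged sketch, assuming `state` is the authority state and `digest` a
    /// known `TransactionDigest`:
    ///
    /// ```ignore
    /// if state.is_tx_already_executed(&digest) {
    ///     // Effects for `digest` are already in the store; skip re-execution.
    /// }
    /// ```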
2405    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2406        self.try_is_tx_already_executed(digest)
2407            .expect("storage access failed")
2408    }
2409
2410    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2411    #[instrument(level = "debug", skip_all, err)]
2412    fn index_tx(
2413        &self,
2414        indexes: &IndexStore,
2415        digest: &TransactionDigest,
2416        // TODO: index_tx really just need the transaction data here.
2417        cert: &VerifiedExecutableTransaction,
2418        effects: &TransactionEffects,
2419        events: &TransactionEvents,
2420        timestamp_ms: u64,
2421        tx_coins: Option<TxCoins>,
2422        written: &WrittenObjects,
2423        inner_temporary_store: &InnerTemporaryStore,
2424    ) -> IotaResult<u64> {
2425        let changes = self
2426            .process_object_index(effects, written, inner_temporary_store)
2427            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2428
2429        indexes.index_tx(
2430            cert.data().intent_message().value.sender(),
2431            cert.data()
2432                .intent_message()
2433                .value
2434                .input_objects()?
2435                .iter()
2436                .map(|o| o.object_id()),
2437            effects
2438                .all_changed_objects()
2439                .into_iter()
2440                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2441            cert.data()
2442                .intent_message()
2443                .value
2444                .move_calls()
2445                .into_iter()
2446                .map(|(package, module, function)| {
2447                    (*package, module.to_owned(), function.to_owned())
2448                }),
2449            events,
2450            changes,
2451            digest,
2452            timestamp_ms,
2453            tx_coins,
2454        )
2455    }
2456
2457    #[cfg(msim)]
2458    fn create_fail_state(
2459        &self,
2460        certificate: &VerifiedExecutableTransaction,
2461        epoch_store: &Arc<AuthorityPerEpochStore>,
2462        effects: &mut TransactionEffects,
2463    ) {
2464        use std::cell::RefCell;
2465        thread_local! {
2466            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2467        }
2468        if !certificate.data().intent_message().value.is_system_tx() {
2469            let committee = epoch_store.committee();
2470            let cur_stake = (**committee).weight(&self.name);
2471            if cur_stake > 0 {
2472                FAIL_STATE.with_borrow_mut(|fail_state| {
2473                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2474                    if fail_state.0 < committee.validity_threshold() {
2475                        fail_state.0 += cur_stake;
2476                        fail_state.1.insert(self.name);
2477                    }
2478
2479                    if fail_state.1.contains(&self.name) {
2480                        info!("cp_exec failing tx");
2481                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2482                    }
2483                });
2484            }
2485        }
2486    }
2487
2488    fn process_object_index(
2489        &self,
2490        effects: &TransactionEffects,
2491        written: &WrittenObjects,
2492        inner_temporary_store: &InnerTemporaryStore,
2493    ) -> IotaResult<ObjectIndexChanges> {
2494        let epoch_store = self.load_epoch_store_one_call_per_task();
2495        let mut layout_resolver =
2496            epoch_store
2497                .executor()
2498                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2499                    inner_temporary_store,
2500                    self.get_backing_package_store(),
2501                )));
2502
2503        let modified_at_version = effects
2504            .modified_at_versions()
2505            .into_iter()
2506            .collect::<HashMap<_, _>>();
2507
2508        let tx_digest = effects.transaction_digest();
2509        let mut deleted_owners = vec![];
2510        let mut deleted_dynamic_fields = vec![];
2511        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2512            let old_version = modified_at_version.get(&id).unwrap();
2513            // When we process the index, the latest object hasn't been written yet so
2514            // the old object must be present.
2515            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2516                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2517            ) {
2518                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2519                Owner::ObjectOwner(object_id) => {
2520                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2521                }
2522                _ => {}
2523            }
2524        }
2525
2526        let mut new_owners = vec![];
2527        let mut new_dynamic_fields = vec![];
2528
2529        for (oref, owner, kind) in effects.all_changed_objects() {
2530            let id = &oref.0;
2531            // For mutated objects, retrieve old owner and delete old index if there is an
2532            // owner change.
2533            if let WriteKind::Mutate = kind {
2534                let Some(old_version) = modified_at_version.get(id) else {
2535                    panic!(
2536                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2537                    );
2538                };
2539                // When we process the index, the latest object hasn't been written yet so
2540                // the old object must be present.
2541                let Some(old_object) = self
2542                    .get_object_store()
2543                    .try_get_object_by_key(id, *old_version)?
2544                else {
2545                    panic!(
2546                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2547                    );
2548                };
2549                if old_object.owner != owner {
2550                    match old_object.owner {
2551                        Owner::AddressOwner(addr) => {
2552                            deleted_owners.push((addr, *id));
2553                        }
2554                        Owner::ObjectOwner(object_id) => {
2555                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2556                        }
2557                        _ => {}
2558                    }
2559                }
2560            }
2561
2562            match owner {
2563                Owner::AddressOwner(addr) => {
2564                    // TODO: We can remove the object fetching once we have added ObjectType to
2565                    // TransactionEffects
2566                    let new_object = written.get(id).unwrap_or_else(
2567                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2568                    );
2569                    assert_eq!(
2570                        new_object.version(),
2571                        oref.1,
2572                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2573                        tx_digest,
2574                        id,
2575                        new_object.version(),
2576                        oref.1
2577                    );
2578
2579                    let type_ = new_object
2580                        .type_()
2581                        .map(|type_| ObjectType::Struct(type_.clone()))
2582                        .unwrap_or(ObjectType::Package);
2583
2584                    new_owners.push((
2585                        (addr, *id),
2586                        ObjectInfo {
2587                            object_id: *id,
2588                            version: oref.1,
2589                            digest: oref.2,
2590                            type_,
2591                            owner,
2592                            previous_transaction: *effects.transaction_digest(),
2593                        },
2594                    ));
2595                }
2596                Owner::ObjectOwner(owner) => {
2597                    let new_object = written.get(id).unwrap_or_else(
2598                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2599                    );
2600                    assert_eq!(
2601                        new_object.version(),
2602                        oref.1,
2603                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2604                        tx_digest,
2605                        id,
2606                        new_object.version(),
2607                        oref.1
2608                    );
2609
2610                    let Some(df_info) = self
2611                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2612                        .unwrap_or_else(|e| {
2613                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2614                            None
2615                        }
2616                    )
2617                        else {
2618                            // Skip indexing for non-dynamic-field objects.
2619                            continue;
2620                        };
2621                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2622                }
2623                _ => {}
2624            }
2625        }
2626
2627        Ok(ObjectIndexChanges {
2628            deleted_owners,
2629            deleted_dynamic_fields,
2630            new_owners,
2631            new_dynamic_fields,
2632        })
2633    }
2634
2635    fn try_create_dynamic_field_info(
2636        &self,
2637        o: &Object,
2638        written: &WrittenObjects,
2639        resolver: &mut dyn LayoutResolver,
2640    ) -> IotaResult<Option<DynamicFieldInfo>> {
2641        // Skip if not a move object
2642        let Some(move_object) = o.data.try_as_move().cloned() else {
2643            return Ok(None);
2644        };
2645
2646        // We only index dynamic field objects
2647        if !move_object.type_().is_dynamic_field() {
2648            return Ok(None);
2649        }
2650
2651        let layout = resolver
2652            .get_annotated_layout(&move_object.type_().clone().into())?
2653            .into_layout();
2654
2655        let field =
2656            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2657                IotaError::ObjectDeserialization {
2658                    error: e.to_string(),
2659                }
2660            })?;
2661
2662        let type_ = field.kind;
2663        let name_type: TypeTag = field.name_layout.into();
2664        let bcs_name = field.name_bytes.to_owned();
2665
2666        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2667            .map_err(|e| {
2668                warn!("{e}");
2669                IotaError::ObjectDeserialization {
2670                    error: e.to_string(),
2671                }
2672            })?;
2673
2674        let name = DynamicFieldName {
2675            type_: name_type,
2676            value: IotaMoveValue::from(name_value).to_json_value(),
2677        };
2678
2679        let value_metadata = field.value_metadata().map_err(|e| {
2680            warn!("{e}");
2681            IotaError::ObjectDeserialization {
2682                error: e.to_string(),
2683            }
2684        })?;
2685
2686        Ok(Some(match value_metadata {
2687            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2688                name,
2689                bcs_name,
2690                type_,
2691                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2692                object_id: o.id(),
2693                version: o.version(),
2694                digest: o.digest(),
2695            },
2696
2697            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2698                // Find the actual object from storage using the object id obtained from the
2699                // wrapper.
2700
2701                // Try to find the object in the written objects first.
2702                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2703                    (
2704                        object.version(),
2705                        object.digest(),
2706                        object.data.type_().unwrap().clone(),
2707                    )
2708                } else {
2709                    // If not found, try to find it in the database.
2710                    let object = self
2711                        .get_object_store()
2712                        .try_get_object_by_key(&object_id, o.version())?
2713                        .ok_or_else(|| UserInputError::ObjectNotFound {
2714                            object_id,
2715                            version: Some(o.version()),
2716                        })?;
2717                    let version = object.version();
2718                    let digest = object.digest();
2719                    let object_type = object.data.type_().unwrap().clone();
2720                    (version, digest, object_type)
2721                };
2722
2723                DynamicFieldInfo {
2724                    name,
2725                    bcs_name,
2726                    type_,
2727                    object_type: object_type.to_string(),
2728                    object_id,
2729                    version,
2730                    digest,
2731                }
2732            }
2733        }))
2734    }
2735
2736    #[instrument(level = "trace", skip_all, err)]
2737    fn post_process_one_tx(
2738        &self,
2739        certificate: &VerifiedExecutableTransaction,
2740        effects: &TransactionEffects,
2741        inner_temporary_store: &InnerTemporaryStore,
2742        epoch_store: &Arc<AuthorityPerEpochStore>,
2743    ) -> IotaResult {
2744        if self.indexes.is_none() {
2745            return Ok(());
2746        }
2747
2748        let tx_digest = certificate.digest();
2749        let timestamp_ms = Self::unixtime_now_ms();
2750        let events = &inner_temporary_store.events;
2751        let written = &inner_temporary_store.written;
2752        let tx_coins =
2753            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2754
2755        // Index tx
2756        if let Some(indexes) = &self.indexes {
2757            let _ = self
2758                .index_tx(
2759                    indexes.as_ref(),
2760                    tx_digest,
2761                    certificate,
2762                    effects,
2763                    events,
2764                    timestamp_ms,
2765                    tx_coins,
2766                    written,
2767                    inner_temporary_store,
2768                )
2769                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2770                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2771                .expect("Indexing tx should not fail");
2772
2773            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2774            let events = self.make_transaction_block_events(
2775                events.clone(),
2776                *tx_digest,
2777                timestamp_ms,
2778                epoch_store,
2779                inner_temporary_store,
2780            )?;
2781            // Emit events
2782            self.subscription_handler
2783                .process_tx(certificate.data().transaction_data(), &effects, &events)
2784                .tap_ok(|_| {
2785                    self.metrics
2786                        .post_processing_total_tx_had_event_processed
2787                        .inc()
2788                })
2789                .tap_err(|e| {
2790                    warn!(
2791                        ?tx_digest,
2792                        "Post processing - Couldn't process events for tx: {}", e
2793                    )
2794                })?;
2795
2796            self.metrics
2797                .post_processing_total_events_emitted
2798                .inc_by(events.data.len() as u64);
2799        };
2800        Ok(())
2801    }
2802
2803    fn make_transaction_block_events(
2804        &self,
2805        transaction_events: TransactionEvents,
2806        digest: TransactionDigest,
2807        timestamp_ms: u64,
2808        epoch_store: &Arc<AuthorityPerEpochStore>,
2809        inner_temporary_store: &InnerTemporaryStore,
2810    ) -> IotaResult<IotaTransactionBlockEvents> {
2811        let mut layout_resolver =
2812            epoch_store
2813                .executor()
2814                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2815                    inner_temporary_store,
2816                    self.get_backing_package_store(),
2817                )));
2818        IotaTransactionBlockEvents::try_from(
2819            transaction_events,
2820            digest,
2821            Some(timestamp_ms),
2822            layout_resolver.as_mut(),
2823        )
2824    }
2825
2826    pub fn unixtime_now_ms() -> u64 {
2827        let now = SystemTime::now()
2828            .duration_since(UNIX_EPOCH)
2829            .expect("Time went backwards")
2830            .as_millis();
2831        u64::try_from(now).expect("Travelling in time machine")
2832    }
2833
2834    #[instrument(level = "trace", skip_all)]
2835    pub async fn handle_transaction_info_request(
2836        &self,
2837        request: TransactionInfoRequest,
2838    ) -> IotaResult<TransactionInfoResponse> {
2839        let epoch_store = self.load_epoch_store_one_call_per_task();
2840        let (transaction, status) = self
2841            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2842            .ok_or(IotaError::TransactionNotFound {
2843                digest: request.transaction_digest,
2844            })?;
2845        Ok(TransactionInfoResponse {
2846            transaction,
2847            status,
2848        })
2849    }
2850
2851    #[instrument(level = "trace", skip_all)]
2852    pub async fn handle_object_info_request(
2853        &self,
2854        request: ObjectInfoRequest,
2855    ) -> IotaResult<ObjectInfoResponse> {
2856        let epoch_store = self.load_epoch_store_one_call_per_task();
2857
2858        let requested_object_seq = match request.request_kind {
2859            ObjectInfoRequestKind::LatestObjectInfo => {
2860                let (_, seq, _) = self
2861                    .try_get_object_or_tombstone(request.object_id)
2862                    .await?
2863                    .ok_or_else(|| {
2864                        IotaError::from(UserInputError::ObjectNotFound {
2865                            object_id: request.object_id,
2866                            version: None,
2867                        })
2868                    })?;
2869                seq
2870            }
2871            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2872        };
2873
2874        let object = self
2875            .get_object_store()
2876            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2877            .ok_or_else(|| {
2878                IotaError::from(UserInputError::ObjectNotFound {
2879                    object_id: request.object_id,
2880                    version: Some(requested_object_seq),
2881                })
2882            })?;
2883
2884        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2885            (request.generate_layout, object.data.try_as_move())
2886        {
2887            Some(into_struct_layout(
2888                epoch_store
2889                    .executor()
2890                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2891                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2892            )?)
2893        } else {
2894            None
2895        };
2896
2897        let lock = if !object.is_address_owned() {
2898            // Only address owned objects have locks.
2899            None
2900        } else {
2901            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2902                .await?
2903                .map(|s| s.into_inner())
2904        };
2905
2906        Ok(ObjectInfoResponse {
2907            object,
2908            layout,
2909            lock_for_debugging: lock,
2910        })
2911    }
2912
2913    #[instrument(level = "trace", skip_all)]
2914    pub fn handle_checkpoint_request(
2915        &self,
2916        request: &CheckpointRequest,
2917    ) -> IotaResult<CheckpointResponse> {
2918        let summary = if request.certified {
2919            let summary = match request.sequence_number {
2920                Some(seq) => self
2921                    .checkpoint_store
2922                    .get_checkpoint_by_sequence_number(seq)?,
2923                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2924            }
2925            .map(|v| v.into_inner());
2926            summary.map(CheckpointSummaryResponse::Certified)
2927        } else {
2928            let summary = match request.sequence_number {
2929                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2930                None => self
2931                    .checkpoint_store
2932                    .get_latest_locally_computed_checkpoint()?,
2933            };
2934            summary.map(CheckpointSummaryResponse::Pending)
2935        };
2936        let contents = match &summary {
2937            Some(s) => self
2938                .checkpoint_store
2939                .get_checkpoint_contents(&s.content_digest())?,
2940            None => None,
2941        };
2942        Ok(CheckpointResponse {
2943            checkpoint: summary,
2944            contents,
2945        })
2946    }
2947
2948    fn check_protocol_version(
2949        supported_protocol_versions: SupportedProtocolVersions,
2950        current_version: ProtocolVersion,
2951    ) {
2952        info!("current protocol version is now {:?}", current_version);
2953        info!("supported versions are: {:?}", supported_protocol_versions);
2954        if !supported_protocol_versions.is_version_supported(current_version) {
2955            let msg = format!(
2956                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2957            );
2958
2959            error!("{}", msg);
2960            eprintln!("{msg}");
2961
2962            #[cfg(not(msim))]
2963            std::process::exit(1);
2964
2965            #[cfg(msim)]
2966            iota_simulator::task::shutdown_current_node();
2967        }
2968    }
2969
2970    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
2971    pub async fn new(
2972        name: AuthorityName,
2973        secret: StableSyncAuthoritySigner,
2974        supported_protocol_versions: SupportedProtocolVersions,
2975        store: Arc<AuthorityStore>,
2976        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2977        epoch_store: Arc<AuthorityPerEpochStore>,
2978        committee_store: Arc<CommitteeStore>,
2979        indexes: Option<Arc<IndexStore>>,
2980        rest_index: Option<Arc<RestIndexStore>>,
2981        checkpoint_store: Arc<CheckpointStore>,
2982        prometheus_registry: &Registry,
2983        genesis_objects: &[Object],
2984        db_checkpoint_config: &DBCheckpointConfig,
2985        config: NodeConfig,
2986        archive_readers: ArchiveReaderBalancer,
2987        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2988        chain_identifier: ChainIdentifier,
2989        pruner_db: Option<Arc<AuthorityPrunerTables>>,
2990    ) -> Arc<Self> {
2991        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2992
2993        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2994        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2995        let transaction_manager = Arc::new(TransactionManager::new(
2996            execution_cache_trait_pointers.object_cache_reader.clone(),
2997            execution_cache_trait_pointers
2998                .transaction_cache_reader
2999                .clone(),
3000            &epoch_store,
3001            tx_ready_certificates,
3002            metrics.clone(),
3003        ));
3004        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3005
3006        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3007            epoch_store.get_parent_path(),
3008            &config.authority_store_pruning_config,
3009        );
3010        let _pruner = AuthorityStorePruner::new(
3011            store.perpetual_tables.clone(),
3012            checkpoint_store.clone(),
3013            rest_index.clone(),
3014            indexes.clone(),
3015            config.authority_store_pruning_config.clone(),
3016            epoch_store.committee().authority_exists(&name),
3017            epoch_store.epoch_start_state().epoch_duration_ms(),
3018            prometheus_registry,
3019            archive_readers,
3020            pruner_db,
3021        );
3022        let input_loader =
3023            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3024        let epoch = epoch_store.epoch();
3025        let rgp = epoch_store.reference_gas_price();
3026        let state = Arc::new(AuthorityState {
3027            name,
3028            secret,
3029            execution_lock: RwLock::new(epoch),
3030            epoch_store: ArcSwap::new(epoch_store.clone()),
3031            input_loader,
3032            execution_cache_trait_pointers,
3033            indexes,
3034            rest_index,
3035            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3036            checkpoint_store,
3037            committee_store,
3038            transaction_manager,
3039            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3040            metrics,
3041            _pruner,
3042            _authority_per_epoch_pruner,
3043            db_checkpoint_config: db_checkpoint_config.clone(),
3044            config,
3045            overload_info: AuthorityOverloadInfo::default(),
3046            validator_tx_finalizer,
3047            chain_identifier,
3048            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3049        });
3050
3051        // Start a task to execute ready certificates.
3052        let authority_state = Arc::downgrade(&state);
3053        spawn_monitored_task!(execution_process(
3054            authority_state,
3055            rx_ready_certificates,
3056            rx_execution_shutdown,
3057        ));
3058
3059        // TODO: This doesn't belong to the constructor of AuthorityState.
3060        state
3061            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3062            .expect("Error indexing genesis objects.");
3063
3064        state
3065    }
3066
3067    // TODO: Consolidate our traits to reduce the number of methods here.
3068    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3069        &self.execution_cache_trait_pointers.object_cache_reader
3070    }
3071
3072    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3073        &self.execution_cache_trait_pointers.transaction_cache_reader
3074    }
3075
3076    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3077        &self.execution_cache_trait_pointers.cache_writer
3078    }
3079
3080    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3081        &self.execution_cache_trait_pointers.backing_store
3082    }
3083
3084    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3085        &self.execution_cache_trait_pointers.backing_package_store
3086    }
3087
3088    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3089        &self.execution_cache_trait_pointers.object_store
3090    }
3091
3092    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3093        &self.execution_cache_trait_pointers.reconfig_api
3094    }
3095
3096    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3097        &self.execution_cache_trait_pointers.accumulator_store
3098    }
3099
3100    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3101        &self.execution_cache_trait_pointers.checkpoint_cache
3102    }
3103
3104    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3105        &self.execution_cache_trait_pointers.state_sync_store
3106    }
3107
3108    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3109        &self.execution_cache_trait_pointers.cache_commit
3110    }
3111
3112    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3113        self.execution_cache_trait_pointers
3114            .testing_api
3115            .database_for_testing()
3116    }
3117
3118    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3119        &self,
3120        config: NodeConfig,
3121        metrics: Arc<AuthorityStorePruningMetrics>,
3122    ) -> anyhow::Result<()> {
3123        let archive_readers =
3124            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3125        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3126            &self.database_for_testing().perpetual_tables,
3127            &self.checkpoint_store,
3128            self.rest_index.as_deref(),
3129            None,
3130            config.authority_store_pruning_config,
3131            metrics,
3132            archive_readers,
3133            EPOCH_DURATION_MS_FOR_TESTING,
3134        )
3135        .await
3136    }
3137
3138    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3139        &self.transaction_manager
3140    }
3141
3142    /// Adds transactions / certificates to transaction manager for ordered
3143    /// execution.
3144    pub fn enqueue_transactions_for_execution(
3145        &self,
3146        txns: Vec<VerifiedExecutableTransaction>,
3147        epoch_store: &Arc<AuthorityPerEpochStore>,
3148    ) {
3149        self.transaction_manager.enqueue(txns, epoch_store)
3150    }
3151
3152    /// Adds certificates to transaction manager for ordered execution.
3153    pub fn enqueue_certificates_for_execution(
3154        &self,
3155        certs: Vec<VerifiedCertificate>,
3156        epoch_store: &Arc<AuthorityPerEpochStore>,
3157    ) {
3158        self.transaction_manager
3159            .enqueue_certificates(certs, epoch_store)
3160    }
3161
3162    pub fn enqueue_with_expected_effects_digest(
3163        &self,
3164        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3165        epoch_store: &AuthorityPerEpochStore,
3166    ) {
3167        self.transaction_manager
3168            .enqueue_with_expected_effects_digest(certs, epoch_store)
3169    }
3170
3171    fn create_owner_index_if_empty(
3172        &self,
3173        genesis_objects: &[Object],
3174        epoch_store: &Arc<AuthorityPerEpochStore>,
3175    ) -> IotaResult {
3176        let Some(index_store) = &self.indexes else {
3177            return Ok(());
3178        };
3179        if !index_store.is_empty() {
3180            return Ok(());
3181        }
3182
3183        let mut new_owners = vec![];
3184        let mut new_dynamic_fields = vec![];
3185        let mut layout_resolver = epoch_store
3186            .executor()
3187            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3188        for o in genesis_objects.iter() {
3189            match o.owner {
3190                Owner::AddressOwner(addr) => new_owners.push((
3191                    (addr, o.id()),
3192                    ObjectInfo::new(&o.compute_object_reference(), o),
3193                )),
3194                Owner::ObjectOwner(object_id) => {
3195                    let id = o.id();
3196                    let info = match self.try_create_dynamic_field_info(
3197                        o,
3198                        &BTreeMap::new(),
3199                        layout_resolver.as_mut(),
3200                    ) {
3201                        Ok(Some(info)) => info,
3202                        Ok(None) => continue,
3203                        Err(IotaError::UserInput {
3204                            error:
3205                                UserInputError::ObjectNotFound {
3206                                    object_id: not_found_id,
3207                                    version,
3208                                },
3209                        }) => {
3210                            warn!(
3211                                ?not_found_id,
3212                                ?version,
3213                                object_owner=?object_id,
3214                                field=?id,
3215                                "Skipping dynamic field: referenced genesis object not found"
3216                            );
3217                            continue;
3218                        }
3219                        Err(e) => return Err(e),
3220                    };
3221                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3222                }
3223                _ => {}
3224            }
3225        }
3226
3227        index_store.insert_genesis_objects(ObjectIndexChanges {
3228            deleted_owners: vec![],
3229            deleted_dynamic_fields: vec![],
3230            new_owners,
3231            new_dynamic_fields,
3232        })
3233    }
3234
3235    /// Attempts to acquire the execution lock for an executable transaction.
3236    /// Returns the lock if the transaction's epoch matches the currently
3237    /// executing epoch, and an error otherwise.
3238    pub fn execution_lock_for_executable_transaction(
3239        &self,
3240        transaction: &VerifiedExecutableTransaction,
3241    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3242        let lock = self
3243            .execution_lock
3244            .try_read()
3245            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3246        if *lock == transaction.auth_sig().epoch() {
3247            Ok(lock)
3248        } else {
3249            Err(IotaError::WrongEpoch {
3250                expected_epoch: *lock,
3251                actual_epoch: transaction.auth_sig().epoch(),
3252            })
3253        }
3254    }
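
    // Editor's sketch (hedged, not part of the original source): a minimal
    // illustration of how the lock above is intended to be used. The guard is
    // held for the duration of execution so reconfiguration cannot begin
    // mid-transaction; a wrong-epoch certificate is rejected up front.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_execute_under_epoch_lock(
        &self,
        transaction: &VerifiedExecutableTransaction,
    ) -> IotaResult<()> {
        // Fails with `IotaError::WrongEpoch` if the certificate was signed in a
        // different epoch than the one currently being executed.
        let _execution_guard = self.execution_lock_for_executable_transaction(transaction)?;
        // ... execute the transaction while the guard is held ...
        Ok(())
    }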
3255
3256    /// Acquires the execution lock for the duration of a transaction signing
3257    /// request. This prevents reconfiguration from starting until we are
3258    /// finished handling the signing request. Otherwise, in-memory lock
3259    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3260    /// while we are attempting to acquire locks for the transaction.
3261    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3262        self.execution_lock
3263            .try_read()
3264            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3265    }
3266
3267    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3268        self.execution_lock.write().await
3269    }
3270
3271    #[instrument(level = "error", skip_all)]
3272    pub async fn reconfigure(
3273        &self,
3274        cur_epoch_store: &AuthorityPerEpochStore,
3275        supported_protocol_versions: SupportedProtocolVersions,
3276        new_committee: Committee,
3277        epoch_start_configuration: EpochStartConfiguration,
3278        accumulator: Arc<StateAccumulator>,
3279        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3280        epoch_supply_change: i64,
3281        epoch_last_checkpoint: CheckpointSequenceNumber,
3282    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3283        Self::check_protocol_version(
3284            supported_protocol_versions,
3285            epoch_start_configuration
3286                .epoch_start_state()
3287                .protocol_version(),
3288        );
3289        self.metrics.reset_on_reconfigure();
3290        self.committee_store.insert_new_committee(&new_committee)?;
3291
3292        // Wait until no transactions are being executed.
3293        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3294
3295        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3296        cur_epoch_store.epoch_terminated().await;
3297
3298        let highest_locally_built_checkpoint_seq = self
3299            .checkpoint_store
3300            .get_latest_locally_computed_checkpoint()?
3301            .map(|c| *c.sequence_number())
3302            .unwrap_or(0);
3303
3304        assert!(
3305            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3306            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3307        );
3308        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3309            // if we built the last checkpoint locally (as opposed to receiving it from a
3310            // peer), then all shared_version_assignments except the one for the
3311            // ChangeEpoch transaction should have been removed
3312            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3313            // Note that while 1 is the typical value, 0 is possible if the node restarts
3314            // after committing the last checkpoint but before reconfiguring.
3315            if num_shared_version_assignments > 1 {
3316                // If this happens in prod, we have a memory leak, but not a correctness issue.
3317                debug_fatal!(
3318                    "all shared_version_assignments should have been removed \
3319                    (num_shared_version_assignments: {num_shared_version_assignments})"
3320                );
3321            }
3322        }
3323
3324        // Safe to reconfigure now. No transactions are being executed,
3325        // and no epoch-specific tasks are running.
3326
3327        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
3328        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
3329        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3330            .await?;
3331        self.get_reconfig_api()
3332            .clear_state_end_of_epoch(&execution_lock);
3333        self.check_system_consistency(
3334            cur_epoch_store,
3335            accumulator,
3336            expensive_safety_check_config,
3337            epoch_supply_change,
3338        )?;
3339        self.get_reconfig_api()
3340            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3341        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3342            if self
3343                .db_checkpoint_config
3344                .perform_db_checkpoints_at_epoch_end
3345            {
3346                let checkpoint_indexes = self
3347                    .db_checkpoint_config
3348                    .perform_index_db_checkpoints_at_epoch_end
3349                    .unwrap_or(false);
3350                let current_epoch = cur_epoch_store.epoch();
3351                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3352                self.checkpoint_all_dbs(
3353                    &epoch_checkpoint_path,
3354                    cur_epoch_store,
3355                    checkpoint_indexes,
3356                )?;
3357            }
3358        }
3359
3360        self.get_reconfig_api()
3361            .reconfigure_cache(&epoch_start_configuration)
3362            .await;
3363
3364        let new_epoch = new_committee.epoch;
3365        let new_epoch_store = self
3366            .reopen_epoch_db(
3367                cur_epoch_store,
3368                new_committee,
3369                epoch_start_configuration,
3370                expensive_safety_check_config,
3371                epoch_last_checkpoint,
3372            )
3373            .await?;
3374        assert_eq!(new_epoch_store.epoch(), new_epoch);
3375        self.transaction_manager.reconfigure(new_epoch);
3376        *execution_lock = new_epoch;
3377        // drop execution_lock after epoch store was updated
3378        // see also assert in AuthorityState::process_certificate
3379        // on the epoch store and execution lock epoch match
3380        Ok(new_epoch_store)
3381    }
3382
3383    /// Advances the epoch store to the next epoch, for testing only.
3384    /// This only manually updates every place where the epoch number is stored;
3385    /// it doesn't properly reconfigure the node and hence should only be used
3386    /// in tests.
3387    pub async fn reconfigure_for_testing(&self) {
3388        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3389        let epoch_store = self.epoch_store_for_testing().clone();
3390        let protocol_config = epoch_store.protocol_config().clone();
3391        // The protocol config used in the current epoch store may have been
3392        // overridden and diverged from the protocol config definitions, and that
3393        // override may have been dropped along with the initial guard. We reapply
3394        // the override before creating the new epoch store, to make sure that
3395        // the new epoch store has the same protocol config as the current one.
3396        // Since this is for testing only, we mostly want to keep the protocol
3397        // config the same across epochs.
3398        let _guard =
3399            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3400        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3401            self.get_backing_package_store().clone(),
3402            self.get_object_store().clone(),
3403            &self.config.expensive_safety_check_config,
3404            self.checkpoint_store
3405                .get_epoch_last_checkpoint(epoch_store.epoch())
3406                .unwrap()
3407                .map(|c| *c.sequence_number())
3408                .unwrap_or_default(),
3409        );
3410        let new_epoch = new_epoch_store.epoch();
3411        self.transaction_manager.reconfigure(new_epoch);
3412        self.epoch_store.store(new_epoch_store);
3413        epoch_store.epoch_terminated().await;
3414        *execution_lock = new_epoch;
3415    }
3416
3417    #[instrument(level = "error", skip_all)]
3418    fn check_system_consistency(
3419        &self,
3420        cur_epoch_store: &AuthorityPerEpochStore,
3421        accumulator: Arc<StateAccumulator>,
3422        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3423        epoch_supply_change: i64,
3424    ) -> IotaResult<()> {
3425        info!(
3426            "Performing iota conservation consistency check for epoch {}",
3427            cur_epoch_store.epoch()
3428        );
3429
3430        if cfg!(debug_assertions) {
3431            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3432        }
3433
3434        self.get_reconfig_api()
3435            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3436
3437        // check for root state hash consistency with live object set
3438        if expensive_safety_check_config.enable_state_consistency_check() {
3439            info!(
3440                "Performing state consistency check for epoch {}",
3441                cur_epoch_store.epoch()
3442            );
3443            self.expensive_check_is_consistent_state(
3444                accumulator,
3445                cur_epoch_store,
3446                cfg!(debug_assertions), // panic in debug mode only
3447            );
3448        }
3449
3450        if expensive_safety_check_config.enable_secondary_index_checks() {
3451            if let Some(indexes) = self.indexes.clone() {
3452                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3453                    .expect("secondary indexes are inconsistent");
3454            }
3455        }
3456
3457        Ok(())
3458    }
3459
3460    fn expensive_check_is_consistent_state(
3461        &self,
3462        accumulator: Arc<StateAccumulator>,
3463        cur_epoch_store: &AuthorityPerEpochStore,
3464        panic: bool,
3465    ) {
3466        let live_object_set_hash = accumulator.digest_live_object_set();
3467
3468        let root_state_hash: ECMHLiveObjectSetDigest = self
3469            .get_accumulator_store()
3470            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3471            .expect("Retrieving root state hash cannot fail")
3472            .expect("Root state hash for epoch must exist")
3473            .1
3474            .digest()
3475            .into();
3476
3477        let is_inconsistent = root_state_hash != live_object_set_hash;
3478        if is_inconsistent {
3479            if panic {
3480                panic!(
3481                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3482                );
3483            } else {
3484                error!(
3485                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3486                    root_state_hash, live_object_set_hash
3487                );
3488            }
3489        } else {
3490            info!("State consistency check passed");
3491        }
3492
3493        if !panic {
3494            accumulator.set_inconsistent_state(is_inconsistent);
3495        }
3496    }
3497
3498    pub fn current_epoch_for_testing(&self) -> EpochId {
3499        self.epoch_store_for_testing().epoch()
3500    }
3501
3502    #[instrument(level = "error", skip_all)]
3503    pub fn checkpoint_all_dbs(
3504        &self,
3505        checkpoint_path: &Path,
3506        cur_epoch_store: &AuthorityPerEpochStore,
3507        checkpoint_indexes: bool,
3508    ) -> IotaResult {
3509        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3510        let current_epoch = cur_epoch_store.epoch();
3511
3512        if checkpoint_path.exists() {
3513            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3514            return Ok(());
3515        }
3516
3517        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3518        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3519
3520        if checkpoint_path_tmp.exists() {
3521            fs::remove_dir_all(&checkpoint_path_tmp)
3522                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3523        }
3524
3525        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3526        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3527
3528        // NOTE: Do not change the order of these checkpoint calls.
3529        // We want to snapshot the checkpoint db first so that we don't race with state sync.
3530        self.checkpoint_store
3531            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3532
3533        self.get_reconfig_api()
3534            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3535
3536        self.committee_store
3537            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3538
3539        if checkpoint_indexes {
3540            if let Some(indexes) = self.indexes.as_ref() {
3541                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3542            }
3543        }
3544
3545        fs::rename(checkpoint_path_tmp, checkpoint_path)
3546            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3547        Ok(())
3548    }
3549
3550    /// Loads the current epoch store, which can change during reconfiguration. To
3551    /// ensure that we never end up accessing different epoch stores within a
3552    /// single task, this must be called at most once per task. Each call site
3553    /// needs to be carefully audited to ensure that this is the case, which also
3554    /// means the number of call sites should be kept to a minimum. Only call it
3555    /// when the epoch store cannot be obtained from somewhere else.
3556    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3557        self.epoch_store.load()
3558    }
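
    // Editor's sketch (hedged, not part of the original source): the intended
    // pattern is to load the epoch store once at the top of a task and thread
    // the guard through to helpers, rather than re-loading it inside them, so a
    // reconfiguration cannot swap the store out mid-operation.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_single_epoch_store_load_per_task(&self) -> EpochId {
        let epoch_store = self.load_epoch_store_one_call_per_task();
        // Pass `&epoch_store` down to helpers instead of calling the loader again.
        epoch_store.epoch()
    }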
3559
3560    // Loads the epoch store; should be used in tests only.
3561    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3562        self.load_epoch_store_one_call_per_task()
3563    }
3564
3565    pub fn clone_committee_for_testing(&self) -> Committee {
3566        Committee::clone(self.epoch_store_for_testing().committee())
3567    }
3568
3569    #[instrument(level = "trace", skip_all)]
3570    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3571        self.get_object_store()
3572            .try_get_object(object_id)
3573            .map_err(Into::into)
3574    }
3575
3576    /// Non-fallible version of `try_get_object`; panics if the storage read fails.
3577    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3578        self.try_get_object(object_id)
3579            .await
3580            .expect("storage access failed")
3581    }
3582
3583    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3584        Ok(self
3585            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3586            .await?
3587            .expect("framework object should always exist")
3588            .compute_object_reference())
3589    }
3590
3591    // This function is only used for testing.
3592    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3593        self.get_object_cache_reader()
3594            .try_get_iota_system_state_object_unsafe()
3595    }
3596
3597    #[instrument(level = "trace", skip_all)]
3598    pub fn get_checkpoint_by_sequence_number(
3599        &self,
3600        sequence_number: CheckpointSequenceNumber,
3601    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3602        Ok(self
3603            .checkpoint_store
3604            .get_checkpoint_by_sequence_number(sequence_number)?)
3605    }
3606
3607    #[instrument(level = "trace", skip_all)]
3608    pub fn get_transaction_checkpoint_for_tests(
3609        &self,
3610        digest: &TransactionDigest,
3611        epoch_store: &AuthorityPerEpochStore,
3612    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3613        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3614        let Some(checkpoint) = checkpoint else {
3615            return Ok(None);
3616        };
3617        let checkpoint = self
3618            .checkpoint_store
3619            .get_checkpoint_by_sequence_number(checkpoint)?;
3620        Ok(checkpoint)
3621    }
3622
3623    #[instrument(level = "trace", skip_all)]
3624    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3625        Ok(
3626            match self
3627                .get_object_cache_reader()
3628                .try_get_latest_object_or_tombstone(*object_id)?
3629            {
3630                Some((_, ObjectOrTombstone::Object(object))) => {
3631                    let layout = self.get_object_layout(&object)?;
3632                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3633                }
3634                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3635                None => ObjectRead::NotExists(*object_id),
3636            },
3637        )
3638    }
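
    // Editor's sketch (hedged, not part of the original source): extracting the
    // live object from an `ObjectRead`, discarding the layout and treating
    // deleted or unknown objects as absent.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_latest_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
        match self.get_object_read(object_id)? {
            ObjectRead::Exists(_, object, _) => Ok(Some(object)),
            ObjectRead::Deleted(_) | ObjectRead::NotExists(_) => Ok(None),
        }
    }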
3639
3640    /// Chain Identifier is the digest of the genesis checkpoint.
3641    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3642        self.chain_identifier
3643    }
3644
3645    #[instrument(level = "trace", skip_all)]
3646    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3647    where
3648        T: DeserializeOwned,
3649    {
3650        let o = self.get_object_read(object_id)?.into_object()?;
3651        if let Some(move_object) = o.data.try_as_move() {
3652            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3653                IotaError::ObjectDeserialization {
3654                    error: format!("{e}"),
3655                }
3656            })?)
3657        } else {
3658            Err(IotaError::ObjectDeserialization {
3659                error: format!("Provided object : [{object_id}] is not a Move object."),
3660            })
3661        }
3662    }
3663
3664    /// This function serves RPC reads of past objects and is not expected to be
3665    /// called for other purposes.
3666    /// Depending on the object pruning policies that will be enforced in the
3667    /// future, there is no software-level guarantee/SLA that an object at an old
3668    /// version can be retrieved, even if it exists or once existed.
3669    #[instrument(level = "trace", skip_all)]
3670    pub fn get_past_object_read(
3671        &self,
3672        object_id: &ObjectID,
3673        version: SequenceNumber,
3674    ) -> IotaResult<PastObjectRead> {
3675        // First, check whether the object ever existed by fetching its latest data.
3676        let Some(obj_ref) = self
3677            .get_object_cache_reader()
3678            .try_get_latest_object_ref_or_tombstone(*object_id)?
3679        else {
3680            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3681        };
3682
3683        if version > obj_ref.1 {
3684            return Ok(PastObjectRead::VersionTooHigh {
3685                object_id: *object_id,
3686                asked_version: version,
3687                latest_version: obj_ref.1,
3688            });
3689        }
3690
3691        if version < obj_ref.1 {
3692            // Read past objects
3693            return Ok(match self.read_object_at_version(object_id, version)? {
3694                Some((object, layout)) => {
3695                    let obj_ref = object.compute_object_reference();
3696                    PastObjectRead::VersionFound(obj_ref, object, layout)
3697                }
3698
3699                None => PastObjectRead::VersionNotFound(*object_id, version),
3700            });
3701        }
3702
3703        if !obj_ref.2.is_alive() {
3704            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3705        }
3706
3707        match self.read_object_at_version(object_id, obj_ref.1)? {
3708            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3709            None => {
3710                error!(
3711                    "Object referenced in parent_entry is missing from the object store, \
3712                     datastore is inconsistent",
3713                );
3714                Err(UserInputError::ObjectNotFound {
3715                    object_id: *object_id,
3716                    version: Some(obj_ref.1),
3717                }
3718                .into())
3719            }
3720        }
3721    }
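
    // Editor's sketch (hedged, not part of the original source): how a caller
    // might interpret the `PastObjectRead` variants returned above, yielding the
    // object only when the exact historical version is available.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_read_past_object(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        match self.get_past_object_read(object_id, version)? {
            PastObjectRead::VersionFound(_, object, _) => Ok(Some(object)),
            // The version may never have existed, exceed the latest version, or
            // belong to a deleted or pruned object; treat all of these as absent.
            PastObjectRead::VersionNotFound(..)
            | PastObjectRead::VersionTooHigh { .. }
            | PastObjectRead::ObjectNotExists(_)
            | PastObjectRead::ObjectDeleted(_) => Ok(None),
        }
    }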
3722
3723    #[instrument(level = "trace", skip_all)]
3724    fn read_object_at_version(
3725        &self,
3726        object_id: &ObjectID,
3727        version: SequenceNumber,
3728    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3729        let Some(object) = self
3730            .get_object_cache_reader()
3731            .try_get_object_by_key(object_id, version)?
3732        else {
3733            return Ok(None);
3734        };
3735
3736        let layout = self.get_object_layout(&object)?;
3737        Ok(Some((object, layout)))
3738    }
3739
3740    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3741        let layout = object
3742            .data
3743            .try_as_move()
3744            .map(|object| {
3745                into_struct_layout(
3746                    self.load_epoch_store_one_call_per_task()
3747                        .executor()
3748                        // TODO(cache) - must read through cache
3749                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3750                        .get_annotated_layout(&object.type_().clone().into())?,
3751                )
3752            })
3753            .transpose()?;
3754        Ok(layout)
3755    }
3756
3757    fn get_owner_at_version(
3758        &self,
3759        object_id: &ObjectID,
3760        version: SequenceNumber,
3761    ) -> IotaResult<Owner> {
3762        self.get_object_store()
3763            .try_get_object_by_key(object_id, version)?
3764            .ok_or_else(|| {
3765                IotaError::from(UserInputError::ObjectNotFound {
3766                    object_id: *object_id,
3767                    version: Some(version),
3768                })
3769            })
3770            .map(|o| o.owner)
3771    }
3772
3773    #[instrument(level = "trace", skip_all)]
3774    pub fn get_owner_objects(
3775        &self,
3776        owner: IotaAddress,
3777        // If `Some`, the query will start from the next item after the specified cursor
3778        cursor: Option<ObjectID>,
3779        limit: usize,
3780        filter: Option<IotaObjectDataFilter>,
3781    ) -> IotaResult<Vec<ObjectInfo>> {
3782        if let Some(indexes) = &self.indexes {
3783            indexes.get_owner_objects(owner, cursor, limit, filter)
3784        } else {
3785            Err(IotaError::IndexStoreNotAvailable)
3786        }
3787    }
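
    // Editor's sketch (hedged, not part of the original source): paginating over
    // an owner's objects with the cursor convention documented above. Assumes a
    // non-zero `page_size`; this hypothetical helper is not part of the API.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_list_owner_objects_paged(
        &self,
        owner: IotaAddress,
        page_size: usize,
    ) -> IotaResult<Vec<ObjectInfo>> {
        let mut results = Vec::new();
        let mut cursor: Option<ObjectID> = None;
        loop {
            let page = self.get_owner_objects(owner, cursor, page_size, None)?;
            if page.is_empty() {
                break;
            }
            // The next query starts from the item after the last one returned.
            cursor = page.last().map(|info| info.object_id);
            results.extend(page);
        }
        Ok(results)
    }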
3788
3789    #[instrument(level = "trace", skip_all)]
3790    pub fn get_owned_coins_iterator_with_cursor(
3791        &self,
3792        owner: IotaAddress,
3793        // The query will start from the next item after the specified cursor
3794        cursor: (String, ObjectID),
3795        limit: usize,
3796        one_coin_type_only: bool,
3797    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3798        if let Some(indexes) = &self.indexes {
3799            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3800        } else {
3801            Err(IotaError::IndexStoreNotAvailable)
3802        }
3803    }
3804
3805    #[instrument(level = "trace", skip_all)]
3806    pub fn get_owner_objects_iterator(
3807        &self,
3808        owner: IotaAddress,
3809        // If `Some`, the query will start from the next item after the specified cursor
3810        cursor: Option<ObjectID>,
3811        filter: Option<IotaObjectDataFilter>,
3812    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3813        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3814        if let Some(indexes) = &self.indexes {
3815            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3816        } else {
3817            Err(IotaError::IndexStoreNotAvailable)
3818        }
3819    }
3820
3821    #[instrument(level = "trace", skip_all)]
3822    pub async fn get_move_objects<T>(
3823        &self,
3824        owner: IotaAddress,
3825        type_: MoveObjectType,
3826    ) -> IotaResult<Vec<T>>
3827    where
3828        T: DeserializeOwned,
3829    {
3830        let object_ids = self
3831            .get_owner_objects_iterator(owner, None, None)?
3832            .filter(|o| match &o.type_ {
3833                ObjectType::Struct(s) => &type_ == s,
3834                ObjectType::Package => false,
3835            })
3836            .map(|info| ObjectKey(info.object_id, info.version))
3837            .collect::<Vec<_>>();
3838        let mut move_objects = vec![];
3839
3840        let objects = self
3841            .get_object_store()
3842            .try_multi_get_objects_by_key(&object_ids)?;
3843
3844        for (o, id) in objects.into_iter().zip(object_ids) {
3845            let object = o.ok_or_else(|| {
3846                IotaError::from(UserInputError::ObjectNotFound {
3847                    object_id: id.0,
3848                    version: Some(id.1),
3849                })
3850            })?;
3851            let move_object = object.data.try_as_move().ok_or_else(|| {
3852                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3853            })?;
3854            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3855                IotaError::ObjectDeserialization {
3856                    error: format!("{e}"),
3857                }
3858            })?);
3859        }
3860        Ok(move_objects)
3861    }
3862
3863    #[instrument(level = "trace", skip_all)]
3864    pub fn get_dynamic_fields(
3865        &self,
3866        owner: ObjectID,
3867        // If `Some`, the query will start from the next item after the specified cursor
3868        cursor: Option<ObjectID>,
3869        limit: usize,
3870    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3871        Ok(self
3872            .get_dynamic_fields_iterator(owner, cursor)?
3873            .take(limit)
3874            .collect::<Result<Vec<_>, _>>()?)
3875    }
3876
3877    fn get_dynamic_fields_iterator(
3878        &self,
3879        owner: ObjectID,
3880        // If `Some`, the query will start from the next item after the specified cursor
3881        cursor: Option<ObjectID>,
3882    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3883    {
3884        if let Some(indexes) = &self.indexes {
3885            indexes.get_dynamic_fields_iterator(owner, cursor)
3886        } else {
3887            Err(IotaError::IndexStoreNotAvailable)
3888        }
3889    }
3890
3891    #[instrument(level = "trace", skip_all)]
3892    pub fn get_dynamic_field_object_id(
3893        &self,
3894        owner: ObjectID,
3895        name_type: TypeTag,
3896        name_bcs_bytes: &[u8],
3897    ) -> IotaResult<Option<ObjectID>> {
3898        if let Some(indexes) = &self.indexes {
3899            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3900        } else {
3901            Err(IotaError::IndexStoreNotAvailable)
3902        }
3903    }
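
    // Editor's sketch (hedged, not part of the original source): resolve a
    // dynamic field by its BCS-encoded name and then read the field object's
    // current state. Requires the index store to be enabled.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_read_dynamic_field(
        &self,
        parent: ObjectID,
        name_type: TypeTag,
        name_bcs_bytes: &[u8],
    ) -> IotaResult<Option<ObjectRead>> {
        match self.get_dynamic_field_object_id(parent, name_type, name_bcs_bytes)? {
            Some(field_id) => Ok(Some(self.get_object_read(&field_id)?)),
            None => Ok(None),
        }
    }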
3904
3905    #[instrument(level = "trace", skip_all)]
3906    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3907        Ok(self.get_indexes()?.next_sequence_number())
3908    }
3909
3910    #[instrument(level = "trace", skip_all)]
3911    pub async fn get_executed_transaction_and_effects(
3912        &self,
3913        digest: TransactionDigest,
3914        kv_store: Arc<TransactionKeyValueStore>,
3915    ) -> IotaResult<(Transaction, TransactionEffects)> {
3916        let transaction = kv_store.get_tx(digest).await?;
3917        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3918        Ok((transaction, effects))
3919    }
3920
3921    #[instrument(level = "trace", skip_all)]
3922    pub fn multi_get_checkpoint_by_sequence_number(
3923        &self,
3924        sequence_numbers: &[CheckpointSequenceNumber],
3925    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3926        Ok(self
3927            .checkpoint_store
3928            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3929    }
3930
3931    #[instrument(level = "trace", skip_all)]
3932    pub fn get_transaction_events(
3933        &self,
3934        digest: &TransactionEventsDigest,
3935    ) -> IotaResult<TransactionEvents> {
3936        self.get_transaction_cache_reader()
3937            .try_get_events(digest)?
3938            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3939    }
3940
3941    pub fn get_transaction_input_objects(
3942        &self,
3943        effects: &TransactionEffects,
3944    ) -> anyhow::Result<Vec<Object>> {
3945        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3946            .map_err(Into::into)
3947    }
3948
3949    pub fn get_transaction_output_objects(
3950        &self,
3951        effects: &TransactionEffects,
3952    ) -> anyhow::Result<Vec<Object>> {
3953        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3954            .map_err(Into::into)
3955    }
3956
3957    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3958        match &self.indexes {
3959            Some(i) => Ok(i.clone()),
3960            None => Err(IotaError::UnsupportedFeature {
3961                error: "extended object indexing is not enabled on this server".into(),
3962            }),
3963        }
3964    }
3965
3966    pub async fn get_transactions_for_tests(
3967        self: &Arc<Self>,
3968        filter: Option<TransactionFilter>,
3969        cursor: Option<TransactionDigest>,
3970        limit: Option<usize>,
3971        reverse: bool,
3972    ) -> IotaResult<Vec<TransactionDigest>> {
3973        let metrics = KeyValueStoreMetrics::new_for_tests();
3974        let kv_store = Arc::new(TransactionKeyValueStore::new(
3975            "rocksdb",
3976            metrics,
3977            self.clone(),
3978        ));
3979        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3980            .await
3981    }
3982
3983    #[instrument(level = "trace", skip_all)]
3984    pub async fn get_transactions(
3985        &self,
3986        kv_store: &Arc<TransactionKeyValueStore>,
3987        filter: Option<TransactionFilter>,
3988        // If `Some`, the query will start from the next item after the specified cursor
3989        cursor: Option<TransactionDigest>,
3990        limit: Option<usize>,
3991        reverse: bool,
3992    ) -> IotaResult<Vec<TransactionDigest>> {
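        // Fast path for the checkpoint filter: digests are read directly from the
        // checkpoint contents rather than the index store. The cursor is treated
        // as exclusive: entries are skipped up to and including the cursor digest
        // before up to `limit` items are taken.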
3993        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3994            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3995            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3996            if reverse {
3997                let iter = iter
3998                    .rev()
3999                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4000                    .skip(usize::from(cursor.is_some()));
4001                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4002            } else {
4003                let iter = iter
4004                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4005                    .skip(usize::from(cursor.is_some()));
4006                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4007            }
4008        }
4009        self.get_indexes()?
4010            .get_transactions(filter, cursor, limit, reverse)
4011    }
4012
4013    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4014        &self.checkpoint_store
4015    }
4016
4017    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4018        self.get_checkpoint_store()
4019            .get_highest_executed_checkpoint_seq_number()?
4020            .ok_or(IotaError::UserInput {
4021                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4022            })
4023    }
4024
4025    #[cfg(msim)]
4026    pub fn get_highest_pruned_checkpoint_for_testing(
4027        &self,
4028    ) -> IotaResult<CheckpointSequenceNumber> {
4029        self.database_for_testing()
4030            .perpetual_tables
4031            .get_highest_pruned_checkpoint()
4032            .map(|c| c.unwrap_or(0))
4033            .map_err(Into::into)
4034    }
4035
4036    #[instrument(level = "trace", skip_all)]
4037    pub fn get_checkpoint_summary_by_sequence_number(
4038        &self,
4039        sequence_number: CheckpointSequenceNumber,
4040    ) -> IotaResult<CheckpointSummary> {
4041        let verified_checkpoint = self
4042            .get_checkpoint_store()
4043            .get_checkpoint_by_sequence_number(sequence_number)?;
4044        match verified_checkpoint {
4045            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4046            None => Err(IotaError::UserInput {
4047                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4048            }),
4049        }
4050    }
4051
4052    #[instrument(level = "trace", skip_all)]
4053    pub fn get_checkpoint_summary_by_digest(
4054        &self,
4055        digest: CheckpointDigest,
4056    ) -> IotaResult<CheckpointSummary> {
4057        let verified_checkpoint = self
4058            .get_checkpoint_store()
4059            .get_checkpoint_by_digest(&digest)?;
4060        match verified_checkpoint {
4061            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4062            None => Err(IotaError::UserInput {
4063                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4064            }),
4065        }
4066    }
4067
4068    #[instrument(level = "trace", skip_all)]
4069    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4070        if is_system_package(package_id) {
4071            return self.find_genesis_txn_digest();
4072        }
4073        Ok(self
4074            .get_object_read(&package_id)?
4075            .into_object()?
4076            .previous_transaction)
4077    }
4078
4079    #[instrument(level = "trace", skip_all)]
4080    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4081        let summary = self
4082            .get_verified_checkpoint_by_sequence_number(0)?
4083            .into_message();
4084        let content = self.get_checkpoint_contents(summary.content_digest)?;
4085        let genesis_transaction = content.enumerate_transactions(&summary).next();
4086        Ok(genesis_transaction
4087            .ok_or(IotaError::UserInput {
4088                error: UserInputError::GenesisTransactionNotFound,
4089            })?
4090            .1
4091            .transaction)
4092    }
4093
4094    #[instrument(level = "trace", skip_all)]
4095    pub fn get_verified_checkpoint_by_sequence_number(
4096        &self,
4097        sequence_number: CheckpointSequenceNumber,
4098    ) -> IotaResult<VerifiedCheckpoint> {
4099        let verified_checkpoint = self
4100            .get_checkpoint_store()
4101            .get_checkpoint_by_sequence_number(sequence_number)?;
4102        match verified_checkpoint {
4103            Some(verified_checkpoint) => Ok(verified_checkpoint),
4104            None => Err(IotaError::UserInput {
4105                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4106            }),
4107        }
4108    }
4109
4110    #[instrument(level = "trace", skip_all)]
4111    pub fn get_verified_checkpoint_summary_by_digest(
4112        &self,
4113        digest: CheckpointDigest,
4114    ) -> IotaResult<VerifiedCheckpoint> {
4115        let verified_checkpoint = self
4116            .get_checkpoint_store()
4117            .get_checkpoint_by_digest(&digest)?;
4118        match verified_checkpoint {
4119            Some(verified_checkpoint) => Ok(verified_checkpoint),
4120            None => Err(IotaError::UserInput {
4121                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4122            }),
4123        }
4124    }
4125
4126    #[instrument(level = "trace", skip_all)]
4127    pub fn get_checkpoint_contents(
4128        &self,
4129        digest: CheckpointContentsDigest,
4130    ) -> IotaResult<CheckpointContents> {
4131        self.get_checkpoint_store()
4132            .get_checkpoint_contents(&digest)?
4133            .ok_or(IotaError::UserInput {
4134                error: UserInputError::CheckpointContentsNotFound(digest),
4135            })
4136    }
4137
4138    #[instrument(level = "trace", skip_all)]
4139    pub fn get_checkpoint_contents_by_sequence_number(
4140        &self,
4141        sequence_number: CheckpointSequenceNumber,
4142    ) -> IotaResult<CheckpointContents> {
4143        let verified_checkpoint = self
4144            .get_checkpoint_store()
4145            .get_checkpoint_by_sequence_number(sequence_number)?;
4146        match verified_checkpoint {
4147            Some(verified_checkpoint) => {
4148                let content_digest = verified_checkpoint.into_inner().content_digest;
4149                self.get_checkpoint_contents(content_digest)
4150            }
4151            None => Err(IotaError::UserInput {
4152                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4153            }),
4154        }
4155    }
4156
4157    #[instrument(level = "trace", skip_all)]
4158    pub async fn query_events(
4159        &self,
4160        kv_store: &Arc<TransactionKeyValueStore>,
4161        query: EventFilter,
4162        // If `Some`, the query will start from the next item after the specified cursor
4163        cursor: Option<EventID>,
4164        limit: usize,
4165        descending: bool,
4166    ) -> IotaResult<Vec<IotaEvent>> {
4167        let index_store = self.get_indexes()?;
4168
4169        // Derive the starting (tx_num, event_num) position from the cursor, if any
4170        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4171            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4172                IotaError::TransactionNotFound {
4173                    digest: cursor.tx_digest,
4174                },
4175            )?;
4176            (tx_seq, cursor.event_seq as usize)
4177        } else if descending {
4178            (u64::MAX, usize::MAX)
4179        } else {
4180            (0, 0)
4181        };
4182
4183        let limit = limit + 1;
4184        let mut event_keys = match query {
4185            EventFilter::All(filters) => {
4186                if filters.is_empty() {
4187                    index_store.all_events(tx_num, event_num, limit, descending)?
4188                } else {
4189                    return Err(IotaError::UserInput {
4190                        error: UserInputError::Unsupported(
4191                            "This query type does not currently support filter combinations"
4192                                .to_string(),
4193                        ),
4194                    });
4195                }
4196            }
4197            EventFilter::Transaction(digest) => {
4198                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4199            }
4200            EventFilter::MoveModule { package, module } => {
4201                let module_id = ModuleId::new(package.into(), module);
4202                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4203            }
4204            EventFilter::MoveEventType(struct_name) => index_store
4205                .events_by_move_event_struct_name(
4206                    &struct_name,
4207                    tx_num,
4208                    event_num,
4209                    limit,
4210                    descending,
4211                )?,
4212            EventFilter::Sender(sender) => {
4213                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4214            }
4215            EventFilter::TimeRange {
4216                start_time,
4217                end_time,
4218            } => index_store
4219                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4220            EventFilter::MoveEventModule { package, module } => index_store
4221                .events_by_move_event_module(
4222                    &ModuleId::new(package.into(), module),
4223                    tx_num,
4224                    event_num,
4225                    limit,
4226                    descending,
4227                )?,
4228            // not using "_ =>" because we want to make sure we remember to add new variants here
4229            EventFilter::Package(_)
4230            | EventFilter::MoveEventField { .. }
4231            | EventFilter::Any(_)
4232            | EventFilter::And(_, _)
4233            | EventFilter::Or(_, _) => {
4234                return Err(IotaError::UserInput {
4235                    error: UserInputError::Unsupported(
4236                        "This query type is not supported by the full node.".to_string(),
4237                    ),
4238                });
4239            }
4240        };
4241
4242        // skip one event if exclusive cursor is provided,
4243        // otherwise truncate to the original limit.
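        // (Illustrative example: with an original `limit` of 3 and a cursor, up to
        // 4 keys were fetched above; removing the first, cursor-matching key
        // leaves at most 3. Without a cursor, the first 3 keys are kept.)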
4244        if cursor.is_some() {
4245            if !event_keys.is_empty() {
4246                event_keys.remove(0);
4247            }
4248        } else {
4249            event_keys.truncate(limit - 1);
4250        }
4251
4252        // get the unique set of digests from the event_keys
4253        let transaction_digests = event_keys
4254            .iter()
4255            .map(|(_, digest, _, _)| *digest)
4256            .collect::<HashSet<_>>()
4257            .into_iter()
4258            .collect::<Vec<_>>();
4259
4260        let events = kv_store
4261            .multi_get_events_by_tx_digests(&transaction_digests)
4262            .await?;
4263
4264        let events_map: HashMap<_, _> =
4265            transaction_digests.iter().zip(events.into_iter()).collect();
4266
4267        let stored_events = event_keys
4268            .into_iter()
4269            .map(|k| {
4270                (
4271                    k,
4272                    events_map
4273                        .get(&k.1)
4274                        .expect("fetched digest is missing")
4275                        .clone()
4276                        .and_then(|e| e.data.get(k.2).cloned()),
4277                )
4278            })
4279            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4280                event
4281                    .map(|e| (e, tx_digest, event_seq, timestamp))
4282                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4283            })
4284            .collect::<Result<Vec<_>, _>>()?;
4285
4286        let epoch_store = self.load_epoch_store_one_call_per_task();
4287        let backing_store = self.get_backing_package_store().as_ref();
4288        let mut layout_resolver = epoch_store
4289            .executor()
4290            .type_layout_resolver(Box::new(backing_store));
4291        let mut events = vec![];
4292        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4293            events.push(IotaEvent::try_from(
4294                e.clone(),
4295                tx_digest,
4296                event_seq as u64,
4297                Some(timestamp),
4298                layout_resolver.get_annotated_layout(&e.type_)?,
4299            )?)
4300        }
4301        Ok(events)
4302    }
4303
4304    pub async fn insert_genesis_object(&self, object: Object) {
4305        self.get_reconfig_api()
4306            .try_insert_genesis_object(object)
4307            .expect("Cannot insert genesis object")
4308    }
4309
4310    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4311        futures::future::join_all(
4312            objects
4313                .iter()
4314                .map(|o| self.insert_genesis_object(o.clone())),
4315        )
4316        .await;
4317    }
4318
4319    /// Make a status response for a transaction
4320    #[instrument(level = "trace", skip_all)]
4321    pub fn get_transaction_status(
4322        &self,
4323        transaction_digest: &TransactionDigest,
4324        epoch_store: &Arc<AuthorityPerEpochStore>,
4325    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4326        // TODO: In the case of read path, we should not have to re-sign the effects.
4327        if let Some(effects) =
4328            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4329        {
4330            if let Some(transaction) = self
4331                .get_transaction_cache_reader()
4332                .try_get_transaction_block(transaction_digest)?
4333            {
4334                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4335                let events = if let Some(digest) = effects.events_digest() {
4336                    self.get_transaction_events(digest)?
4337                } else {
4338                    TransactionEvents::default()
4339                };
4340                return Ok(Some((
4341                    (*transaction).clone().into_message(),
4342                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4343                )));
4344            } else {
4345                // The read of effects and read of transaction are not atomic. It's possible
4346                // that we reverted the transaction (during epoch change) in
4347                // between the above two reads, and we end up having effects but
4348                // not transaction. In this case, we just fall through.
4349                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4350            }
4351        }
4352        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4353            self.metrics.tx_already_processed.inc();
4354            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4355            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4356        } else {
4357            Ok(None)
4358        }
4359    }
4360
4361    /// Get the signed effects of the given transaction. If the effects were
4362    /// signed in a previous epoch, re-sign them so that the caller is able to
4363    /// form a certificate of the effects in the current epoch.
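    ///
    /// An illustrative sketch of a hypothetical read-path caller (the `state`,
    /// `digest` and `epoch_store` bindings are assumed here, not defined in
    /// this function):
    /// ```ignore
    /// // digest: &TransactionDigest, epoch_store: &Arc<AuthorityPerEpochStore>
    /// if let Some(signed_effects) =
    ///     state.get_signed_effects_and_maybe_resign(digest, epoch_store)?
    /// {
    ///     // `signed_effects` carries a signature valid for the current epoch,
    ///     // even if the transaction was executed in an earlier epoch.
    /// }
    /// ```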
4364    #[instrument(level = "trace", skip_all)]
4365    pub fn get_signed_effects_and_maybe_resign(
4366        &self,
4367        transaction_digest: &TransactionDigest,
4368        epoch_store: &Arc<AuthorityPerEpochStore>,
4369    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4370        let effects = self
4371            .get_transaction_cache_reader()
4372            .try_get_executed_effects(transaction_digest)?;
4373        match effects {
4374            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4375            None => Ok(None),
4376        }
4377    }
4378
4379    #[instrument(level = "trace", skip_all)]
4380    pub(crate) fn sign_effects(
4381        &self,
4382        effects: TransactionEffects,
4383        epoch_store: &Arc<AuthorityPerEpochStore>,
4384    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4385        let tx_digest = *effects.transaction_digest();
4386        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4387            Some(sig) if sig.epoch == epoch_store.epoch() => {
4388                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4389            }
4390            _ => {
4391                // If the transaction was executed in previous epochs, the validator will
4392                // re-sign the effects with the current epoch so that a client is always able to
4393                // obtain an effects certificate at the current epoch.
4394                //
4395                // Why is this necessary? Consider the following case:
4396                // - assume there are 4 validators
4397                // - Quorum driver gets 2 signed effects before reconfig halt
4398                // - The tx makes it into final checkpoint.
4399                // - 2 validators go away and are replaced in the new epoch.
4400                // - The new epoch begins.
4401                // - The quorum driver cannot complete the partial effects cert from the
4402                //   previous epoch, because it may not be able to reach either of the 2 former
4403                //   validators.
4404                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4405                //   the new epoch, the QD can make a new effects cert and return it to the
4406                //   client.
4407                //
4408                // This is considered a short-term workaround. Eventually, Quorum Driver
4409                // should be able to return either an effects certificate, -or-
4410                // a proof of inclusion in a checkpoint. In the case above, the
4411                // Quorum Driver would return a proof of inclusion in the final
4412                // checkpoint, and this code would no longer be necessary.
4413                debug!(
4414                    ?tx_digest,
4415                    epoch=?epoch_store.epoch(),
4416                    "Re-signing the effects with the current epoch"
4417                );
4418
4419                let sig = AuthoritySignInfo::new(
4420                    epoch_store.epoch(),
4421                    &effects,
4422                    Intent::iota_app(IntentScope::TransactionEffects),
4423                    self.name,
4424                    &*self.secret,
4425                );
4426
4427                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4428
4429                epoch_store.insert_effects_digest_and_signature(
4430                    &tx_digest,
4431                    effects.digest(),
4432                    &sig,
4433                )?;
4434
4435                effects
4436            }
4437        };
4438
4439        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4440            signed_effects,
4441        ))
4442    }
4443
4444    // Returns coin objects for fullnode indexing, if indexing is enabled.
4445    #[instrument(level = "trace", skip_all)]
4446    fn fullnode_only_get_tx_coins_for_indexing(
4447        &self,
4448        inner_temporary_store: &InnerTemporaryStore,
4449        epoch_store: &Arc<AuthorityPerEpochStore>,
4450    ) -> Option<TxCoins> {
4451        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4452            return None;
4453        }
4454        let written_coin_objects = inner_temporary_store
4455            .written
4456            .iter()
4457            .filter_map(|(k, v)| {
4458                if v.is_coin() {
4459                    Some((*k, v.clone()))
4460                } else {
4461                    None
4462                }
4463            })
4464            .collect();
4465        let input_coin_objects = inner_temporary_store
4466            .input_objects
4467            .iter()
4468            .filter_map(|(k, v)| {
4469                if v.is_coin() {
4470                    Some((*k, v.clone()))
4471                } else {
4472                    None
4473                }
4474            })
4475            .collect::<ObjectMap>();
4476        Some((input_coin_objects, written_coin_objects))
4477    }
4478
4479    /// Get the TransactionEnvelope that currently locks the given object, if
4480    /// any. Since object locks are only valid for one epoch, we also need
4481    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4482    /// no lock records for the given object can be found.
4483    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4484    /// object record is at a different version.
4485    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4486    /// certain transaction. Returns None if a lock record is
4487    /// initialized for the given ObjectRef but the object is not yet locked
4488    /// by any transaction, or if the transaction cannot be found in the
4489    /// transaction table (e.g. because of a data race).
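    ///
    /// An illustrative sketch of a hypothetical caller (the `state`,
    /// `object_ref` and `epoch_store` bindings are assumed, not defined in
    /// this function):
    /// ```ignore
    /// match state.get_transaction_lock(&object_ref, &epoch_store).await? {
    ///     Some(signed_tx) => {
    ///         // The object is locked by `signed_tx` at exactly this ObjectRef.
    ///     }
    ///     None => {
    ///         // A lock record exists, but no transaction holds the lock yet
    ///         // (or the locking transaction could not be looked up).
    ///     }
    /// }
    /// ```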
4490    #[instrument(level = "trace", skip_all)]
4491    pub async fn get_transaction_lock(
4492        &self,
4493        object_ref: &ObjectRef,
4494        epoch_store: &AuthorityPerEpochStore,
4495    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4496        let lock_info = self
4497            .get_object_cache_reader()
4498            .try_get_lock(*object_ref, epoch_store)?;
4499        let lock_info = match lock_info {
4500            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4501                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4502                    provided_obj_ref: *object_ref,
4503                    current_version: locked_ref.1,
4504                }
4505                .into());
4506            }
4507            ObjectLockStatus::Initialized => {
4508                return Ok(None);
4509            }
4510            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4511        };
4512
4513        epoch_store.get_signed_transaction(&lock_info)
4514    }
4515
4516    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4517        self.get_object_cache_reader().try_get_objects(objects)
4518    }
4519
4520    /// Non-fallible version of `try_get_objects`.
4521    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4522        self.try_get_objects(objects)
4523            .await
4524            .expect("storage access failed")
4525    }
4526
4527    pub async fn try_get_object_or_tombstone(
4528        &self,
4529        object_id: ObjectID,
4530    ) -> IotaResult<Option<ObjectRef>> {
4531        self.get_object_cache_reader()
4532            .try_get_latest_object_ref_or_tombstone(object_id)
4533    }
4534
4535    /// Non-fallible version of `try_get_object_or_tombstone`.
4536    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4537        self.try_get_object_or_tombstone(object_id)
4538            .await
4539            .expect("storage access failed")
4540    }
4541
4542    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4543    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4544    /// upgrade.
4545    ///
4546    /// This method can be used to dynamically adjust the amount of buffer. If set
4547    /// to 0, the upgrade will go through with only 2f+1 votes.
4548    ///
4549    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4550    /// should have the same value), or you risk halting the chain.
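    ///
    /// For example (illustrative figures only, assuming a total stake of
    /// 10,000 and f = 3,333): a plain quorum is 2f + 1 = 6,667 stake; a
    /// non-zero buffer raises the requirement above that by a fraction of f
    /// expressed in basis points, while a buffer of 0 leaves it at exactly
    /// 2f + 1.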
4551    pub fn set_override_protocol_upgrade_buffer_stake(
4552        &self,
4553        expected_epoch: EpochId,
4554        buffer_stake_bps: u64,
4555    ) -> IotaResult {
4556        let epoch_store = self.load_epoch_store_one_call_per_task();
4557        let actual_epoch = epoch_store.epoch();
4558        if actual_epoch != expected_epoch {
4559            return Err(IotaError::WrongEpoch {
4560                expected_epoch,
4561                actual_epoch,
4562            });
4563        }
4564
4565        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4566    }
4567
4568    pub fn clear_override_protocol_upgrade_buffer_stake(
4569        &self,
4570        expected_epoch: EpochId,
4571    ) -> IotaResult {
4572        let epoch_store = self.load_epoch_store_one_call_per_task();
4573        let actual_epoch = epoch_store.epoch();
4574        if actual_epoch != expected_epoch {
4575            return Err(IotaError::WrongEpoch {
4576                expected_epoch,
4577                actual_epoch,
4578            });
4579        }
4580
4581        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4582    }
4583
4584    /// Get the set of system packages that are compiled into this build, if
4585    /// those packages are compatible with the current versions of those
4586    /// packages on-chain.
4587    pub async fn get_available_system_packages(
4588        &self,
4589        binary_config: &BinaryConfig,
4590    ) -> Vec<ObjectRef> {
4591        let mut results = vec![];
4592
4593        let system_packages = BuiltInFramework::iter_system_packages();
4594
4595        // Add extra framework packages during simtest
4596        #[cfg(msim)]
4597        let extra_packages = framework_injection::get_extra_packages(self.name);
4598        #[cfg(msim)]
4599        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4600
4601        for system_package in system_packages {
4602            let modules = system_package.modules().to_vec();
4603            // In simtests, we could override the current built-in framework packages.
4604            #[cfg(msim)]
4605            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4606                .unwrap_or(modules);
4607
4608            let Some(obj_ref) = iota_framework::compare_system_package(
4609                &self.get_object_store(),
4610                &system_package.id,
4611                &modules,
4612                system_package.dependencies.to_vec(),
4613                binary_config,
4614            )
4615            .await
4616            else {
4617                return vec![];
4618            };
4619            results.push(obj_ref);
4620        }
4621
4622        results
4623    }
4624
4625    /// Return the new versions, module bytes, and dependencies for the packages
4626    /// that have been committed to for a framework upgrade, in
4627    /// `system_packages`.  Loads the module contents from the binary, and
4628    /// performs the following checks:
4629    ///
4630    /// - Whether its contents match what is on-chain already, in which case
4631    ///   no upgrade is required, and its contents are omitted from the output.
4632    /// - Whether the contents in the binary can form a package whose digest
4633    ///   matches the input, meaning the framework will be upgraded, and this
4634    ///   authority can satisfy that upgrade, in which case the contents are
4635    ///   included in the output.
4636    ///
4637    /// If the current version of the framework can't be loaded, the binary does
4638    /// not contain the bytes for that framework ID, or the resulting
4639    /// package fails the digest check, `None` is returned indicating that
4640    /// this authority cannot run the upgrade that the network voted on.
4641    async fn get_system_package_bytes(
4642        &self,
4643        system_packages: Vec<ObjectRef>,
4644        binary_config: &BinaryConfig,
4645    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4646        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4647        let objects = self.get_objects(&ids).await;
4648
4649        let mut res = Vec::with_capacity(system_packages.len());
4650        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4651            let prev_transaction = match object {
4652                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4653                    // Skip this one because it doesn't need to be upgraded.
4654                    info!("Framework {} does not need updating", system_package_ref.0);
4655                    continue;
4656                }
4657
4658                Some(cur_object) => cur_object.previous_transaction,
4659                None => TransactionDigest::genesis_marker(),
4660            };
4661
4662            #[cfg(msim)]
4663            let SystemPackage {
4664                id: _,
4665                bytes,
4666                dependencies,
4667            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4668                .unwrap_or_else(|| {
4669                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4670                });
4671
4672            #[cfg(not(msim))]
4673            let SystemPackage {
4674                id: _,
4675                bytes,
4676                dependencies,
4677            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4678
4679            let modules: Vec<_> = bytes
4680                .iter()
4681                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4682                .collect();
4683
4684            let new_object = Object::new_system_package(
4685                &modules,
4686                system_package_ref.1,
4687                dependencies.clone(),
4688                prev_transaction,
4689            );
4690
4691            let new_ref = new_object.compute_object_reference();
4692            if new_ref != system_package_ref {
4693                error!(
4694                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4695                );
4696                return None;
4697            }
4698
4699            res.push((system_package_ref.1, bytes, dependencies));
4700        }
4701
4702        Some(res)
4703    }
4704
4705    /// Returns the new protocol version and system packages that the network
4706    /// has voted to upgrade to. If the proposed protocol version is not
4707    /// supported, None is returned.
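    ///
    /// For example (hypothetical scenario): if validators holding enough stake
    /// to clear the buffer-adjusted threshold all advertise the same config
    /// digest and package set for the proposed version, that
    /// (version, digest, packages) triple is returned; if their votes are
    /// split across different package sets, no single group may reach the
    /// threshold and `None` is returned.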
4708    fn is_protocol_version_supported_v1(
4709        proposed_protocol_version: ProtocolVersion,
4710        committee: &Committee,
4711        capabilities: Vec<AuthorityCapabilitiesV1>,
4712        mut buffer_stake_bps: u64,
4713    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4714        if buffer_stake_bps > 10000 {
4715            warn!("clamping buffer_stake_bps to 10000");
4716            buffer_stake_bps = 10000;
4717        }
4718
4719        // For each validator, gather the protocol version and system packages that it
4720        // would like to upgrade to in the next epoch.
4721        let mut desired_upgrades: Vec<_> = capabilities
4722            .into_iter()
4723            .filter_map(|mut cap| {
4724                // A validator that lists no packages is voting against any change at all.
4725                if cap.available_system_packages.is_empty() {
4726                    return None;
4727                }
4728
4729                cap.available_system_packages.sort();
4730
4731                info!(
4732                    "validator {:?} supports {:?} with system packages: {:?}",
4733                    cap.authority.concise(),
4734                    cap.supported_protocol_versions,
4735                    cap.available_system_packages,
4736                );
4737
4738                // A validator that only supports the current protocol version is also voting
4739                // against any change, because framework upgrades always require a protocol
4740                // version bump.
4741                cap.supported_protocol_versions
4742                    .get_version_digest(proposed_protocol_version)
4743                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4744            })
4745            .collect();
4746
4747        // There can only be one set of votes that have a majority, find one if it
4748        // exists.
4749        desired_upgrades.sort();
4750        desired_upgrades
4751            .into_iter()
4752            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4753            .into_iter()
4754            .find_map(|((digest, packages), group)| {
4755                // should have been filtered out earlier.
4756                assert!(!packages.is_empty());
4757
4758                let mut stake_aggregator: StakeAggregator<(), true> =
4759                    StakeAggregator::new(Arc::new(committee.clone()));
4760
4761                for (_, _, authority) in group {
4762                    stake_aggregator.insert_generic(authority, ());
4763                }
4764
4765                let total_votes = stake_aggregator.total_votes();
4766                let quorum_threshold = committee.quorum_threshold();
4767                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4768
4769                info!(
4770                    protocol_config_digest = ?digest,
4771                    ?total_votes,
4772                    ?quorum_threshold,
4773                    ?buffer_stake_bps,
4774                    ?effective_threshold,
4775                    ?proposed_protocol_version,
4776                    ?packages,
4777                    "support for upgrade"
4778                );
4779
4780                let has_support = total_votes >= effective_threshold;
4781                has_support.then_some((proposed_protocol_version, digest, packages))
4782            })
4783    }
4784
4785    /// Selects the highest supported protocol version and system packages that
4786    /// the network has voted to upgrade to. If no upgrade is supported,
4787    /// returns the current protocol version and system packages.
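    ///
    /// For example (hypothetical): starting from version N, if upgrades to
    /// N + 1 and N + 2 are both supported but N + 3 is not, the loop below
    /// settles on N + 2 together with the system packages voted for at that
    /// step.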
4788    fn choose_protocol_version_and_system_packages_v1(
4789        current_protocol_version: ProtocolVersion,
4790        current_protocol_digest: Digest,
4791        committee: &Committee,
4792        capabilities: Vec<AuthorityCapabilitiesV1>,
4793        buffer_stake_bps: u64,
4794    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4795        let mut next_protocol_version = current_protocol_version;
4796        let mut system_packages = vec![];
4797        let mut protocol_version_digest = current_protocol_digest;
4798
4799        // Finds the highest supported protocol version and system packages by
4800        // incrementing the proposed protocol version by one until no further
4801        // upgrades are supported.
4802        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4803            next_protocol_version + 1,
4804            committee,
4805            capabilities.clone(),
4806            buffer_stake_bps,
4807        ) {
4808            next_protocol_version = version;
4809            protocol_version_digest = digest;
4810            system_packages = packages;
4811        }
4812
4813        (
4814            next_protocol_version,
4815            protocol_version_digest,
4816            system_packages,
4817        )
4818    }
4819
4820    /// Returns the indices of validators that support the given protocol
4821    /// version and digest. This includes both committee and non-committee
4822    /// validators based on their capabilities. Uses active validators
4823    /// instead of committee indices.
4824    fn get_validators_supporting_protocol_version(
4825        target_protocol_version: ProtocolVersion,
4826        target_digest: Digest,
4827        active_validators: &[AuthorityPublicKey],
4828        capabilities: &[AuthorityCapabilitiesV1],
4829    ) -> Vec<u64> {
4830        let mut eligible_validators = Vec::new();
4831
4832        for capability in capabilities {
4833            // Check if this validator supports the target protocol version and digest
4834            if let Some(digest) = capability
4835                .supported_protocol_versions
4836                .get_version_digest(target_protocol_version)
4837            {
4838                if digest == target_digest {
4839                    // Find the validator's index in the active validators list
4840                    if let Some(index) = active_validators
4841                        .iter()
4842                        .position(|name| AuthorityName::from(name) == capability.authority)
4843                    {
4844                        eligible_validators.push(index as u64);
4845                    }
4846                }
4847            }
4848        }
4849
4850        // Sort indices for deterministic behavior
4851        eligible_validators.sort();
4852        eligible_validators
4853    }
4854
4855    /// Calculates the sum of weights for eligible validators that are part of
4856    /// the committee. Takes the indices from
4857    /// get_validators_supporting_protocol_version and maps them back
4858    /// to committee members to get their weights.
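    ///
    /// For example (hypothetical numbers): indices [0, 2] whose validators
    /// carry committee weights 100 and 300 yield a total weight of 400, while
    /// an eligible validator that is active but not part of the current
    /// committee contributes nothing.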
4859    fn calculate_eligible_validators_weight(
4860        eligible_validator_indices: &[u64],
4861        active_validators: &[AuthorityPublicKey],
4862        committee: &Committee,
4863    ) -> u64 {
4864        let mut total_weight = 0u64;
4865
4866        for &index in eligible_validator_indices {
4867            let authority_pubkey = &active_validators[index as usize];
4868            // Check if this validator is in the committee and get their weight
4869            if let Some((_, weight)) = committee
4870                .members()
4871                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4872            {
4873                total_weight += weight;
4874            }
4875        }
4876
4877        total_weight
4878    }
4879
4880    #[instrument(level = "debug", skip_all)]
4881    fn create_authenticator_state_tx(
4882        &self,
4883        epoch_store: &Arc<AuthorityPerEpochStore>,
4884    ) -> Option<EndOfEpochTransactionKind> {
4885        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4886            info!("authenticator state transactions not enabled");
4887            return None;
4888        }
4889
4890        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4891        let tx = if authenticator_state_exists {
4892            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4893            let min_epoch =
4894                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4895            let authenticator_obj_initial_shared_version = epoch_store
4896                .epoch_start_config()
4897                .authenticator_obj_initial_shared_version()
4898                .expect("initial version must exist");
4899
4900            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4901                min_epoch,
4902                authenticator_obj_initial_shared_version,
4903            );
4904
4905            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4906
4907            tx
4908        } else {
4909            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4910            info!("Creating AuthenticatorStateCreate tx");
4911            tx
4912        };
4913        Some(tx)
4914    }
4915
4916    /// Creates and executes the advance epoch transaction to produce its effects without
4917    /// committing it to the database. The effects of the change epoch tx
4918    /// are only written to the database after a certified checkpoint has been
4919    /// formed and executed by CheckpointExecutor.
4920    ///
4921    /// When a framework upgrade has been decided on, but the validator does
4922    /// not have the new versions of the packages locally, the validator
4923    /// cannot form the ChangeEpochTx. In this case it returns Err,
4924    /// indicating that the checkpoint builder should give up trying to make the
4925    /// final checkpoint. As long as the network is able to create a certified
4926    /// checkpoint (which should be ensured by the capabilities vote), it
4927    /// will arrive via state sync and be executed by CheckpointExecutor.
4928    #[instrument(level = "error", skip_all)]
4929    pub async fn create_and_execute_advance_epoch_tx(
4930        &self,
4931        epoch_store: &Arc<AuthorityPerEpochStore>,
4932        gas_cost_summary: &GasCostSummary,
4933        checkpoint: CheckpointSequenceNumber,
4934        epoch_start_timestamp_ms: CheckpointTimestamp,
4935        scores: Vec<u64>,
4936    ) -> anyhow::Result<(
4937        IotaSystemState,
4938        Option<SystemEpochInfoEvent>,
4939        TransactionEffects,
4940    )> {
4941        let mut txns = Vec::new();
4942
4943        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4944            txns.push(tx);
4945        }
4946
4947        let next_epoch = epoch_store.epoch() + 1;
4948
4949        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4950        let authority_capabilities = epoch_store
4951            .get_capabilities_v1()
4952            .expect("read capabilities from db cannot fail");
4953        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4954            Self::choose_protocol_version_and_system_packages_v1(
4955                epoch_store.protocol_version(),
4956                SupportedProtocolVersionsWithHashes::protocol_config_digest(
4957                    epoch_store.protocol_config(),
4958                ),
4959                epoch_store.committee(),
4960                authority_capabilities.clone(),
4961                buffer_stake_bps,
4962            );
4963
4964        // since system packages are created during the current epoch, they should abide
4965        // by the rules of the current epoch, including the current epoch's max
4966        // Move binary format version
4967        let config = epoch_store.protocol_config();
4968        let binary_config = to_binary_config(config);
4969        let Some(next_epoch_system_package_bytes) = self
4970            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4971            .await
4972        else {
4973            error!(
4974                "upgraded system packages {:?} are not locally available, cannot create \
4975                ChangeEpochTx. validator binary must be upgraded to the correct version!",
4976                next_epoch_system_packages
4977            );
4978            // the checkpoint builder will keep retrying forever when it hits this error.
4979            // Eventually, one of two things will happen:
4980            // - The operator will upgrade this binary to one that has the new packages
4981            //   locally, and this function will succeed.
4982            // - The final checkpoint will be certified by other validators, we will receive
4983            //   it via state sync, and execute it. This will upgrade the framework
4984            //   packages, reconfigure, and most likely shut down in the new epoch (this
4985            //   validator likely doesn't support the new protocol version, or else it
4986            //   should have had the packages.)
4987            bail!("missing system packages: cannot form ChangeEpochTx");
4988        };
4989
4990        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
4991        // ChangeEpochV2 requirements are met
4992        if config.select_committee_from_eligible_validators() {
4993            // Get the list of eligible validators that support the target protocol version
4994            let active_validators = epoch_store.epoch_start_state().get_active_validators();
4995
4996            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
4997
4998            // Use validators supporting the target protocol version as eligible validators
4999            // for the next epoch if the select_committee_supporting_next_epoch_version
5000            // feature flag is set to true.
5001            if config.select_committee_supporting_next_epoch_version() {
5002                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5003                    next_epoch_protocol_version,
5004                    next_epoch_protocol_digest,
5005                    &active_validators,
5006                    &authority_capabilities,
5007                );
5008
5009                // Calculate the total weight of eligible validators in the committee
5010                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5011                    &eligible_active_validators,
5012                    &active_validators,
5013                    epoch_store.committee(),
5014                );
5015
5016                // Safety check: ensure eligible validators have enough stake
5017                // Use the same effective threshold calculation that was used to decide the
5018                // protocol version
5019                let committee = epoch_store.committee();
5020                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5021
5022                if eligible_validators_weight < effective_threshold {
5023                    error!(
5024                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5025                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5026                    );
5027                    // Pass all active validator indices as eligible validators
5028                    // to perform selection among all of them.
5029                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5030                }
5031            }
5032
5033            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5034            // flag is enabled.
5035            if config.pass_validator_scores_to_advance_epoch() {
5036                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5037                    next_epoch,
5038                    next_epoch_protocol_version,
5039                    gas_cost_summary.storage_cost,
5040                    gas_cost_summary.computation_cost,
5041                    gas_cost_summary.computation_cost_burned,
5042                    gas_cost_summary.storage_rebate,
5043                    gas_cost_summary.non_refundable_storage_fee,
5044                    epoch_start_timestamp_ms,
5045                    next_epoch_system_package_bytes,
5046                    eligible_active_validators,
5047                    scores,
5048                    config.adjust_rewards_by_score(),
5049                ));
5050            } else {
5051                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5052                    next_epoch,
5053                    next_epoch_protocol_version,
5054                    gas_cost_summary.storage_cost,
5055                    gas_cost_summary.computation_cost,
5056                    gas_cost_summary.computation_cost_burned,
5057                    gas_cost_summary.storage_rebate,
5058                    gas_cost_summary.non_refundable_storage_fee,
5059                    epoch_start_timestamp_ms,
5060                    next_epoch_system_package_bytes,
5061                    eligible_active_validators,
5062                ));
5063            }
5064        } else if config.protocol_defined_base_fee()
5065            && config.max_committee_members_count_as_option().is_some()
5066        {
5067            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5068                next_epoch,
5069                next_epoch_protocol_version,
5070                gas_cost_summary.storage_cost,
5071                gas_cost_summary.computation_cost,
5072                gas_cost_summary.computation_cost_burned,
5073                gas_cost_summary.storage_rebate,
5074                gas_cost_summary.non_refundable_storage_fee,
5075                epoch_start_timestamp_ms,
5076                next_epoch_system_package_bytes,
5077            ));
5078        } else {
5079            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5080                next_epoch,
5081                next_epoch_protocol_version,
5082                gas_cost_summary.storage_cost,
5083                gas_cost_summary.computation_cost,
5084                gas_cost_summary.storage_rebate,
5085                gas_cost_summary.non_refundable_storage_fee,
5086                epoch_start_timestamp_ms,
5087                next_epoch_system_package_bytes,
5088            ));
5089        }
5090
5091        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5092
5093        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5094            tx.clone(),
5095            epoch_store.epoch(),
5096            checkpoint,
5097        );
5098
5099        let tx_digest = executable_tx.digest();
5100
5101        info!(
5102            ?next_epoch,
5103            ?next_epoch_protocol_version,
5104            ?next_epoch_system_packages,
5105            computation_cost=?gas_cost_summary.computation_cost,
5106            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5107            storage_cost=?gas_cost_summary.storage_cost,
5108            storage_rebate=?gas_cost_summary.storage_rebate,
5109            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5110            ?tx_digest,
5111            "Creating advance epoch transaction"
5112        );
5113
5114        fail_point_async!("change_epoch_tx_delay");
5115        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5116
5117        // The tx could have been executed by state sync already - if so simply return
5118        // an error. The checkpoint builder will shortly be terminated by
5119        // reconfiguration anyway.
5120        if self
5121            .get_transaction_cache_reader()
5122            .try_is_tx_already_executed(tx_digest)?
5123        {
5124            warn!("change epoch tx has already been executed via state sync");
5125            bail!("change epoch tx has already been executed via state sync",);
5126        }
5127
5128        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5129
5130        // We must manually assign the shared object versions to the transaction before
5131        // executing it. This is because we do not sequence end-of-epoch
5132        // transactions through consensus.
5133        epoch_store.assign_shared_object_versions_idempotent(
5134            self.get_object_cache_reader().as_ref(),
5135            std::slice::from_ref(&executable_tx),
5136        )?;
5137
5138        let (input_objects, _, _) =
5139            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5140
5141        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5142            &execution_guard,
5143            &executable_tx,
5144            input_objects,
5145            None,
5146            None,
5147            epoch_store,
5148        )?;
5149        let system_obj = get_iota_system_state(&temporary_store.written)
5150            .expect("change epoch tx must write to system object");
5151        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5152        let system_epoch_info_event = temporary_store
5153            .events
5154            .data
5155            .into_iter()
5156            .find(|event| event.is_system_epoch_info_event())
5157            .map(SystemEpochInfoEvent::from);
5158        // The system epoch info event can be `None` in case the `advance_epoch`
5159        // Move function call failed and was executed in safe mode.
5160        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5161
5162        // We must write tx and effects to the state sync tables so that state sync is
5163        // able to deliver the transaction to CheckpointExecutor after it is
5164        // included in a certified checkpoint.
5165        self.get_state_sync_store()
5166            .try_insert_transaction_and_effects(&tx, &effects)
5167            .map_err(|err| {
5168                let err: anyhow::Error = err.into();
5169                err
5170            })?;
5171
5172        info!(
5173            "Effects summary of the change epoch transaction: {:?}",
5174            effects.summary_for_debug()
5175        );
5176        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5177        // The change epoch transaction cannot fail to execute.
5178        assert!(effects.status().is_ok());
5179        Ok((system_obj, system_epoch_info_event, effects))
5180    }
5181
5182    /// This function is called at the very end of the epoch.
5183    /// This step is required before updating the new epoch in the db and calling
5184    /// reopen_epoch_db.
5185    #[instrument(level = "error", skip_all)]
5186    async fn revert_uncommitted_epoch_transactions(
5187        &self,
5188        epoch_store: &AuthorityPerEpochStore,
5189    ) -> IotaResult {
5190        {
5191            let state = epoch_store.get_reconfig_state_write_lock_guard();
5192            if state.should_accept_user_certs() {
5193                // We need to change this so that the consensus adapter no longer
5194                // accepts certificates from users. This can happen if our local
5195                // validator did not initiate the epoch change locally, but 2f+1
5196                // nodes already concluded the epoch.
5197                //
5198                // This lock is essentially a barrier for the
5199                // `epoch_store.pending_consensus_certificates` table that we read
5200                // right after this block.
5201                epoch_store.close_user_certs(state);
5202            }
5203            // lock is dropped here
5204        }
5205        let pending_certificates = epoch_store.pending_consensus_certificates();
5206        info!(
5207            "Reverting {} locally executed transactions that were not included in the epoch: {:?}",
5208            pending_certificates.len(),
5209            pending_certificates,
5210        );
5211        for digest in pending_certificates {
5212            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5213                info!(
5214                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5215                    digest
5216                );
5217                continue;
5218            }
5219            info!("Reverting {:?} at the end of epoch", digest);
5220            epoch_store.revert_executed_transaction(&digest)?;
5221            self.get_reconfig_api().try_revert_state_update(&digest)?;
5222        }
5223        info!("All uncommitted local transactions reverted");
5224        Ok(())
5225    }
5226
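    /// Opens the `AuthorityPerEpochStore` for the new epoch (derived from the
    /// current one via `new_at_next_epoch`) and swaps it into `self.epoch_store`.
    /// Asserts that `new_committee` matches the epoch recorded in
    /// `epoch_start_configuration`.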
5227    #[instrument(level = "error", skip_all)]
5228    async fn reopen_epoch_db(
5229        &self,
5230        cur_epoch_store: &AuthorityPerEpochStore,
5231        new_committee: Committee,
5232        epoch_start_configuration: EpochStartConfiguration,
5233        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5234        epoch_last_checkpoint: CheckpointSequenceNumber,
5235    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5236        let new_epoch = new_committee.epoch;
5237        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5238        assert_eq!(
5239            epoch_start_configuration.epoch_start_state().epoch(),
5240            new_committee.epoch
5241        );
5242        fail_point!("before-open-new-epoch-store");
5243        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5244            self.name,
5245            new_committee,
5246            epoch_start_configuration,
5247            self.get_backing_package_store().clone(),
5248            self.get_object_store().clone(),
5249            expensive_safety_check_config,
5250            epoch_last_checkpoint,
5251        )?;
5252        self.epoch_store.store(new_epoch_store.clone());
5253        Ok(new_epoch_store)
5254    }
5255
5256    /// Checks if the Move authenticator unlocks a valid Move account and returns
5257    /// the account-related `AuthenticatorFunctionRefForExecution` object.
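    ///
    /// The check verifies that `signer` matches the address derived from the
    /// account object ID, that the account object is shared or immutable, that
    /// any provided version or digest matches the on-chain object, and that the
    /// account exposes an `AuthenticatorFunctionRefV1` dynamic field.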
5258    fn check_move_account(
5259        &self,
5260        auth_account_object_id: ObjectID,
5261        auth_account_object_seq_number: Option<SequenceNumber>,
5262        auth_account_object_digest: Option<ObjectDigest>,
5263        account_object: ObjectReadResult,
5264        signer: &IotaAddress,
5265    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5266        let account_object = match account_object.object {
5267            ObjectReadResultKind::Object(object) => Ok(object),
5268            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5269                Err(UserInputError::AccountObjectDeleted {
5270                    account_id: account_object.id(),
5271                    account_version: version,
5272                    transaction_digest: digest,
5273                })
5274            }
5275            // It is impossible to check the account object because it is used in a canceled
5276            // transaction and is not loaded.
5277            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5278                Err(UserInputError::AccountObjectInCanceledTransaction {
5279                    account_id: account_object.id(),
5280                    account_version: version,
5281                })
5282            }
5283        }?;
5284
5285        let account_object_addr = IotaAddress::from(auth_account_object_id);
5286
5287        fp_ensure!(
5288            signer == &account_object_addr,
5289            UserInputError::IncorrectUserSignature {
5290                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5291            }
5292            .into()
5293        );
5294
5295        fp_ensure!(
5296            account_object.is_shared() || account_object.is_immutable(),
5297            UserInputError::AccountObjectNotSupported {
5298                object_id: auth_account_object_id
5299            }
5300            .into()
5301        );
5302
5303        let auth_account_object_seq_number =
5304            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5305                let account_object_version = account_object.version();
5306
5307                fp_ensure!(
5308                    account_object_version == auth_account_object_seq_number,
5309                    UserInputError::AccountObjectVersionMismatch {
5310                        object_id: auth_account_object_id,
5311                        expected_version: auth_account_object_seq_number,
5312                        actual_version: account_object_version,
5313                    }
5314                    .into()
5315                );
5316
5317                auth_account_object_seq_number
5318            } else {
5319                account_object.version()
5320            };
5321
5322        if let Some(auth_account_object_digest) = auth_account_object_digest {
5323            let actual_digest = account_object.digest();
5324            fp_ensure!(
5325                actual_digest == auth_account_object_digest,
5326                UserInputError::InvalidAccountObjectDigest {
5327                    object_id: auth_account_object_id,
5328                    expected_digest: auth_account_object_digest,
5329                    actual_digest,
5330                }
5331                .into()
5332            );
5333        }
5334
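        // The authenticator function reference is stored as a dynamic field on the
        // account object, keyed by `AuthenticatorFunctionRefV1Key`; derive its ID
        // so it can be looked up at (or below) the account object's version.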
5335        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5336            auth_account_object_id,
5337            &AuthenticatorFunctionRefV1Key::tag().into(),
5338            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5339        )
5340        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5341            account_object_id: auth_account_object_id,
5342        })?;
5343
5344        let authenticator_function_ref_field = self
5345            .get_object_cache_reader()
5346            .try_find_object_lt_or_eq_version(
5347                authenticator_function_ref_field_id,
5348                auth_account_object_seq_number,
5349            )?;
5350
5351        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5352            let field_move_object = authenticator_function_ref_field_obj
5353                .data
5354                .try_as_move()
5355                .expect("dynamic field should never be a package object");
5356
5357            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5358                field_move_object.to_rust().ok_or(
5359                    UserInputError::InvalidAuthenticatorFunctionRefField {
5360                        account_object_id: auth_account_object_id,
5361                    },
5362                )?;
5363
5364            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5365                field.value,
5366                authenticator_function_ref_field_obj.compute_object_reference(),
5367                authenticator_function_ref_field_obj.owner,
5368                authenticator_function_ref_field_obj.storage_rebate,
5369                authenticator_function_ref_field_obj.previous_transaction,
5370            ))
5371        } else {
5372            Err(UserInputError::MoveAuthenticatorNotFound {
5373                authenticator_function_ref_id: authenticator_function_ref_field_id,
5374                account_object_id: auth_account_object_id,
5375                account_object_version: auth_account_object_seq_number,
5376            }
5377            .into())
5378        }
5379    }
5380
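    /// Loads the input and receiving objects needed to sign `transaction` and
    /// splits the inputs into the transaction's own input objects, the receiving
    /// objects, the optional Move authenticator input objects, and the optional
    /// Move account object.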
5381    fn read_objects_for_signing(
5382        &self,
5383        transaction: &VerifiedTransaction,
5384        epoch: u64,
5385    ) -> IotaResult<(
5386        InputObjects,
5387        ReceivingObjects,
5388        Option<InputObjects>,
5389        Option<ObjectReadResult>,
5390    )> {
5391        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5392            Some(transaction.digest()),
5393            &transaction.collect_all_input_object_kind_for_reading()?,
5394            &transaction.data().transaction_data().receiving_objects(),
5395            epoch,
5396        )?;
5397
5398        transaction
5399            .split_input_objects_into_groups_for_reading(input_objects)
5400            .map(|(tx_input_objects, auth_input_objects, account_object)| {
5401                (
5402                    tx_input_objects,
5403                    tx_receiving_objects,
5404                    auth_input_objects,
5405                    account_object,
5406                )
5407            })
5408    }
5409
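    /// Validates the transaction inputs during signing. If a `MoveAuthenticator`
    /// is present, it also resolves the Move account's `AuthenticatorFunctionRef`
    /// and checks the authenticator's input objects. Returns the gas status, the
    /// checked transaction inputs, and (when applicable) the checked
    /// authenticator inputs and the resolved authenticator function reference.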
5410    fn check_transaction_inputs_for_signing(
5411        &self,
5412        protocol_config: &ProtocolConfig,
5413        reference_gas_price: u64,
5414        tx_data: &TransactionData,
5415        tx_input_objects: InputObjects,
5416        tx_receiving_objects: &ReceivingObjects,
5417        move_authenticator: Option<&MoveAuthenticator>,
5418        auth_input_objects: Option<InputObjects>,
5419        account_object: Option<ObjectReadResult>,
5420    ) -> IotaResult<(
5421        IotaGasStatus,
5422        CheckedInputObjects,
5423        Option<CheckedInputObjects>,
5424        Option<AuthenticatorFunctionRef>,
5425    )> {
5426        let (
5427            auth_checked_input_objects_union,
5428            authenticator_function_ref,
5429            authenticator_gas_budget,
5430        ) = if let Some(move_authenticator) = move_authenticator {
5431            let auth_input_objects =
5432                auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5433            let account_object = account_object.expect("Move account object must be provided");
5434
5435            // Check basic `object_to_authenticate` preconditions and get its components.
5436            let (
5437                auth_account_object_id,
5438                auth_account_object_seq_number,
5439                auth_account_object_digest,
5440            ) = move_authenticator.object_to_authenticate_components()?;
5441
5442            // Make sure the sender is a Move account.
5443            let AuthenticatorFunctionRefForExecution {
5444                authenticator_function_ref,
5445                ..
5446            } = self.check_move_account(
5447                auth_account_object_id,
5448                auth_account_object_seq_number,
5449                auth_account_object_digest,
5450                account_object,
5451                &tx_data.sender(),
5452            )?;
5453
5454            // Check the MoveAuthenticator input objects.
5455            let auth_checked_input_objects =
5456                iota_transaction_checks::check_move_authenticator_input_for_signing(
5457                    auth_input_objects,
5458                )?;
5459
5460            // `max_auth_gas` is used here as the Move authenticator gas budget until
5461            // the budget becomes part of the transaction data.
5462            let authenticator_gas_budget = protocol_config.max_auth_gas();
5463
5464            (
5465                Some(auth_checked_input_objects),
5466                Some(authenticator_function_ref),
5467                authenticator_gas_budget,
5468            )
5469        } else {
5470            (None, None, 0)
5471        };
5472
5473        // Check the transaction inputs.
5474        let (gas_status, tx_checked_input_objects) =
5475            iota_transaction_checks::check_transaction_input(
5476                protocol_config,
5477                reference_gas_price,
5478                tx_data,
5479                tx_input_objects,
5480                tx_receiving_objects,
5481                &self.metrics.bytecode_verifier_metrics,
5482                &self.config.verifier_signing_config,
5483                authenticator_gas_budget,
5484            )?;
5485
5486        Ok((
5487            gas_status,
5488            tx_checked_input_objects,
5489            auth_checked_input_objects_union,
5490            authenticator_function_ref,
5491        ))
5492    }
5493
5494    #[cfg(test)]
5495    pub(crate) fn iter_live_object_set_for_testing(
5496        &self,
5497    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5498        self.get_accumulator_store()
5499            .iter_cached_live_object_set_for_testing()
5500    }
5501
5502    #[cfg(test)]
5503    pub(crate) fn shutdown_execution_for_test(&self) {
5504        self.tx_execution_shutdown
5505            .lock()
5506            .take()
5507            .unwrap()
5508            .send(())
5509            .unwrap();
5510    }
5511
5512    /// NOTE: this function is only to be used for fuzzing and testing. Never
5513    /// use it in production.
5514    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5515        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5516        self.get_object_cache_reader()
5517            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5518        self.get_reconfig_api()
5519            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5520    }
5521}
5522
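/// Receives `(epoch, round, bytes)` tuples of newly generated randomness and
/// turns each into a `RandomnessStateUpdate` system transaction that is
/// persisted and enqueued for execution on the `AuthorityState`.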
5523pub struct RandomnessRoundReceiver {
5524    authority_state: Arc<AuthorityState>,
5525    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5526}
5527
5528impl RandomnessRoundReceiver {
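    /// # Example
    ///
    /// A minimal sketch of wiring the receiver up (the channel capacity and the
    /// surrounding variables are assumptions, not part of this API):
    ///
    /// ```ignore
    /// let (randomness_tx, randomness_rx) = mpsc::channel(16);
    /// let handle = RandomnessRoundReceiver::spawn(authority_state.clone(), randomness_rx);
    /// // Randomness produced elsewhere is forwarded as `(epoch, round, bytes)`:
    /// randomness_tx.send((epoch, round, bytes)).await.unwrap();
    /// ```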
5529    pub fn spawn(
5530        authority_state: Arc<AuthorityState>,
5531        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5532    ) -> JoinHandle<()> {
5533        let rrr = RandomnessRoundReceiver {
5534            authority_state,
5535            randomness_rx,
5536        };
5537        spawn_monitored_task!(rrr.run())
5538    }
5539
5540    async fn run(mut self) {
5541        info!("RandomnessRoundReceiver event loop started");
5542
5543        loop {
5544            tokio::select! {
5545                maybe_recv = self.randomness_rx.recv() => {
5546                    if let Some((epoch, round, bytes)) = maybe_recv {
5547                        self.handle_new_randomness(epoch, round, bytes);
5548                    } else {
5549                        break;
5550                    }
5551                },
5552            }
5553        }
5554
5555        info!("RandomnessRoundReceiver event loop ended");
5556    }
5557
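    /// Handles newly generated randomness for `(epoch, round)`: drops it if the
    /// epoch no longer matches the current epoch store, otherwise builds and
    /// persists a `RandomnessStateUpdate` system transaction, enqueues it for
    /// execution, and spawns a task that waits for its effects (panicking in
    /// debug builds if execution times out).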
5558    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5559    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5560        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5561        if epoch_store.epoch() != epoch {
5562            warn!(
5563                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5564                epoch_store.epoch()
5565            );
5566            return;
5567        }
5568        let transaction = VerifiedTransaction::new_randomness_state_update(
5569            epoch,
5570            round,
5571            bytes,
5572            epoch_store
5573                .epoch_start_config()
5574                .randomness_obj_initial_shared_version(),
5575        );
5576        debug!(
5577            "created randomness state update transaction with digest: {:?}",
5578            transaction.digest()
5579        );
5580        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5581        let digest = *transaction.digest();
5582
5583        // Randomness state updates contain the full BLS signature for the randomness
5584        // round, which cannot necessarily be reconstructed later. Therefore we must
5585        // immediately persist this transaction so that, if we crash before its
5586        // outputs are committed, we will still be able to re-execute it.
5587        self.authority_state
5588            .get_cache_commit()
5589            .persist_transaction(&transaction);
5590
5591        // Send transaction to TransactionManager for execution.
5592        self.authority_state
5593            .transaction_manager()
5594            .enqueue(vec![transaction], &epoch_store);
5595
5596        let authority_state = self.authority_state.clone();
5597        spawn_monitored_task!(async move {
5598            // Wait for transaction execution in a separate task, to avoid deadlock in case
5599            // of out-of-order randomness generation. (Each
5600            // RandomnessStateUpdate depends on the output of the
5601            // RandomnessStateUpdate from the previous round.)
5602            //
5603            // We set a very long timeout so that in case this gets stuck for some reason,
5604            // the validator will eventually crash rather than continuing in a
5605            // zombie mode.
5606            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5607            let result = tokio::time::timeout(
5608                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5609                authority_state
5610                    .get_transaction_cache_reader()
5611                    .try_notify_read_executed_effects(&[digest]),
5612            )
5613            .await;
5614            let result = match result {
5615                Ok(result) => result,
5616                Err(_) => {
5617                    if cfg!(debug_assertions) {
5618                        // Crash on randomness update execution timeout in debug builds.
5619                        panic!(
5620                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5621                        );
5622                    }
5623                    warn!(
5624                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5625                    );
5626                    // Continue waiting as long as necessary in non-debug builds.
5627                    authority_state
5628                        .get_transaction_cache_reader()
5629                        .try_notify_read_executed_effects(&[digest])
5630                        .await
5631                }
5632            };
5633
5634            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5635            let effects = effects.pop().expect("should return effects");
5636            if *effects.status() != ExecutionStatus::Success {
5637                fatal!(
5638                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5639                );
5640            }
5641            debug!(
5642                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5643            );
5644        });
5645    }
5646}
5647
5648#[async_trait]
5649impl TransactionKeyValueStoreTrait for AuthorityState {
5650    async fn multi_get(
5651        &self,
5652        transaction_keys: &[TransactionDigest],
5653        effects_keys: &[TransactionDigest],
5654    ) -> IotaResult<KVStoreTransactionData> {
5655        let txns = if !transaction_keys.is_empty() {
5656            self.get_transaction_cache_reader()
5657                .try_multi_get_transaction_blocks(transaction_keys)?
5658                .into_iter()
5659                .map(|t| t.map(|t| (*t).clone().into_inner()))
5660                .collect()
5661        } else {
5662            vec![]
5663        };
5664
5665        let fx = if !effects_keys.is_empty() {
5666            self.get_transaction_cache_reader()
5667                .try_multi_get_executed_effects(effects_keys)?
5668        } else {
5669            vec![]
5670        };
5671
5672        Ok((txns, fx))
5673    }
5674
5675    async fn multi_get_checkpoints(
5676        &self,
5677        checkpoint_summaries: &[CheckpointSequenceNumber],
5678        checkpoint_contents: &[CheckpointSequenceNumber],
5679        checkpoint_summaries_by_digest: &[CheckpointDigest],
5680    ) -> IotaResult<(
5681        Vec<Option<CertifiedCheckpointSummary>>,
5682        Vec<Option<CheckpointContents>>,
5683        Vec<Option<CertifiedCheckpointSummary>>,
5684    )> {
5685        // TODO: use multi-get methods if it ever becomes important (unlikely)
5686        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5687        let store = self.get_checkpoint_store();
5688        for seq in checkpoint_summaries {
5689            let checkpoint = store
5690                .get_checkpoint_by_sequence_number(*seq)?
5691                .map(|c| c.into_inner());
5692
5693            summaries.push(checkpoint);
5694        }
5695
5696        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5697        for seq in checkpoint_contents {
5698            let checkpoint = store
5699                .get_checkpoint_by_sequence_number(*seq)?
5700                .and_then(|summary| {
5701                    store
5702                        .get_checkpoint_contents(&summary.content_digest)
5703                        .expect("db read cannot fail")
5704                });
5705            contents.push(checkpoint);
5706        }
5707
5708        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5709        for digest in checkpoint_summaries_by_digest {
5710            let checkpoint = store
5711                .get_checkpoint_by_digest(digest)?
5712                .map(|c| c.into_inner());
5713            summaries_by_digest.push(checkpoint);
5714        }
5715
5716        Ok((summaries, contents, summaries_by_digest))
5717    }
5718
5719    async fn get_transaction_perpetual_checkpoint(
5720        &self,
5721        digest: TransactionDigest,
5722    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5723        self.get_checkpoint_cache()
5724            .try_get_transaction_perpetual_checkpoint(&digest)
5725            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5726    }
5727
5728    async fn get_object(
5729        &self,
5730        object_id: ObjectID,
5731        version: VersionNumber,
5732    ) -> IotaResult<Option<Object>> {
5733        self.get_object_cache_reader()
5734            .try_get_object_by_key(&object_id, version)
5735    }
5736
5737    async fn multi_get_transactions_perpetual_checkpoints(
5738        &self,
5739        digests: &[TransactionDigest],
5740    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5741        let res = self
5742            .get_checkpoint_cache()
5743            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5744
5745        Ok(res
5746            .into_iter()
5747            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5748            .collect())
5749    }
5750
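    /// Looks up the executed effects for each digest and returns the
    /// corresponding `TransactionEvents`, preserving the input order; entries
    /// are `None` for transactions without executed effects or without events.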
5751    #[instrument(skip(self))]
5752    async fn multi_get_events_by_tx_digests(
5753        &self,
5754        digests: &[TransactionDigest],
5755    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5756        if digests.is_empty() {
5757            return Ok(vec![]);
5758        }
5759        let events_digests: Vec<_> = self
5760            .get_transaction_cache_reader()
5761            .try_multi_get_executed_effects(digests)?
5762            .into_iter()
5763            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5764            .collect();
5765        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5766        let mut events = self
5767            .get_transaction_cache_reader()
5768            .try_multi_get_events(&non_empty_events)?
5769            .into_iter();
5770        Ok(events_digests
5771            .into_iter()
5772            .map(|ev| ev.and_then(|_| events.next()?))
5773            .collect())
5774    }
5775}
5776
5777#[cfg(msim)]
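/// Test-only (simtest) hooks for overriding the bytes of framework packages,
/// either with a single global override or with a per-validator callback.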
5778pub mod framework_injection {
5779    use std::{
5780        cell::RefCell,
5781        collections::{BTreeMap, BTreeSet},
5782    };
5783
5784    use iota_framework::{BuiltInFramework, SystemPackage};
5785    use iota_types::{
5786        base_types::{AuthorityName, ObjectID},
5787        is_system_package,
5788    };
5789    use move_binary_format::CompiledModule;
5790
5791    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5792
5793    // Thread local cache because all simtests run in a single unique thread.
5794    thread_local! {
5795        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5796    }
5797
5798    type Framework = Vec<CompiledModule>;
5799
5800    pub type PackageUpgradeCallback =
5801        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5802
5803    enum PackageOverrideConfig {
5804        Global(Framework),
5805        PerValidator(PackageUpgradeCallback),
5806    }
5807
5808    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5809        modules
5810            .iter()
5811            .map(|m| {
5812                let mut buf = Vec::new();
5813                m.serialize_with_version(m.version, &mut buf).unwrap();
5814                buf
5815            })
5816            .collect()
5817    }
5818
5819    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5820        OVERRIDE.with(|bs| {
5821            bs.borrow_mut()
5822                .insert(package_id, PackageOverrideConfig::Global(modules))
5823        });
5824    }
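
    // Illustrative sketch: a simtest could globally override a system package
    // before starting validators (the helper building `modules` is an assumption):
    //
    //     let modules: Vec<CompiledModule> = build_patched_framework_modules();
    //     framework_injection::set_override(package_id, modules);
    //
    // Every node in the simulation will then boot with the patched bytes for
    // `package_id`.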
5825
5826    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5827        OVERRIDE.with(|bs| {
5828            bs.borrow_mut()
5829                .insert(package_id, PackageOverrideConfig::PerValidator(func))
5830        });
5831    }
5832
5833    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5834        OVERRIDE.with(|cfg| {
5835            cfg.borrow().get(package_id).and_then(|entry| match entry {
5836                PackageOverrideConfig::Global(framework) => {
5837                    Some(compiled_modules_to_bytes(framework))
5838                }
5839                PackageOverrideConfig::PerValidator(func) => {
5840                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
5841                }
5842            })
5843        })
5844    }
5845
5846    pub fn get_override_modules(
5847        package_id: &ObjectID,
5848        name: AuthorityName,
5849    ) -> Option<Vec<CompiledModule>> {
5850        OVERRIDE.with(|cfg| {
5851            cfg.borrow().get(package_id).and_then(|entry| match entry {
5852                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5853                PackageOverrideConfig::PerValidator(func) => func(name),
5854            })
5855        })
5856    }
5857
5858    pub fn get_override_system_package(
5859        package_id: &ObjectID,
5860        name: AuthorityName,
5861    ) -> Option<SystemPackage> {
5862        let bytes = get_override_bytes(package_id, name)?;
5863        let dependencies = if is_system_package(*package_id) {
5864            BuiltInFramework::get_package_by_id(package_id)
5865                .dependencies
5866                .to_vec()
5867        } else {
5868            // Assume that entirely new injected packages depend on all existing system
5869            // packages.
5870            BuiltInFramework::all_package_ids()
5871        };
5872        Some(SystemPackage {
5873            id: *package_id,
5874            bytes,
5875            dependencies,
5876        })
5877    }
5878
5879    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5880        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5881        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5882            cfg.borrow()
5883                .keys()
5884                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5885                .collect()
5886        });
5887
5888        extra
5889            .into_iter()
5890            .map(|package| SystemPackage {
5891                id: package,
5892                bytes: get_override_bytes(&package, name).unwrap(),
5893                dependencies: BuiltInFramework::all_package_ids(),
5894            })
5895            .collect()
5896    }
5897}
5898
5899#[derive(Debug, Serialize, Deserialize, Clone)]
5900pub struct ObjDumpFormat {
5901    pub id: ObjectID,
5902    pub version: VersionNumber,
5903    pub digest: ObjectDigest,
5904    pub object: Object,
5905}
5906
5907impl ObjDumpFormat {
5908    fn new(object: Object) -> Self {
5909        let oref = object.compute_object_reference();
5910        Self {
5911            id: oref.0,
5912            version: oref.1,
5913            digest: oref.2,
5914            object,
5915        }
5916    }
5917}
5918
5919#[derive(Debug, Serialize, Deserialize, Clone)]
5920pub struct NodeStateDump {
5921    pub tx_digest: TransactionDigest,
5922    pub sender_signed_data: SenderSignedData,
5923    pub executed_epoch: u64,
5924    pub reference_gas_price: u64,
5925    pub protocol_version: u64,
5926    pub epoch_start_timestamp_ms: u64,
5927    pub computed_effects: TransactionEffects,
5928    pub expected_effects_digest: TransactionEffectsDigest,
5929    pub relevant_system_packages: Vec<ObjDumpFormat>,
5930    pub shared_objects: Vec<ObjDumpFormat>,
5931    pub loaded_child_objects: Vec<ObjDumpFormat>,
5932    pub modified_at_versions: Vec<ObjDumpFormat>,
5933    pub runtime_reads: Vec<ObjDumpFormat>,
5934    pub input_objects: Vec<ObjDumpFormat>,
5935}
5936
5937impl NodeStateDump {
5938    pub fn new(
5939        tx_digest: &TransactionDigest,
5940        effects: &TransactionEffects,
5941        expected_effects_digest: TransactionEffectsDigest,
5942        object_store: &dyn ObjectStore,
5943        epoch_store: &Arc<AuthorityPerEpochStore>,
5944        inner_temporary_store: &InnerTemporaryStore,
5945        certificate: &VerifiedExecutableTransaction,
5946    ) -> IotaResult<Self> {
5947        // Epoch info
5948        let executed_epoch = epoch_store.epoch();
5949        let reference_gas_price = epoch_store.reference_gas_price();
5950        let epoch_start_config = epoch_store.epoch_start_config();
5951        let protocol_version = epoch_store.protocol_version().as_u64();
5952        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5953
5954        // Record all system packages at this version
5955        let mut relevant_system_packages = Vec::new();
5956        for sys_package_id in BuiltInFramework::all_package_ids() {
5957            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5958                relevant_system_packages.push(ObjDumpFormat::new(w))
5959            }
5960        }
5961
5962        // Record all the shared objects
5963        let mut shared_objects = Vec::new();
5964        for kind in effects.input_shared_objects() {
5965            match kind {
5966                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5967                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5968                        shared_objects.push(ObjDumpFormat::new(w))
5969                    }
5970                }
5971                InputSharedObject::ReadDeleted(..)
5972                | InputSharedObject::MutateDeleted(..)
5973                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5974                                                           * objects. */
5975            }
5976        }
5977
5978        // Record all loaded child objects
5979        // Child objects which are read but not mutated are not tracked anywhere else
5980        let mut loaded_child_objects = Vec::new();
5981        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5982            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5983                loaded_child_objects.push(ObjDumpFormat::new(w))
5984            }
5985        }
5986
5987        // Record all modified objects
5988        let mut modified_at_versions = Vec::new();
5989        for (id, ver) in effects.modified_at_versions() {
5990            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5991                modified_at_versions.push(ObjDumpFormat::new(w))
5992            }
5993        }
5994
5995        // Packages read at runtime, which were not previously loaded into the
5996        // temporary store. Some packages may be fetched at runtime and won't show
5997        // up in the input objects.
5998        let mut runtime_reads = Vec::new();
5999        for obj in inner_temporary_store
6000            .runtime_packages_loaded_from_db
6001            .values()
6002        {
6003            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6004        }
6005
6006        // All other input objects should already be in `inner_temporary_store.input_objects`
6007
6008        Ok(Self {
6009            tx_digest: *tx_digest,
6010            executed_epoch,
6011            reference_gas_price,
6012            epoch_start_timestamp_ms,
6013            protocol_version,
6014            relevant_system_packages,
6015            shared_objects,
6016            loaded_child_objects,
6017            modified_at_versions,
6018            runtime_reads,
6019            sender_signed_data: certificate.clone().into_message(),
6020            input_objects: inner_temporary_store
6021                .input_objects
6022                .values()
6023                .map(|o| ObjDumpFormat::new(o.clone()))
6024                .collect(),
6025            computed_effects: effects.clone(),
6026            expected_effects_digest,
6027        })
6028    }
6029
6030    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6031        let mut objects = Vec::new();
6032        objects.extend(self.relevant_system_packages.clone());
6033        objects.extend(self.shared_objects.clone());
6034        objects.extend(self.loaded_child_objects.clone());
6035        objects.extend(self.modified_at_versions.clone());
6036        objects.extend(self.runtime_reads.clone());
6037        objects.extend(self.input_objects.clone());
6038        objects
6039    }
6040
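    /// Serializes this dump as pretty-printed JSON into
    /// `<tx_digest>_<unix_ms>_NODE_DUMP.json` under `path` and returns the full
    /// path of the written file.
    ///
    /// A minimal round-trip sketch (`dump` is assumed to be an existing
    /// `NodeStateDump`):
    ///
    /// ```ignore
    /// let written = dump.write_to_file(Path::new("/tmp"))?;
    /// let restored = NodeStateDump::read_from_file(&written)?;
    /// assert_eq!(restored.tx_digest, dump.tx_digest);
    /// ```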
6041    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6042        let file_name = format!(
6043            "{}_{}_NODE_DUMP.json",
6044            self.tx_digest,
6045            AuthorityState::unixtime_now_ms()
6046        );
6047        let mut path = path.to_path_buf();
6048        path.push(&file_name);
6049        let mut file = File::create(path.clone())?;
6050        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6051        Ok(path)
6052    }
6053
6054    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6055        let file = File::open(path)?;
6056        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6057    }
6058}