iota_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    account_abstraction::{
59        account::AuthenticatorFunctionRefV1Key,
60        authenticator_function::{
61            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62            AuthenticatorFunctionRefV1,
63        },
64    },
65    authenticator_state::get_authenticator_state,
66    base_types::*,
67    committee::{Committee, EpochId, ProtocolVersion},
68    crypto::{
69        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70        default_hash,
71    },
72    deny_list_v1::check_coin_deny_list_v1_during_signing,
73    digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75    effects::{
76        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77        TransactionEvents, VerifiedSignedTransactionEffects,
78    },
79    error::{ExecutionError, IotaError, IotaResult, UserInputError},
80    event::{Event, EventID, SystemEpochInfoEvent},
81    executable_transaction::VerifiedExecutableTransaction,
82    execution_config_utils::to_binary_config,
83    execution_status::ExecutionStatus,
84    fp_ensure,
85    gas::{GasCostSummary, IotaGasStatus},
86    gas_coin::NANOS_PER_IOTA,
87    inner_temporary_store::{
88        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89        WrittenObjects,
90    },
91    iota_system_state::{
92        IotaSystemState, IotaSystemStateTrait,
93        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94    },
95    is_system_package,
96    layout_resolver::{LayoutResolver, into_struct_layout},
97    message_envelope::Message,
98    messages_checkpoint::{
99        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103    },
104    messages_consensus::AuthorityCapabilitiesV1,
105    messages_grpc::{
106        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108        TransactionStatus,
109    },
110    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111    move_authenticator::MoveAuthenticator,
112    object::{
113        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114        bounded_visitor::BoundedVisitor,
115    },
116    storage::{
117        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118    },
119    supported_protocol_versions::{
120        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121    },
122    transaction::*,
123    transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoints::CheckpointStore,
162    congestion_tracker::CongestionTracker,
163    consensus_adapter::ConsensusAdapter,
164    epoch::committee_store::CommitteeStore,
165    execution_cache::{
166        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168        TransactionCacheRead,
169    },
170    execution_driver::execution_process,
171    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172    metrics::{LatencyObserver, RateTracker},
173    module_cache_metrics::ResolverMetrics,
174    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175    rest_index::RestIndexStore,
176    stake_aggregator::StakeAggregator,
177    state_accumulator::{AccumulatorStore, StateAccumulator},
178    subscription_handler::SubscriptionHandler,
179    transaction_input_loader::TransactionInputLoader,
180    transaction_manager::TransactionManager,
181    transaction_outputs::TransactionOutputs,
182    validator_tx_finalizer::ValidatorTxFinalizer,
183    verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223pub mod authority_store_pruner;
224pub mod authority_store_tables;
225pub mod authority_store_types;
226pub mod epoch_start_configuration;
227pub mod shared_object_congestion_tracker;
228pub mod shared_object_version_manager;
229pub mod suggested_gas_price_calculator;
230pub mod test_authority_builder;
231pub mod transaction_deferral;
232
233pub(crate) mod authority_store;
234pub mod backpressure;
235
236/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
237pub struct AuthorityMetrics {
238    tx_orders: IntCounter,
239    total_certs: IntCounter,
240    total_cert_attempts: IntCounter,
241    total_effects: IntCounter,
242    pub shared_obj_tx: IntCounter,
243    sponsored_tx: IntCounter,
244    tx_already_processed: IntCounter,
245    num_input_objs: Histogram,
246    num_shared_objects: Histogram,
247    batch_size: Histogram,
248
249    authority_state_handle_transaction_latency: Histogram,
250
251    execute_certificate_latency_single_writer: Histogram,
252    execute_certificate_latency_shared_object: Histogram,
253
254    internal_execution_latency: Histogram,
255    execution_load_input_objects_latency: Histogram,
256    prepare_certificate_latency: Histogram,
257    commit_certificate_latency: Histogram,
258    db_checkpoint_latency: Histogram,
259
260    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
261    pub(crate) transaction_manager_num_missing_objects: IntGauge,
262    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
263    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
264    pub(crate) transaction_manager_num_ready: IntGauge,
265    pub(crate) transaction_manager_object_cache_size: IntGauge,
266    pub(crate) transaction_manager_object_cache_hits: IntCounter,
267    pub(crate) transaction_manager_object_cache_misses: IntCounter,
268    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
269    pub(crate) transaction_manager_package_cache_size: IntGauge,
270    pub(crate) transaction_manager_package_cache_hits: IntCounter,
271    pub(crate) transaction_manager_package_cache_misses: IntCounter,
272    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
273    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
274
275    pub(crate) execution_driver_executed_transactions: IntCounter,
276    pub(crate) execution_driver_dispatch_queue: IntGauge,
277    pub(crate) execution_queueing_delay_s: Histogram,
278    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
279    pub(crate) execution_gas_latency_ratio: Histogram,
280
281    pub(crate) skipped_consensus_txns: IntCounter,
282    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
283
284    pub(crate) authority_overload_status: IntGauge,
285    pub(crate) authority_load_shedding_percentage: IntGauge,
286
287    pub(crate) transaction_overload_sources: IntCounterVec,
288
289    /// Post processing metrics
290    post_processing_total_events_emitted: IntCounter,
291    post_processing_total_tx_indexed: IntCounter,
292    post_processing_total_tx_had_event_processed: IntCounter,
293    post_processing_total_failures: IntCounter,
294
295    /// Consensus handler metrics
296    pub consensus_handler_processed: IntCounterVec,
297    pub consensus_handler_transaction_sizes: HistogramVec,
298    pub consensus_handler_num_low_scoring_authorities: IntGauge,
299    pub consensus_handler_scores: IntGaugeVec,
300    pub consensus_handler_deferred_transactions: IntCounter,
301    pub consensus_handler_congested_transactions: IntCounter,
302    pub consensus_handler_cancelled_transactions: IntCounter,
303    pub consensus_handler_max_object_costs: IntGaugeVec,
304    pub consensus_committed_subdags: IntCounterVec,
305    pub consensus_committed_messages: IntGaugeVec,
306    pub consensus_committed_user_transactions: IntGaugeVec,
307    pub consensus_handler_leader_round: IntGauge,
308    pub consensus_calculated_throughput: IntGauge,
309    pub consensus_calculated_throughput_profile: IntGauge,
310
311    pub limits_metrics: Arc<LimitsMetrics>,
312
313    /// bytecode verifier metrics for tracking timeouts
314    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
315
316    /// Count of zklogin signatures
317    pub zklogin_sig_count: IntCounter,
318    /// Count of multisig signatures
319    pub multisig_sig_count: IntCounter,
320
321    // Tracks the recent average txn queueing delay from when a txn is ready for execution
322    // until it starts executing.
323    pub execution_queueing_latency: LatencyObserver,
324
325    // Tracks the rate at which transactions become ready for execution in the transaction
326    // manager. The Mutex is needed because the tracker is updated in the transaction manager
327    // and read in the overload_monitor. Mutex contention should be low because the
328    // transaction manager is single threaded and the read rate in the overload_monitor is
329    // low. If the transaction manager ever becomes multi-threaded, we can
330    // create one rate tracker per thread.
331    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333    // Tracks the rate at which transactions start execution in the execution driver.
334    // A Mutex is used here for the same reason as for `txn_ready_rate_tracker`.
335    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
338// Override default Prom buckets for positive numbers in 0-10M range
339const POSITIVE_INT_BUCKETS: &[f64] = &[
340    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
341    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
342    7000000., 10000000.,
343];
344
345const LATENCY_SEC_BUCKETS: &[f64] = &[
346    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
347    10., 20., 30., 60., 90.,
348];
349
350// Buckets for low latency samples. Starts from 10us.
351const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
352    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
353    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
354];
355
356const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
357    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
358    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
359];
360
361/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
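
// A small, hedged sanity check (assuming NANOS_PER_IOTA == 1_000_000_000): the simulation gas
// coin value above is 10^9 IOTA expressed in nanos, i.e. 10^18 nanos, which fits in a u64.
//
//     const _: () = assert!(SIMULATION_GAS_COIN_VALUE == 1_000_000_000_000_000_000);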
363
364impl AuthorityMetrics {
365    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366        let execute_certificate_latency = register_histogram_vec_with_registry!(
367            "authority_state_execute_certificate_latency",
368            "Latency of executing certificates, including waiting for inputs",
369            &["tx_type"],
370            LATENCY_SEC_BUCKETS.to_vec(),
371            registry,
372        )
373        .unwrap();
374
375        let execute_certificate_latency_single_writer =
376            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377        let execute_certificate_latency_shared_object =
378            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380        Self {
381            tx_orders: register_int_counter_with_registry!(
382                "total_transaction_orders",
383                "Total number of transaction orders",
384                registry,
385            )
386            .unwrap(),
387            total_certs: register_int_counter_with_registry!(
388                "total_transaction_certificates",
389                "Total number of transaction certificates handled",
390                registry,
391            )
392            .unwrap(),
393            total_cert_attempts: register_int_counter_with_registry!(
394                "total_handle_certificate_attempts",
395                "Number of calls to handle_certificate",
396                registry,
397            )
398            .unwrap(),
399            // total_effects == total transactions finished
400            total_effects: register_int_counter_with_registry!(
401                "total_transaction_effects",
402                "Total number of transaction effects produced",
403                registry,
404            )
405            .unwrap(),
406
407            shared_obj_tx: register_int_counter_with_registry!(
408                "num_shared_obj_tx",
409                "Number of transactions involving shared objects",
410                registry,
411            )
412            .unwrap(),
413
414            sponsored_tx: register_int_counter_with_registry!(
415                "num_sponsored_tx",
416                "Number of sponsored transactions",
417                registry,
418            )
419            .unwrap(),
420
421            tx_already_processed: register_int_counter_with_registry!(
422                "num_tx_already_processed",
423                "Number of transaction orders already processed previously",
424                registry,
425            )
426            .unwrap(),
427            num_input_objs: register_histogram_with_registry!(
428                "num_input_objects",
429                "Distribution of number of input TX objects per TX",
430                POSITIVE_INT_BUCKETS.to_vec(),
431                registry,
432            )
433            .unwrap(),
434            num_shared_objects: register_histogram_with_registry!(
435                "num_shared_objects",
436                "Number of shared input objects per TX",
437                POSITIVE_INT_BUCKETS.to_vec(),
438                registry,
439            )
440            .unwrap(),
441            batch_size: register_histogram_with_registry!(
442                "batch_size",
443                "Distribution of size of transaction batch",
444                POSITIVE_INT_BUCKETS.to_vec(),
445                registry,
446            )
447            .unwrap(),
448            authority_state_handle_transaction_latency: register_histogram_with_registry!(
449                "authority_state_handle_transaction_latency",
450                "Latency of handling transactions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execute_certificate_latency_single_writer,
456            execute_certificate_latency_shared_object,
457            internal_execution_latency: register_histogram_with_registry!(
458                "authority_state_internal_execution_latency",
459                "Latency of actual certificate executions",
460                LATENCY_SEC_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            execution_load_input_objects_latency: register_histogram_with_registry!(
465                "authority_state_execution_load_input_objects_latency",
466                "Latency of loading input objects for execution",
467                LOW_LATENCY_SEC_BUCKETS.to_vec(),
468                registry,
469            )
470            .unwrap(),
471            prepare_certificate_latency: register_histogram_with_registry!(
472                "authority_state_prepare_certificate_latency",
473                "Latency of executing certificates, before committing the results",
474                LATENCY_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            commit_certificate_latency: register_histogram_with_registry!(
479                "authority_state_commit_certificate_latency",
480                "Latency of committing certificate execution results",
481                LATENCY_SEC_BUCKETS.to_vec(),
482                registry,
483            )
484            .unwrap(),
485            db_checkpoint_latency: register_histogram_with_registry!(
486                "db_checkpoint_latency",
487                "Latency of checkpointing dbs",
488                LATENCY_SEC_BUCKETS.to_vec(),
489                registry,
490            ).unwrap(),
491            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492                "transaction_manager_num_enqueued_certificates",
493                "Current number of certificates enqueued to TransactionManager",
494                &["result"],
495                registry,
496            )
497            .unwrap(),
498            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499                "transaction_manager_num_missing_objects",
500                "Current number of missing objects in TransactionManager",
501                registry,
502            )
503            .unwrap(),
504            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505                "transaction_manager_num_pending_certificates",
506                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507                registry,
508            )
509            .unwrap(),
510            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511                "transaction_manager_num_executing_certificates",
512                "Number of executing certificates, including queued and actually running certificates",
513                registry,
514            )
515            .unwrap(),
516            transaction_manager_num_ready: register_int_gauge_with_registry!(
517                "transaction_manager_num_ready",
518                "Number of ready transactions in TransactionManager",
519                registry,
520            )
521            .unwrap(),
522            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523                "transaction_manager_object_cache_size",
524                "Current size of object-availability cache in TransactionManager",
525                registry,
526            )
527            .unwrap(),
528            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529                "transaction_manager_object_cache_hits",
530                "Number of object-availability cache hits in TransactionManager",
531                registry,
532            )
533            .unwrap(),
534            authority_overload_status: register_int_gauge_with_registry!(
535                "authority_overload_status",
536                "Whether the authority is currently experiencing overload and has entered load shedding mode.",
537                registry)
538            .unwrap(),
539            authority_load_shedding_percentage: register_int_gauge_with_registry!(
540                "authority_load_shedding_percentage",
541                "The percentage of transactions shed when the authority is in load shedding mode.",
542                registry)
543            .unwrap(),
544            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545                "transaction_manager_object_cache_misses",
546                "Number of object-availability cache misses in TransactionManager",
547                registry,
548            )
549            .unwrap(),
550            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551                "transaction_manager_object_cache_evictions",
552                "Number of object-availability cache evictions in TransactionManager",
553                registry,
554            )
555            .unwrap(),
556            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557                "transaction_manager_package_cache_size",
558                "Current size of package-availability cache in TransactionManager",
559                registry,
560            )
561            .unwrap(),
562            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563                "transaction_manager_package_cache_hits",
564                "Number of package-availability cache hits in TransactionManager",
565                registry,
566            )
567            .unwrap(),
568            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569                "transaction_manager_package_cache_misses",
570                "Number of package-availability cache misses in TransactionManager",
571                registry,
572            )
573            .unwrap(),
574            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575                "transaction_manager_package_cache_evictions",
576                "Number of package-availability cache evictions in TransactionManager",
577                registry,
578            )
579            .unwrap(),
580            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581                "transaction_manager_transaction_queue_age_s",
582                "Time a transaction spends waiting in the queue",
583                LATENCY_SEC_BUCKETS.to_vec(),
584                registry,
585            )
586            .unwrap(),
587            transaction_overload_sources: register_int_counter_vec_with_registry!(
588                "transaction_overload_sources",
589                "Number of times each source indicates transaction overload.",
590                &["source"],
591                registry)
592            .unwrap(),
593            execution_driver_executed_transactions: register_int_counter_with_registry!(
594                "execution_driver_executed_transactions",
595                "Cumulative number of transactions executed by the execution driver",
596                registry,
597            )
598            .unwrap(),
599            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600                "execution_driver_dispatch_queue",
601                "Number of transactions pending in the execution driver dispatch queue",
602                registry,
603            )
604            .unwrap(),
605            execution_queueing_delay_s: register_histogram_with_registry!(
606                "execution_queueing_delay_s",
607                "Queueing delay from when a transaction is ready for execution until it starts executing.",
608                LATENCY_SEC_BUCKETS.to_vec(),
609                registry
610            )
611            .unwrap(),
612            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613                "prepare_cert_gas_latency_ratio",
614                "The ratio of computation gas to VM execution latency.",
615                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616                registry
617            )
618            .unwrap(),
619            execution_gas_latency_ratio: register_histogram_with_registry!(
620                "execution_gas_latency_ratio",
621                "The ratio of computation gas to certificate execution latency, including committing the certificate.",
622                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623                registry
624            )
625            .unwrap(),
626            skipped_consensus_txns: register_int_counter_with_registry!(
627                "skipped_consensus_txns",
628                "Total number of consensus transactions skipped",
629                registry,
630            )
631            .unwrap(),
632            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633                "skipped_consensus_txns_cache_hit",
634                "Total number of consensus transactions skipped because of local cache hit",
635                registry,
636            )
637            .unwrap(),
638            post_processing_total_events_emitted: register_int_counter_with_registry!(
639                "post_processing_total_events_emitted",
640                "Total number of events emitted in post processing",
641                registry,
642            )
643            .unwrap(),
644            post_processing_total_tx_indexed: register_int_counter_with_registry!(
645                "post_processing_total_tx_indexed",
646                "Total number of txes indexed in post processing",
647                registry,
648            )
649            .unwrap(),
650            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651                "post_processing_total_tx_had_event_processed",
652                "Total number of txes finished event processing in post processing",
653                registry,
654            )
655            .unwrap(),
656            post_processing_total_failures: register_int_counter_with_registry!(
657                "post_processing_total_failures",
658                "Total number of failures in post processing",
659                registry,
660            )
661            .unwrap(),
662            consensus_handler_processed: register_int_counter_vec_with_registry!(
663                "consensus_handler_processed",
664                "Number of transactions processed by consensus handler",
665                &["class"],
666                registry
667            ).unwrap(),
668            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669                "consensus_handler_transaction_sizes",
670                "Sizes of each type of transaction processed by consensus handler",
671                &["class"],
672                POSITIVE_INT_BUCKETS.to_vec(),
673                registry
674            ).unwrap(),
675            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676                "consensus_handler_num_low_scoring_authorities",
677                "Number of low scoring authorities based on reputation scores from consensus",
678                registry
679            ).unwrap(),
680            consensus_handler_scores: register_int_gauge_vec_with_registry!(
681                "consensus_handler_scores",
682                "Scores from consensus for each authority",
683                &["authority"],
684                registry,
685            ).unwrap(),
686            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687                "consensus_handler_deferred_transactions",
688                "Number of transactions deferred by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_congested_transactions: register_int_counter_with_registry!(
692                "consensus_handler_congested_transactions",
693                "Number of transactions deferred by consensus handler due to congestion",
694                registry,
695            ).unwrap(),
696            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697                "consensus_handler_cancelled_transactions",
698                "Number of transactions cancelled by consensus handler",
699                registry,
700            ).unwrap(),
701            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702                "consensus_handler_max_congestion_control_object_costs",
703                "Max object costs for congestion control in the current consensus commit",
704                &["commit_type"],
705                registry,
706            ).unwrap(),
707            consensus_committed_subdags: register_int_counter_vec_with_registry!(
708                "consensus_committed_subdags",
709                "Number of committed subdags, sliced by leader",
710                &["authority"],
711                registry,
712            ).unwrap(),
713            consensus_committed_messages: register_int_gauge_vec_with_registry!(
714                "consensus_committed_messages",
715                "Total number of committed consensus messages, sliced by author",
716                &["authority"],
717                registry,
718            ).unwrap(),
719            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720                "consensus_committed_user_transactions",
721                "Number of committed user transactions, sliced by submitter",
722                &["authority"],
723                registry,
724            ).unwrap(),
725            consensus_handler_leader_round: register_int_gauge_with_registry!(
726                "consensus_handler_leader_round",
727                "The leader round of the current consensus output being processed in the consensus handler",
728                registry,
729            ).unwrap(),
730            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732            zklogin_sig_count: register_int_counter_with_registry!(
733                "zklogin_sig_count",
734                "Count of zkLogin signatures",
735                registry,
736            )
737            .unwrap(),
738            multisig_sig_count: register_int_counter_with_registry!(
739                "multisig_sig_count",
740                "Count of multisig signatures",
741                registry,
742            )
743            .unwrap(),
744            consensus_calculated_throughput: register_int_gauge_with_registry!(
745                "consensus_calculated_throughput",
746                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747                registry,
748            ).unwrap(),
749            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750                "consensus_calculated_throughput_profile",
751                "The current active calculated throughput profile",
752                registry
753            ).unwrap(),
754            execution_queueing_latency: LatencyObserver::new(),
755            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757        }
758    }
759
760    /// Reset metrics that contain `authority` as one of the labels. This is
761    /// needed to avoid retaining metrics for long-gone committee members and
762    /// to only expose metrics for the committee in the current epoch.
763    pub fn reset_on_reconfigure(&self) {
764        self.consensus_committed_messages.reset();
765        self.consensus_handler_scores.reset();
766        self.consensus_committed_user_transactions.reset();
767    }
768}
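
// Hedged sketch (illustrative, not part of the production code path): the metrics above are
// registered against a single Prometheus registry, typically the node-wide registry created at
// startup. Construction and use then look roughly like:
//
//     let registry = prometheus::Registry::new();
//     let metrics = std::sync::Arc::new(AuthorityMetrics::new(&registry));
//     // Any component holding the Arc can update counters, e.g. when a shared-object tx is seen:
//     metrics.shared_obj_tx.inc();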
769
770/// A trait object for `Signer` that is:
771/// - Pin, i.e. confined to one place in memory (we don't want to copy private
772///   keys).
773/// - Sync, i.e. can be safely shared between threads.
774///
775/// Typically instantiated with Arc::pin(keypair), where keypair is a `KeyPair`.
776pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
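
// Hedged sketch of how a value of the alias above can be produced, assuming `keypair` is any
// type implementing `Signer<AuthoritySignature> + Send + Sync` (e.g. an authority key pair):
//
//     let secret: StableSyncAuthoritySigner = Arc::pin(keypair);
//
// Pinning behind an `Arc` keeps the key material at a fixed address while still allowing the
// signer to be shared cheaply across threads.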
777
778pub struct AuthorityState {
779    // Fixed size, static, identity of the authority
780    /// The name of this authority.
781    pub name: AuthorityName,
782    /// The signature key of the authority.
783    pub secret: StableSyncAuthoritySigner,
784
785    /// The database
786    input_loader: TransactionInputLoader,
787    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
788
789    epoch_store: ArcSwap<AuthorityPerEpochStore>,
790
791    /// This lock denotes the current 'execution epoch'.
792    /// Execution acquires the read lock, checks the certificate epoch and holds the lock
793    /// until all writes are complete. Reconfiguration acquires the write lock,
794    /// changes the epoch and reverts all transactions from the previous epoch
795    /// that were executed but did not make it into a checkpoint.
796    execution_lock: RwLock<EpochId>,
797
798    pub indexes: Option<Arc<IndexStore>>,
799    pub rest_index: Option<Arc<RestIndexStore>>,
800
801    pub subscription_handler: Arc<SubscriptionHandler>,
802    pub checkpoint_store: Arc<CheckpointStore>,
803
804    committee_store: Arc<CommitteeStore>,
805
806    /// Manages pending certificates and their missing input objects.
807    transaction_manager: Arc<TransactionManager>,
808
809    /// Shuts down the execution task. Used only in testing.
810    #[cfg_attr(not(test), expect(unused))]
811    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
812
813    pub metrics: Arc<AuthorityMetrics>,
814    _pruner: AuthorityStorePruner,
815    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
816
817    /// Takes db checkpoints of the different dbs.
818    db_checkpoint_config: DBCheckpointConfig,
819
820    pub config: NodeConfig,
821
822    /// Current overload status in this authority. Updated periodically.
823    pub overload_info: AuthorityOverloadInfo,
824
825    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
826
827    /// The chain identifier is derived from the digest of the genesis
828    /// checkpoint.
829    chain_identifier: ChainIdentifier,
830
831    pub(crate) congestion_tracker: Arc<CongestionTracker>,
832}
833
834/// The authority state encapsulates all state, drives execution, and ensures
835/// safety.
836///
837/// Note that authority operations can be accessed through a read ref (&) and do
838/// not require &mut. Internally, the database is synchronized through a mutex
839/// lock.
840///
841/// Repeating valid commands should produce no changes and return no error.
842impl AuthorityState {
843    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844        epoch_store.committee().authority_exists(&self.name)
845    }
846
847    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848        epoch_store
849            .active_validators()
850            .iter()
851            .any(|a| AuthorityName::from(a) == self.name)
852    }
853
854    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
855        !self.is_committee_validator(epoch_store)
856    }
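
    // Hedged illustration of how the three predicates above relate: a node that is not in the
    // current committee is treated as a fullnode, so for any epoch store
    //
    //     assert_eq!(state.is_fullnode(&epoch_store), !state.is_committee_validator(&epoch_store));
    //
    // while `is_active_validator` checks membership in the wider active-validator set rather
    // than committee membership.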
857
858    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
859        &self.committee_store
860    }
861
862    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863        self.committee_store.clone()
864    }
865
866    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
867        &self.config.authority_overload_config
868    }
869
870    pub fn get_epoch_state_commitments(
871        &self,
872        epoch: EpochId,
873    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
874        self.checkpoint_store.get_epoch_state_commitments(epoch)
875    }
876
877    /// This is a private method and should be kept that way. It doesn't check
878    /// whether the provided transaction is a system transaction, and hence
879    /// can only be called internally.
880    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
881    async fn handle_transaction_impl(
882        &self,
883        transaction: VerifiedTransaction,
884        epoch_store: &Arc<AuthorityPerEpochStore>,
885    ) -> IotaResult<VerifiedSignedTransaction> {
886        // Ensure that the validator cannot reconfigure while we are signing the tx.
887        let _execution_lock = self.execution_lock_for_signing();
888
889        let protocol_config = epoch_store.protocol_config();
890        let reference_gas_price = epoch_store.reference_gas_price();
891
892        let epoch = epoch_store.epoch();
893
894        let tx_data = transaction.data().transaction_data();
895
896        // Note: the deny checks may do redundant package loads but:
897        // - they only load packages when there is an active package deny map
898        // - the loads are cached anyway
899        iota_transaction_checks::deny::check_transaction_for_signing(
900            tx_data,
901            transaction.tx_signatures(),
902            &transaction.input_objects()?,
903            &tx_data.receiving_objects(),
904            &self.config.transaction_deny_config,
905            self.get_backing_package_store().as_ref(),
906        )?;
907
908        // Load all transaction-related input objects.
909        // Authenticator input objects and the account object are loaded in the same
910        // call if there is a sender `MoveAuthenticator` signature present in the
911        // transaction.
912        let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
913            self.read_objects_for_signing(&transaction, epoch)?;
914
915        // Get the sender `MoveAuthenticator`, if any.
916        // Only one `MoveAuthenticator` signature is possible, since it is not
917        // implemented for the sponsor at the moment.
918        let move_authenticator = transaction.sender_move_authenticator();
919
920        // Check the inputs for signing.
921        // If there is a sender `MoveAuthenticator` signature, its input objects and the
922        // account object are also checked and must be provided.
923        // The check also verifies that there is enough gas to execute the transaction and its
924        // authenticators.
925        let (
926            gas_status,
927            tx_checked_input_objects,
928            auth_checked_input_objects,
929            authenticator_function_ref,
930        ) = self.check_transaction_inputs_for_signing(
931            protocol_config,
932            reference_gas_price,
933            tx_data,
934            tx_input_objects,
935            &tx_receiving_objects,
936            move_authenticator,
937            auth_input_objects,
938            account_object,
939        )?;
940
941        check_coin_deny_list_v1_during_signing(
942            tx_data.sender(),
943            &tx_checked_input_objects,
944            &tx_receiving_objects,
945            &auth_checked_input_objects,
946            &self.get_object_store(),
947        )?;
948
949        if let Some(move_authenticator) = move_authenticator {
950            // `MoveAuthenticator` availability is assumed to have been checked in
951            // `SenderSignedData::validity_check`.
952
953            let auth_checked_input_objects = auth_checked_input_objects
954                .expect("MoveAuthenticator input objects must be provided");
955            let authenticator_function_ref = authenticator_function_ref
956                .expect("AuthenticatorFunctionRefV1 object must be provided");
957
958            let (kind, signer, _) = tx_data.execution_parts();
959
960            // Execute the Move authenticator.
961            let validation_result = epoch_store.executor().authenticate_transaction(
962                self.get_backing_store().as_ref(),
963                protocol_config,
964                self.metrics.limits_metrics.clone(),
965                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
966                epoch_store
967                    .epoch_start_config()
968                    .epoch_data()
969                    .epoch_start_timestamp(),
970                gas_status,
971                move_authenticator.to_owned(),
972                authenticator_function_ref,
973                auth_checked_input_objects,
974                kind,
975                signer,
976                transaction.digest().to_owned(),
977                &mut None,
978            );
979
980            if let Err(validation_error) = validation_result {
981                return Err(IotaError::MoveAuthenticatorExecutionFailure {
982                    error: validation_error.to_string(),
983                });
984            }
985        }
986
987        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
988
989        let signed_transaction =
990            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
991
992        // Check and write locks for the signed transaction into the database.
993        // The call to try_acquire_transaction_locks checks that the locks are not conflicting,
994        // and returns a ConflictingTransaction error in case there is already a lock on a
995        // different existing transaction.
996        self.get_cache_writer().try_acquire_transaction_locks(
997            epoch_store,
998            &owned_objects,
999            signed_transaction.clone(),
1000        )?;
1001
1002        Ok(signed_transaction)
1003    }
1004
1005    /// Initiate a new transaction.
1006    #[instrument(name = "handle_transaction", level = "trace", skip_all)]
1007    pub async fn handle_transaction(
1008        &self,
1009        epoch_store: &Arc<AuthorityPerEpochStore>,
1010        transaction: VerifiedTransaction,
1011    ) -> IotaResult<HandleTransactionResponse> {
1012        let tx_digest = *transaction.digest();
1013        debug!("handle_transaction");
1014
1015        // Ensure an idempotent answer.
1016        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1017            return Ok(HandleTransactionResponse { status });
1018        }
1019
1020        let _metrics_guard = self
1021            .metrics
1022            .authority_state_handle_transaction_latency
1023            .start_timer();
1024        self.metrics.tx_orders.inc();
1025
1026        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1027        match signed {
1028            Ok(s) => {
1029                if self.is_committee_validator(epoch_store) {
1030                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1031                        let tx = s.clone();
1032                        let validator_tx_finalizer = validator_tx_finalizer.clone();
1033                        let cache_reader = self.get_transaction_cache_reader().clone();
1034                        let epoch_store = epoch_store.clone();
1035                        spawn_monitored_task!(epoch_store.within_alive_epoch(
1036                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1037                        ));
1038                    }
1039                }
1040                Ok(HandleTransactionResponse {
1041                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1042                })
1043            }
1044            // It happens frequently that the transaction gets executed while we are still
1045            // checking its validity.
1046            // In that case, we can still return Ok to avoid showing confusing errors.
1047            Err(err) => Ok(HandleTransactionResponse {
1048                status: self
1049                    .get_transaction_status(&tx_digest, epoch_store)?
1050                    .ok_or(err)?
1051                    .1,
1052            }),
1053        }
1054    }
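
    // Hedged usage sketch for `handle_transaction` (the submission front end shown here is an
    // assumption for illustration): a caller would typically forward the signed status to the
    // client, e.g.
    //
    //     let response = state.handle_transaction(&epoch_store, verified_tx).await?;
    //     match response.status {
    //         TransactionStatus::Signed(sig) => { /* aggregate signatures into a certificate */ }
    //         _ => { /* transaction was already executed; effects are returned instead */ }
    //     }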
1055
1056    pub fn check_system_overload_at_signing(&self) -> bool {
1057        self.config
1058            .authority_overload_config
1059            .check_system_overload_at_signing
1060    }
1061
1062    pub fn check_system_overload_at_execution(&self) -> bool {
1063        self.config
1064            .authority_overload_config
1065            .check_system_overload_at_execution
1066    }
1067
1068    pub(crate) fn check_system_overload(
1069        &self,
1070        consensus_adapter: &Arc<ConsensusAdapter>,
1071        tx_data: &SenderSignedData,
1072        do_authority_overload_check: bool,
1073    ) -> IotaResult {
1074        if do_authority_overload_check {
1075            self.check_authority_overload(tx_data).tap_err(|_| {
1076                self.update_overload_metrics("execution_queue");
1077            })?;
1078        }
1079        self.transaction_manager
1080            .check_execution_overload(self.overload_config(), tx_data)
1081            .tap_err(|_| {
1082                self.update_overload_metrics("execution_pending");
1083            })?;
1084        consensus_adapter.check_consensus_overload().tap_err(|_| {
1085            self.update_overload_metrics("consensus");
1086        })?;
1087
1088        let pending_tx_count = self
1089            .get_cache_commit()
1090            .approximate_pending_transaction_count();
1091        if pending_tx_count
1092            > self
1093                .config
1094                .execution_cache_config
1095                .writeback_cache
1096                .backpressure_threshold_for_rpc()
1097        {
1098            return Err(IotaError::ValidatorOverloadedRetryAfter {
1099                retry_after_secs: 10,
1100            });
1101        }
1102
1103        Ok(())
1104    }
1105
1106    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108            return Ok(());
1109        }
1110
1111        let load_shedding_percentage = self
1112            .overload_info
1113            .load_shedding_percentage
1114            .load(Ordering::Relaxed);
1115        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116    }
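
    // Hedged sketch of the shedding behavior above: `overload_monitor_accept_tx` is keyed on the
    // transaction digest, so retries of the same digest within a shedding window tend to get a
    // consistent answer. Assuming 0% sheds nothing and 100% sheds everything, a test could assert:
    //
    //     assert!(overload_monitor_accept_tx(0, tx_data.digest()).is_ok());
    //     assert!(overload_monitor_accept_tx(100, tx_data.digest()).is_err());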
1117
1118    fn update_overload_metrics(&self, source: &str) {
1119        self.metrics
1120            .transaction_overload_sources
1121            .with_label_values(&[source])
1122            .inc();
1123    }
1124
1125    /// Executes a certificate for its effects.
1126    #[instrument(level = "trace", skip_all)]
1127    pub async fn execute_certificate(
1128        &self,
1129        certificate: &VerifiedCertificate,
1130        epoch_store: &Arc<AuthorityPerEpochStore>,
1131    ) -> IotaResult<TransactionEffects> {
1132        let _metrics_guard = if certificate.contains_shared_object() {
1133            self.metrics
1134                .execute_certificate_latency_shared_object
1135                .start_timer()
1136        } else {
1137            self.metrics
1138                .execute_certificate_latency_single_writer
1139                .start_timer()
1140        };
1141        trace!("execute_certificate");
1142
1143        self.metrics.total_cert_attempts.inc();
1144
1145        if !certificate.contains_shared_object() {
1146            // Shared-object transactions need to be sequenced by consensus before
1147            // being enqueued for execution, which is done in
1148            // AuthorityPerEpochStore::handle_consensus_transaction(). Owned-object
1149            // transactions can be enqueued for execution immediately.
1150            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1151        }
1152
1153        // The tx could be reverted when the epoch ends, so we must be careful not to return a
1154        // result here after the epoch ends.
1155        epoch_store
1156            .within_alive_epoch(self.notify_read_effects(certificate))
1157            .await
1158            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1159            .and_then(|r| r)
1160    }
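
    // Hedged usage sketch for `execute_certificate` (assuming a `VerifiedCertificate` and the
    // current epoch store are already in hand, e.g. in a test):
    //
    //     let effects = state.execute_certificate(&certificate, &epoch_store).await?;
    //     assert_eq!(effects.transaction_digest(), certificate.digest());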
1161
1162    /// Internal logic to execute a certificate.
1163    ///
1164    /// Guarantees that
1165    /// - If input objects are available, execution returns no permanent
1166    ///   failure.
1167    /// - Execution and output commit are atomic, i.e. outputs are only
1168    ///   written to storage on successful execution; a crashed execution
1169    ///   has no observable effect and can be retried.
1170    ///
1171    /// It is the caller's responsibility to ensure input objects are available and
1172    /// locks are set. If this cannot be satisfied by the caller,
1173    /// execute_certificate() should be called instead.
1174    ///
1175    /// Should only be called within iota-core.
1176    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1177    pub fn try_execute_immediately(
1178        &self,
1179        certificate: &VerifiedExecutableTransaction,
1180        expected_effects_digest: Option<TransactionEffectsDigest>,
1181        epoch_store: &Arc<AuthorityPerEpochStore>,
1182    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1183        let _scope = monitored_scope("Execution::try_execute_immediately");
1184        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1185
1186        let tx_digest = certificate.digest();
1187
1188        // Acquire a lock to prevent concurrent executions of the same transaction.
1189        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1190
1191        // The cert could have been processed by a concurrent attempt to execute the same cert,
1192        // so check whether the effects have already been written.
1193        if let Some(effects) = self
1194            .get_transaction_cache_reader()
1195            .try_get_executed_effects(tx_digest)?
1196        {
1197            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1198                assert_eq!(
1199                    effects.digest(),
1200                    expected_effects_digest_inner,
1201                    "Unexpected effects digest for transaction {tx_digest:?}"
1202                );
1203            }
1204            tx_guard.release();
1205            return Ok((effects, None));
1206        }
1207
1208        let (tx_input_objects, authenticator_input_objects, account_object) =
1209            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1210
1211        // If no expected_effects_digest was provided, try to get it from storage.
1212        // We could be re-executing a previously executed but uncommitted transaction,
1213        // perhaps after restarting with a new binary. In this situation, if
1214        // we have published an effects signature, we must be sure not to
1215        // equivocate. TODO: read from cache instead of DB
1216        let expected_effects_digest =
1217            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1218
1219        self.process_certificate(
1220            tx_guard,
1221            certificate,
1222            tx_input_objects,
1223            account_object,
1224            authenticator_input_objects,
1225            expected_effects_digest,
1226            epoch_store,
1227        )
1228        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1229        .tap_ok(
1230            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1231        )
1232    }
1233
1234    pub fn read_objects_for_execution(
1235        &self,
1236        tx_lock: &CertLockGuard,
1237        certificate: &VerifiedExecutableTransaction,
1238        epoch_store: &Arc<AuthorityPerEpochStore>,
1239    ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1240        let _scope = monitored_scope("Execution::load_input_objects");
1241        let _metrics_guard = self
1242            .metrics
1243            .execution_load_input_objects_latency
1244            .start_timer();
1245
1246        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1247
1248        let input_objects = self.input_loader.read_objects_for_execution(
1249            epoch_store,
1250            &certificate.key(),
1251            tx_lock,
1252            &input_objects,
1253            epoch_store.epoch(),
1254        )?;
1255
1256        certificate.split_input_objects_into_groups_for_reading(input_objects)
1257    }
1258
1259    /// Test-only wrapper for `try_execute_immediately()` above, useful for
1260    /// checking errors when the pre-conditions are not satisfied, and for
1261    /// executing change-epoch transactions.
1262    pub fn try_execute_for_test(
1263        &self,
1264        certificate: &VerifiedCertificate,
1265    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266        let epoch_store = self.epoch_store_for_testing();
1267        let (effects, execution_error_opt) = self.try_execute_immediately(
1268            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269            None,
1270            &epoch_store,
1271        )?;
1272        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273        Ok((signed_effects, execution_error_opt))
1274    }
1275
1276    /// Non-fallible version of `try_execute_for_test()`.
1277    pub fn execute_for_test(
1278        &self,
1279        certificate: &VerifiedCertificate,
1280    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281        self.try_execute_for_test(certificate)
1282            .expect("try_execute_for_test should not fail")
1283    }
1284
1285    pub async fn notify_read_effects(
1286        &self,
1287        certificate: &VerifiedCertificate,
1288    ) -> IotaResult<TransactionEffects> {
1289        self.get_transaction_cache_reader()
1290            .try_notify_read_executed_effects(&[*certificate.digest()])
1291            .await
1292            .map(|mut r| r.pop().expect("must return correct number of effects"))
1293    }
1294
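    /// Checks that the given owned object references are still live, delegating to
    /// the object cache reader.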
1295    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296        self.get_object_cache_reader()
1297            .try_check_owned_objects_are_live(owned_object_refs)
1298    }
1299
1300    /// Captures the state required to debug a forked transaction.
1301    /// The dump is written to a file in the configured dump directory, with the
1302    /// file name prefixed by the transaction digest. NOTE: since this info escapes
1303    /// the validator context, make sure not to leak any private info here.
1304    pub(crate) fn debug_dump_transaction_state(
1305        &self,
1306        tx_digest: &TransactionDigest,
1307        effects: &TransactionEffects,
1308        expected_effects_digest: TransactionEffectsDigest,
1309        inner_temporary_store: &InnerTemporaryStore,
1310        certificate: &VerifiedExecutableTransaction,
1311        debug_dump_config: &StateDebugDumpConfig,
1312    ) -> IotaResult<PathBuf> {
1313        let dump_dir = debug_dump_config
1314            .dump_file_directory
1315            .as_ref()
1316            .cloned()
1317            .unwrap_or(std::env::temp_dir());
1318        let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320        NodeStateDump::new(
1321            tx_digest,
1322            effects,
1323            expected_effects_digest,
1324            self.get_object_store().as_ref(),
1325            &epoch_store,
1326            inner_temporary_store,
1327            certificate,
1328        )?
1329        .write_to_file(&dump_dir)
1330        .map_err(|e| IotaError::FileIO(e.to_string()))
1331    }
1332
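    /// Drives the execution of a certificate: acquires the execution lock, verifies
    /// that the epoch still matches, prepares (executes) the certificate, cross-checks
    /// the resulting effects digest against `expected_effects_digest` (panicking if a
    /// fork is detected), and finally commits the outputs via `commit_certificate`.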
1333    #[instrument(level = "trace", skip_all)]
1334    pub(crate) fn process_certificate(
1335        &self,
1336        tx_guard: CertTxGuard,
1337        certificate: &VerifiedExecutableTransaction,
1338        tx_input_objects: InputObjects,
1339        account_object: Option<ObjectReadResult>,
1340        authenticator_input_objects: Option<InputObjects>,
1341        expected_effects_digest: Option<TransactionEffectsDigest>,
1342        epoch_store: &Arc<AuthorityPerEpochStore>,
1343    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1344        let process_certificate_start_time = tokio::time::Instant::now();
1345        let digest = *certificate.digest();
1346
1347        fail_point_if!("correlated-crash-process-certificate", || {
1348            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1349                iota_simulator::task::kill_current_node(None);
1350            }
1351        });
1352
1353        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1354        // Any caller that verifies the signatures on the certificate will have already
1355        // checked the epoch. But paths that don't verify sigs (e.g. execution
1356        // from checkpoint, reading from db) present the possibility of an epoch
1357        // mismatch. If this cert was not finalized in the previous epoch, then it's
1358        // invalid.
1359        let execution_guard = match execution_guard {
1360            Ok(execution_guard) => execution_guard,
1361            Err(err) => {
1362                tx_guard.release();
1363                return Err(err);
1364            }
1365        };
1366        // Since we obtain a reference to the epoch store before taking the execution
1367        // lock, it's possible that reconfiguration has happened and they no
1368        // longer match.
1369        if *execution_guard != epoch_store.epoch() {
1370            tx_guard.release();
1371            info!("The epoch of the execution_guard doesn't match the epoch store");
1372            return Err(IotaError::WrongEpoch {
1373                expected_epoch: epoch_store.epoch(),
1374                actual_epoch: *execution_guard,
1375            });
1376        }
1377
1378        // Errors originating from prepare_certificate may be transient (failure to read
1379        // locks) or non-transient (transaction input is invalid, move vm
1380        // errors). However, all errors from this function occur before we have
1381        // written anything to the db, so we release the tx guard and rely on the
1382        // client to retry the tx (if it was transient).
1383        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1384            &execution_guard,
1385            certificate,
1386            tx_input_objects,
1387            account_object,
1388            authenticator_input_objects,
1389            epoch_store,
1390        ) {
1391            Err(e) => {
1392                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1393                tx_guard.release();
1394                return Err(e);
1395            }
1396            Ok(res) => res,
1397        };
1398
1399        if let Some(expected_effects_digest) = expected_effects_digest {
1400            if effects.digest() != expected_effects_digest {
1401                // We don't want to mask the original error, so we log it and continue.
1402                match self.debug_dump_transaction_state(
1403                    &digest,
1404                    &effects,
1405                    expected_effects_digest,
1406                    &inner_temporary_store,
1407                    certificate,
1408                    &self.config.state_debug_dump_config,
1409                ) {
1410                    Ok(out_path) => {
1411                        info!(
1412                            "Dumped node state for transaction {} to {}",
1413                            digest,
1414                            out_path.as_path().display().to_string()
1415                        );
1416                    }
1417                    Err(e) => {
1418                        error!("Error dumping state for transaction {}: {e}", digest);
1419                    }
1420                }
1421                error!(
1422                    tx_digest = ?digest,
1423                    ?expected_effects_digest,
1424                    actual_effects = ?effects,
1425                    "fork detected!"
1426                );
1427                panic!(
1428                    "Transaction {} is expected to have effects digest {}, but got {}!",
1429                    digest,
1430                    expected_effects_digest,
1431                    effects.digest(),
1432                );
1433            }
1434        }
1435
1436        fail_point!("crash");
1437
1438        self.commit_certificate(
1439            certificate,
1440            inner_temporary_store,
1441            &effects,
1442            tx_guard,
1443            execution_guard,
1444            epoch_store,
1445        )?;
1446
1447        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1448            certificate.data().transaction_data().kind()
1449        {
1450            if let Some(err) = &execution_error_opt {
1451                debug_fatal!("Authenticator state update failed: {:?}", err);
1452            }
1453            epoch_store.update_authenticator_state(auth_state);
1454
1455            // Double-check that the signature verifier always matches the authenticator
1456            // state.
1457            if cfg!(debug_assertions) {
1458                let authenticator_state = get_authenticator_state(self.get_object_store())
1459                    .expect("Read cannot fail")
1460                    .expect("Authenticator state must exist");
1461
1462                let mut sys_jwks: Vec<_> = authenticator_state
1463                    .active_jwks
1464                    .into_iter()
1465                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1466                    .collect();
1467                let mut active_jwks: Vec<_> = epoch_store
1468                    .signature_verifier
1469                    .get_jwks()
1470                    .into_iter()
1471                    .collect();
1472                sys_jwks.sort();
1473                active_jwks.sort();
1474
1475                assert_eq!(sys_jwks, active_jwks);
1476            }
1477        }
1478
1479        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1480        if elapsed > 0.0 {
1481            self.metrics
1482                .execution_gas_latency_ratio
1483                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1484        };
1485        Ok((effects, execution_error_opt))
1486    }
1487
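    /// Persists the outputs of an executed certificate: runs post-processing
    /// (indexing), records the tx key -> digest mapping in the epoch store, writes
    /// the transaction outputs through the cache writer, and notifies the transaction
    /// manager so that dependent transactions can be scheduled.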
1488    #[instrument(level = "trace", skip_all)]
1489    fn commit_certificate(
1490        &self,
1491        certificate: &VerifiedExecutableTransaction,
1492        inner_temporary_store: InnerTemporaryStore,
1493        effects: &TransactionEffects,
1494        tx_guard: CertTxGuard,
1495        _execution_guard: ExecutionLockReadGuard<'_>,
1496        epoch_store: &Arc<AuthorityPerEpochStore>,
1497    ) -> IotaResult {
1498        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1499            monitored_scope("Execution::commit_certificate");
1500        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1501
1502        let tx_key = certificate.key();
1503        let tx_digest = certificate.digest();
1504        let input_object_count = inner_temporary_store.input_objects.len();
1505        let shared_object_count = effects.input_shared_objects().len();
1506
1507        let output_keys = inner_temporary_store.get_output_keys(effects);
1508
1509        // index certificate
1510        let _ = self
1511            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1512            .tap_err(|e| {
1513                self.metrics.post_processing_total_failures.inc();
1514                error!(?tx_digest, "tx post processing failed: {e}");
1515            });
1516
1517        // The insertion into the epoch store is not atomic with the insertion into
1518        // the perpetual store. This is OK because we insert into the epoch store
1519        // first, and during lookups we always check the perpetual store first.
1520        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1521
1522        // Allow testing what happens if we crash here.
1523        fail_point!("crash");
1524
1525        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1526            certificate.clone().into_unsigned(),
1527            effects.clone(),
1528            inner_temporary_store,
1529        );
1530        self.get_cache_writer()
1531            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1532
1533        if certificate.transaction_data().is_end_of_epoch_tx() {
1534            // At the end of the epoch, since system packages may have been upgraded,
1535            // force-reload them in the cache.
1536            self.get_object_cache_reader()
1537                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1538        }
1539
1540        // commit_certificate has finished; the tx is fully committed to the store.
1541        tx_guard.commit_tx();
1542
1543        // Notify the transaction manager about the committed transaction and its
1544        // output objects. This provides the information it needs to start executing
1545        // additional ready transactions.
1546        self.transaction_manager
1547            .notify_commit(tx_digest, output_keys, epoch_store);
1548
1549        self.update_metrics(certificate, input_object_count, shared_object_count);
1550
1551        Ok(())
1552    }
1553
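    /// Updates per-transaction execution metrics: signature-scheme counters
    /// (zklogin / multisig), total effects and certificate counts, shared-object and
    /// sponsored-transaction counters, and input/command-count histograms.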
1554    fn update_metrics(
1555        &self,
1556        certificate: &VerifiedExecutableTransaction,
1557        input_object_count: usize,
1558        shared_object_count: usize,
1559    ) {
1560        // count signature by scheme, for zklogin and multisig
1561        if certificate.has_zklogin_sig() {
1562            self.metrics.zklogin_sig_count.inc();
1563        } else if certificate.has_upgraded_multisig() {
1564            self.metrics.multisig_sig_count.inc();
1565        }
1566
1567        self.metrics.total_effects.inc();
1568        self.metrics.total_certs.inc();
1569
1570        if shared_object_count > 0 {
1571            self.metrics.shared_obj_tx.inc();
1572        }
1573
1574        if certificate.is_sponsored_tx() {
1575            self.metrics.sponsored_tx.inc();
1576        }
1577
1578        self.metrics
1579            .num_input_objs
1580            .observe(input_object_count as f64);
1581        self.metrics
1582            .num_shared_objects
1583            .observe(shared_object_count as f64);
1584        self.metrics.batch_size.observe(
1585            certificate
1586                .data()
1587                .intent_message()
1588                .value
1589                .kind()
1590                .num_commands() as f64,
1591        );
1592    }
1593
1594    /// prepare_certificate validates the transaction input, and executes the
1595    /// certificate, returning effects, output objects, events, etc.
1596    ///
1597    /// It reads state from the db (both owned and shared locks), but it has no
1598    /// side effects.
1599    ///
1600    /// In general, a failure of prepare_certificate indicates a
1601    /// non-transient error, e.g. the transaction input is
1602    /// somehow invalid or the correct locks are not held. However, this
1603    /// is not entirely true, as a transient db read error may also cause
1604    /// this function to fail.
1605    #[instrument(level = "trace", skip_all)]
1606    fn prepare_certificate(
1607        &self,
1608        _execution_guard: &ExecutionLockReadGuard<'_>,
1609        certificate: &VerifiedExecutableTransaction,
1610        tx_input_objects: InputObjects,
1611        account_object: Option<ObjectReadResult>,
1612        move_authenticator_input_objects: Option<InputObjects>,
1613        epoch_store: &Arc<AuthorityPerEpochStore>,
1614    ) -> IotaResult<(
1615        InnerTemporaryStore,
1616        TransactionEffects,
1617        Option<ExecutionError>,
1618    )> {
1619        let _scope = monitored_scope("Execution::prepare_certificate");
1620        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1621        let prepare_certificate_start_time = tokio::time::Instant::now();
1622
1623        let protocol_config = epoch_store.protocol_config();
1624
1625        let reference_gas_price = epoch_store.reference_gas_price();
1626
1627        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1628        let epoch_start_timestamp = epoch_store
1629            .epoch_start_config()
1630            .epoch_data()
1631            .epoch_start_timestamp();
1632
1633        let backing_store = self.get_backing_store().as_ref();
1634
1635        let tx_digest = *certificate.digest();
1636
1637        // TODO: We need to move this to a more appropriate place to avoid redundant
1638        // checks.
1639        let tx_data = certificate.data().transaction_data();
1640        tx_data.validity_check(protocol_config)?;
1641
1642        let (kind, signer, gas) = tx_data.execution_parts();
1643
1644        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1645        let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
1646            move_authenticator,
1647        ) =
1648            certificate.sender_move_authenticator()
1649        {
1650            // Check if the sender needs to be authenticated in Move and, if so, execute the
1651            // authentication.
1652            // It is assumed that `MoveAuthenticator` availability is checked in
1653            // `SenderSignedData::validity_check`.
1654
1655            // Check basic `object_to_authenticate` preconditions and get its components.
1656            let (
1657                auth_account_object_id,
1658                auth_account_object_seq_number,
1659                auth_account_object_digest,
1660            ) = move_authenticator.object_to_authenticate_components()?;
1661
1662            // Since the `object_to_authenticate` components are provided, it is assumed
1663            // that the account object has been loaded.
1664            let account_object = account_object.expect("Account object must be provided");
1665
1666            let authenticator_function_ref_for_execution = self.check_move_account(
1667                auth_account_object_id,
1668                auth_account_object_seq_number,
1669                auth_account_object_digest,
1670                account_object,
1671                &signer,
1672            )?;
1673
1674            let move_authenticator_input_objects = move_authenticator_input_objects.expect(
1675                    "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
1676                );
1677
1678            // Check the `MoveAuthenticator` input objects.
1679            // The `MoveAuthenticator` receiving objects are checked at the signing step.
1680            // `max_auth_gas` is used here as the Move authenticator gas budget for as
1681            // long as that budget is not part of the transaction data.
1682            let authenticator_gas_budget = protocol_config.max_auth_gas();
1683            let (
1684                gas_status,
1685                authenticator_checked_input_objects,
1686                authenticator_and_tx_checked_input_objects,
1687            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1688                certificate,
1689                tx_input_objects,
1690                move_authenticator_input_objects,
1691                authenticator_gas_budget,
1692                protocol_config,
1693                reference_gas_price,
1694            )?;
1695
1696            let owned_object_refs = authenticator_and_tx_checked_input_objects
1697                .inner()
1698                .filter_owned_objects();
1699            self.check_owned_locks(&owned_object_refs)?;
1700
1701            epoch_store
1702                .executor()
1703                .authenticate_then_execute_transaction_to_effects(
1704                    backing_store,
1705                    protocol_config,
1706                    self.metrics.limits_metrics.clone(),
1707                    self.config
1708                        .expensive_safety_check_config
1709                        .enable_deep_per_tx_iota_conservation_check(),
1710                    self.config.certificate_deny_config.certificate_deny_set(),
1711                    &epoch_id,
1712                    epoch_start_timestamp,
1713                    gas_status,
1714                    gas,
1715                    move_authenticator.to_owned(),
1716                    authenticator_function_ref_for_execution,
1717                    authenticator_checked_input_objects,
1718                    authenticator_and_tx_checked_input_objects,
1719                    kind,
1720                    signer,
1721                    tx_digest,
1722                    &mut None,
1723                )
1724        } else {
1725            // No Move authentication required, proceed to execute the transaction directly.
1726
1727            // The cost of partially re-auditing a transaction before execution is
1728            // tolerated.
1729            let (tx_gas_status, tx_checked_input_objects) =
1730                iota_transaction_checks::check_certificate_input(
1731                    certificate,
1732                    tx_input_objects,
1733                    protocol_config,
1734                    reference_gas_price,
1735                )?;
1736
1737            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1738            self.check_owned_locks(&owned_object_refs)?;
1739            epoch_store.executor().execute_transaction_to_effects(
1740                backing_store,
1741                protocol_config,
1742                self.metrics.limits_metrics.clone(),
1743                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1744                // cyclic dependency w/ iota-adapter
1745                self.config
1746                    .expensive_safety_check_config
1747                    .enable_deep_per_tx_iota_conservation_check(),
1748                self.config.certificate_deny_config.certificate_deny_set(),
1749                &epoch_id,
1750                epoch_start_timestamp,
1751                tx_checked_input_objects,
1752                gas,
1753                tx_gas_status,
1754                kind,
1755                signer,
1756                tx_digest,
1757                &mut None,
1758            )
1759        };
1760
1761        fail_point_if!("cp_execution_nondeterminism", || {
1762            #[cfg(msim)]
1763            self.create_fail_state(certificate, epoch_store, &mut effects);
1764        });
1765
1766        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1767        if elapsed > 0.0 {
1768            self.metrics
1769                .prepare_cert_gas_latency_ratio
1770                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1771        }
1772
1773        Ok((inner_temp_store, effects, execution_error_opt.err()))
1774    }
1775
1776    pub fn prepare_certificate_for_benchmark(
1777        &self,
1778        certificate: &VerifiedExecutableTransaction,
1779        input_objects: InputObjects,
1780        epoch_store: &Arc<AuthorityPerEpochStore>,
1781    ) -> IotaResult<(
1782        InnerTemporaryStore,
1783        TransactionEffects,
1784        Option<ExecutionError>,
1785    )> {
1786        let lock = RwLock::new(epoch_store.epoch());
1787        let execution_guard = lock.try_read().unwrap();
1788
1789        self.prepare_certificate(
1790            &execution_guard,
1791            certificate,
1792            input_objects,
1793            None,
1794            None,
1795            epoch_store,
1796        )
1797    }
1798
1799    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1800    /// `VmChecks::Enabled` instead.
1801    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802    #[allow(clippy::type_complexity)]
1803    pub fn dry_exec_transaction(
1804        &self,
1805        transaction: TransactionData,
1806        transaction_digest: TransactionDigest,
1807    ) -> IotaResult<(
1808        DryRunTransactionBlockResponse,
1809        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810        TransactionEffects,
1811        Option<ObjectID>,
1812    )> {
1813        let epoch_store = self.load_epoch_store_one_call_per_task();
1814        if !self.is_fullnode(&epoch_store) {
1815            return Err(IotaError::UnsupportedFeature {
1816                error: "dry-exec is only supported on fullnodes".to_string(),
1817            });
1818        }
1819
1820        if transaction.kind().is_system_tx() {
1821            return Err(IotaError::UnsupportedFeature {
1822                error: "dry-exec does not support system transactions".to_string(),
1823            });
1824        }
1825
1826        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827    }
1828
1829    #[allow(clippy::type_complexity)]
1830    pub fn dry_exec_transaction_for_benchmark(
1831        &self,
1832        transaction: TransactionData,
1833        transaction_digest: TransactionDigest,
1834    ) -> IotaResult<(
1835        DryRunTransactionBlockResponse,
1836        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837        TransactionEffects,
1838        Option<ObjectID>,
1839    )> {
1840        let epoch_store = self.load_epoch_store_one_call_per_task();
1841        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842    }
1843
1844    #[instrument(level = "trace", skip_all)]
1845    #[allow(clippy::type_complexity)]
1846    fn dry_exec_transaction_impl(
1847        &self,
1848        epoch_store: &AuthorityPerEpochStore,
1849        transaction: TransactionData,
1850        transaction_digest: TransactionDigest,
1851    ) -> IotaResult<(
1852        DryRunTransactionBlockResponse,
1853        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1854        TransactionEffects,
1855        Option<ObjectID>,
1856    )> {
1857        // Cheap validity checks for a transaction, including input size limits.
1858        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1859
1860        let input_object_kinds = transaction.input_objects()?;
1861        let receiving_object_refs = transaction.receiving_objects();
1862
1863        iota_transaction_checks::deny::check_transaction_for_signing(
1864            &transaction,
1865            &[],
1866            &input_object_kinds,
1867            &receiving_object_refs,
1868            &self.config.transaction_deny_config,
1869            self.get_backing_package_store().as_ref(),
1870        )?;
1871
1872        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1873            // We don't want to cache this transaction since it's a dry run.
1874            None,
1875            &input_object_kinds,
1876            &receiving_object_refs,
1877            epoch_store.epoch(),
1878        )?;
1879
1880        // Make a gas object if one was not provided.
1881        let mut transaction = transaction;
1882        let mut gas_object_refs = transaction.gas().to_vec();
1883        let reference_gas_price = epoch_store.reference_gas_price();
1884        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1885            let sender = transaction.gas_owner();
1886            let gas_object_id = ObjectID::random();
1887            let gas_object = Object::new_move(
1888                MoveObject::new_gas_coin(
1889                    OBJECT_START_VERSION,
1890                    gas_object_id,
1891                    SIMULATION_GAS_COIN_VALUE,
1892                ),
1893                Owner::AddressOwner(sender),
1894                TransactionDigest::genesis_marker(),
1895            );
1896            let gas_object_ref = gas_object.compute_object_reference();
1897            gas_object_refs = vec![gas_object_ref];
1898            // Add gas object to transaction gas payment
1899            transaction.gas_data_mut().payment = gas_object_refs.clone();
1900            (
1901                iota_transaction_checks::check_transaction_input_with_given_gas(
1902                    epoch_store.protocol_config(),
1903                    reference_gas_price,
1904                    &transaction,
1905                    input_objects,
1906                    receiving_objects,
1907                    gas_object,
1908                    &self.metrics.bytecode_verifier_metrics,
1909                    &self.config.verifier_signing_config,
1910                )?,
1911                Some(gas_object_id),
1912            )
1913        } else {
1914            // `MoveAuthenticator`s are not supported in dry runs, so we set the
1915            // `authenticator_gas_budget` to 0.
1916            let authenticator_gas_budget = 0;
1917
1918            (
1919                iota_transaction_checks::check_transaction_input(
1920                    epoch_store.protocol_config(),
1921                    reference_gas_price,
1922                    &transaction,
1923                    input_objects,
1924                    &receiving_objects,
1925                    &self.metrics.bytecode_verifier_metrics,
1926                    &self.config.verifier_signing_config,
1927                    authenticator_gas_budget,
1928                )?,
1929                None,
1930            )
1931        };
1932
1933        let protocol_config = epoch_store.protocol_config();
1934        let (kind, signer, _) = transaction.execution_parts();
1935
1936        let silent = true;
1937        let executor = iota_execution::executor(protocol_config, silent, None)
1938            .expect("Creating an executor should not fail here");
1939
1940        let expensive_checks = false;
1941        let (inner_temp_store, _, effects, execution_error) = executor
1942            .execute_transaction_to_effects(
1943                self.get_backing_store().as_ref(),
1944                protocol_config,
1945                self.metrics.limits_metrics.clone(),
1946                expensive_checks,
1947                self.config.certificate_deny_config.certificate_deny_set(),
1948                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1949                epoch_store
1950                    .epoch_start_config()
1951                    .epoch_data()
1952                    .epoch_start_timestamp(),
1953                checked_input_objects,
1954                gas_object_refs,
1955                gas_status,
1956                kind,
1957                signer,
1958                transaction_digest,
1959                &mut None,
1960            );
1961        let tx_digest = *effects.transaction_digest();
1962
1963        let module_cache =
1964            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1965
1966        let mut layout_resolver =
1967            epoch_store
1968                .executor()
1969                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1970                    &inner_temp_store,
1971                    self.get_backing_package_store(),
1972                )));
1973        // Returning empty vector here because we recalculate changes in the rpc layer.
1974        let object_changes = Vec::new();
1975
1976        // Returning empty vector here because we recalculate changes in the rpc layer.
1977        let balance_changes = Vec::new();
1978
1979        let written_with_kind = effects
1980            .created()
1981            .into_iter()
1982            .map(|(oref, _)| (oref, WriteKind::Create))
1983            .chain(
1984                effects
1985                    .unwrapped()
1986                    .into_iter()
1987                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1988            )
1989            .chain(
1990                effects
1991                    .mutated()
1992                    .into_iter()
1993                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
1994            )
1995            .map(|(oref, kind)| {
1996                let obj = inner_temp_store.written.get(&oref.0).unwrap();
1997                // TODO: Avoid clones.
1998                (oref.0, (oref, obj.clone(), kind))
1999            })
2000            .collect();
2001
2002        let execution_error_source = execution_error
2003            .as_ref()
2004            .err()
2005            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2006
2007        Ok((
2008            DryRunTransactionBlockResponse {
2009                // to avoid cloning `transaction`, fields are populated in this order
2010                suggested_gas_price: self
2011                    .congestion_tracker
2012                    .get_prediction_suggested_gas_price(&transaction),
2013                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
2014                    .map_err(|e| IotaError::TransactionSerialization {
2015                        error: format!(
2016                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
2017                        ),
2018                    })?, // TODO: change the underlying try_from to return IotaError. This one goes deep
2019                effects: effects.clone().try_into()?,
2020                events: IotaTransactionBlockEvents::try_from(
2021                    inner_temp_store.events.clone(),
2022                    tx_digest,
2023                    None,
2024                    layout_resolver.as_mut(),
2025                )?,
2026                object_changes,
2027                balance_changes,
2028                execution_error_source,
2029            },
2030            written_with_kind,
2031            effects,
2032            mock_gas,
2033        ))
2034    }
2035
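    /// Simulates a transaction on a fullnode without committing any state. With
    /// `VmChecks` enabled this behaves like a dry run of a real transaction; with
    /// checks disabled it behaves like a dev-inspect with more relaxed Move VM
    /// checks. If no gas payment is provided, a mock gas object is injected.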
2036    pub fn simulate_transaction(
2037        &self,
2038        mut transaction: TransactionData,
2039        checks: VmChecks,
2040    ) -> IotaResult<SimulateTransactionResult> {
2041        if transaction.kind().is_system_tx() {
2042            return Err(IotaError::UnsupportedFeature {
2043                error: "simulate does not support system transactions".to_string(),
2044            });
2045        }
2046
2047        let epoch_store = self.load_epoch_store_one_call_per_task();
2048        if !self.is_fullnode(&epoch_store) {
2049            return Err(IotaError::UnsupportedFeature {
2050                error: "simulate is only supported on fullnodes".to_string(),
2051            });
2052        }
2053
2054        // Cheap validity checks for a transaction, including input size limits.
2055        // This does not check if gas objects are missing since we may create a
2056        // mock gas object. It checks for other transaction input validity.
2057        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2058
2059        let input_object_kinds = transaction.input_objects()?;
2060        let receiving_object_refs = transaction.receiving_objects();
2061
2062        // Since we need to simulate a validator signing the transaction, the first step
2063        // is to check if some transaction elements are denied.
2064        iota_transaction_checks::deny::check_transaction_for_signing(
2065            &transaction,
2066            &[],
2067            &input_object_kinds,
2068            &receiving_object_refs,
2069            &self.config.transaction_deny_config,
2070            self.get_backing_package_store().as_ref(),
2071        )?;
2072
2073        // Load input and receiving objects
2074        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2075            // We don't want to cache this transaction since it's a simulation.
2076            None,
2077            &input_object_kinds,
2078            &receiving_object_refs,
2079            epoch_store.epoch(),
2080        )?;
2081
2082        // Create a mock gas object if one was not provided
2083        let mock_gas_id = if transaction.gas().is_empty() {
2084            let mock_gas_object = Object::new_move(
2085                MoveObject::new_gas_coin(
2086                    OBJECT_START_VERSION,
2087                    ObjectID::MAX,
2088                    SIMULATION_GAS_COIN_VALUE,
2089                ),
2090                Owner::AddressOwner(transaction.gas_data().owner),
2091                TransactionDigest::genesis_marker(),
2092            );
2093            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2094            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2095            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2096            Some(mock_gas_object.id())
2097        } else {
2098            None
2099        };
2100
2101        let protocol_config = epoch_store.protocol_config();
2102
2103        // `MoveAuthenticator`s are not supported in simulation, so we set the
2104        // `authenticator_gas_budget` to 0.
2105        let authenticator_gas_budget = 0;
2106
2107        // Checks enabled -> DRY-RUN: we simulate a real TX.
2108        // Checks disabled -> DEV-INSPECT: more relaxed Move VM checks.
2109        let (gas_status, checked_input_objects) = if checks.enabled() {
2110            iota_transaction_checks::check_transaction_input(
2111                protocol_config,
2112                epoch_store.reference_gas_price(),
2113                &transaction,
2114                input_objects,
2115                &receiving_objects,
2116                &self.metrics.bytecode_verifier_metrics,
2117                &self.config.verifier_signing_config,
2118                authenticator_gas_budget,
2119            )?
2120        } else {
2121            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2122                protocol_config,
2123                transaction.kind(),
2124                input_objects,
2125                receiving_objects,
2126            )?;
2127            let gas_status = IotaGasStatus::new(
2128                transaction.gas_budget(),
2129                transaction.gas_price(),
2130                epoch_store.reference_gas_price(),
2131                protocol_config,
2132            )?;
2133
2134            (gas_status, checked_input_objects)
2135        };
2136
2137        // Create a new executor for the simulation
2138        let executor = iota_execution::executor(
2139            protocol_config,
2140            true, // silent
2141            None,
2142        )
2143        .expect("Creating an executor should not fail here");
2144
2145        // Execute the simulation
2146        let (kind, signer, gas_coins) = transaction.execution_parts();
2147        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2148            self.get_backing_store().as_ref(),
2149            protocol_config,
2150            self.metrics.limits_metrics.clone(),
2151            false, // expensive_checks
2152            self.config.certificate_deny_config.certificate_deny_set(),
2153            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2154            epoch_store
2155                .epoch_start_config()
2156                .epoch_data()
2157                .epoch_start_timestamp(),
2158            checked_input_objects,
2159            gas_coins,
2160            gas_status,
2161            kind,
2162            signer,
2163            transaction.digest(),
2164            checks.disabled(),
2165        );
2166
2167        // In the case of a dev inspect, execution_result may contain values;
2168        // in the case of a dry run, it is empty.
2169        Ok(SimulateTransactionResult {
2170            input_objects: inner_temp_store.input_objects,
2171            output_objects: inner_temp_store.written,
2172            events: effects.events_digest().map(|_| inner_temp_store.events),
2173            effects,
2174            execution_result,
2175            mock_gas_id,
2176        })
2177    }
2178
2179    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
2180    /// `VmChecks::DISABLED` instead.
2181    /// The object ID for gas can be any object ID,
2182    /// even that of an uncreated object.
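    ///
    /// A minimal usage sketch (hypothetical `state`, `sender`, and
    /// `transaction_kind` bindings are assumed):
    ///
    /// ```ignore
    /// let results = state
    ///     .dev_inspect_transaction_block(
    ///         sender,           // IotaAddress
    ///         transaction_kind, // TransactionKind
    ///         None, None, None, None, None, None,
    ///     )
    ///     .await?;
    /// ```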
2183    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2184    pub async fn dev_inspect_transaction_block(
2185        &self,
2186        sender: IotaAddress,
2187        transaction_kind: TransactionKind,
2188        gas_price: Option<u64>,
2189        gas_budget: Option<u64>,
2190        gas_sponsor: Option<IotaAddress>,
2191        gas_objects: Option<Vec<ObjectRef>>,
2192        show_raw_txn_data_and_effects: Option<bool>,
2193        skip_checks: Option<bool>,
2194    ) -> IotaResult<DevInspectResults> {
2195        let epoch_store = self.load_epoch_store_one_call_per_task();
2196
2197        if !self.is_fullnode(&epoch_store) {
2198            return Err(IotaError::UnsupportedFeature {
2199                error: "dev-inspect is only supported on fullnodes".to_string(),
2200            });
2201        }
2202
2203        if transaction_kind.is_system_tx() {
2204            return Err(IotaError::UnsupportedFeature {
2205                error: "system transactions are not supported".to_string(),
2206            });
2207        }
2208
2209        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2210        let skip_checks = skip_checks.unwrap_or(true);
2211        let reference_gas_price = epoch_store.reference_gas_price();
2212        let protocol_config = epoch_store.protocol_config();
2213        let max_tx_gas = protocol_config.max_tx_gas();
2214
2215        let price = gas_price.unwrap_or(reference_gas_price);
2216        let budget = gas_budget.unwrap_or(max_tx_gas);
2217        let owner = gas_sponsor.unwrap_or(sender);
2218        // Payment might be empty here, but that's fine; we'll deal with it later,
2219        // after reading all the input objects.
2220        let payment = gas_objects.unwrap_or_default();
2221        let transaction = TransactionData::V1(TransactionDataV1 {
2222            kind: transaction_kind.clone(),
2223            sender,
2224            gas_data: GasData {
2225                payment,
2226                owner,
2227                price,
2228                budget,
2229            },
2230            expiration: TransactionExpiration::None,
2231        });
2232
2233        let raw_txn_data = if show_raw_txn_data_and_effects {
2234            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2235                error: "Failed to serialize transaction during dev inspect".to_string(),
2236            })?
2237        } else {
2238            vec![]
2239        };
2240
2241        transaction.validity_check_no_gas_check(protocol_config)?;
2242
2243        let input_object_kinds = transaction.input_objects()?;
2244        let receiving_object_refs = transaction.receiving_objects();
2245
2246        iota_transaction_checks::deny::check_transaction_for_signing(
2247            &transaction,
2248            &[],
2249            &input_object_kinds,
2250            &receiving_object_refs,
2251            &self.config.transaction_deny_config,
2252            self.get_backing_package_store().as_ref(),
2253        )?;
2254
2255        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2256            // We don't want to cache this transaction since it's a dev inspect.
2257            None,
2258            &input_object_kinds,
2259            &receiving_object_refs,
2260            epoch_store.epoch(),
2261        )?;
2262
2263        // Create and use a dummy gas object if there is no gas object provided.
2264        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2265            SIMULATION_GAS_COIN_VALUE,
2266            transaction.gas_owner(),
2267        );
2268
2269        let gas_objects = if transaction.gas().is_empty() {
2270            let gas_object_ref = dummy_gas_object.compute_object_reference();
2271            vec![gas_object_ref]
2272        } else {
2273            transaction.gas().to_vec()
2274        };
2275
2276        let (gas_status, checked_input_objects) = if skip_checks {
2277            // If we are skipping checks, then we call the check_dev_inspect_input
2278            // function, which performs only lightweight checks on the transaction
2279            // input. If the gas field is empty, we will use the dummy gas object,
2280            // so we need to add it to the input objects vector.
2281            if transaction.gas().is_empty() {
2282                input_objects.push(ObjectReadResult::new(
2283                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2284                    dummy_gas_object.into(),
2285                ));
2286            }
2287            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2288                protocol_config,
2289                &transaction_kind,
2290                input_objects,
2291                receiving_objects,
2292            )?;
2293            let gas_status = IotaGasStatus::new(
2294                max_tx_gas,
2295                transaction.gas_price(),
2296                reference_gas_price,
2297                protocol_config,
2298            )?;
2299
2300            (gas_status, checked_input_objects)
2301        } else {
2302            // If we are not skipping checks, then we call the check_transaction_input
2303            // function (or its dummy-gas variant), which performs full-fledged
2304            // checks just like a real transaction execution.
2305            if transaction.gas().is_empty() {
2306                iota_transaction_checks::check_transaction_input_with_given_gas(
2307                    epoch_store.protocol_config(),
2308                    reference_gas_price,
2309                    &transaction,
2310                    input_objects,
2311                    receiving_objects,
2312                    dummy_gas_object,
2313                    &self.metrics.bytecode_verifier_metrics,
2314                    &self.config.verifier_signing_config,
2315                )?
2316            } else {
2317                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
2318                // `authenticator_gas_budget` to 0.
2319                let authenticator_gas_budget = 0;
2320
2321                iota_transaction_checks::check_transaction_input(
2322                    epoch_store.protocol_config(),
2323                    reference_gas_price,
2324                    &transaction,
2325                    input_objects,
2326                    &receiving_objects,
2327                    &self.metrics.bytecode_verifier_metrics,
2328                    &self.config.verifier_signing_config,
2329                    authenticator_gas_budget,
2330                )?
2331            }
2332        };
2333
2334        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2335            .expect("Creating an executor should not fail here");
2336        let intent_msg = IntentMessage::new(
2337            Intent {
2338                version: IntentVersion::V0,
2339                scope: IntentScope::TransactionData,
2340                app_id: IntentAppId::Iota,
2341            },
2342            transaction,
2343        );
2344        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2345        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2346            self.get_backing_store().as_ref(),
2347            protocol_config,
2348            self.metrics.limits_metrics.clone(),
2349            // expensive checks
2350            false,
2351            self.config.certificate_deny_config.certificate_deny_set(),
2352            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2353            epoch_store
2354                .epoch_start_config()
2355                .epoch_data()
2356                .epoch_start_timestamp(),
2357            checked_input_objects,
2358            gas_objects,
2359            gas_status,
2360            transaction_kind,
2361            sender,
2362            transaction_digest,
2363            skip_checks,
2364        );
2365
2366        let raw_effects = if show_raw_txn_data_and_effects {
2367            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2368                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2369            })?
2370        } else {
2371            vec![]
2372        };
2373
2374        let mut layout_resolver =
2375            epoch_store
2376                .executor()
2377                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2378                    &inner_temp_store,
2379                    self.get_backing_package_store(),
2380                )));
2381
2382        DevInspectResults::new(
2383            effects,
2384            inner_temp_store.events.clone(),
2385            execution_result,
2386            raw_txn_data,
2387            raw_effects,
2388            layout_resolver.as_mut(),
2389        )
2390    }
2391
2392    // Only used for testing because of how the epoch store is loaded.
2393    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2394        let epoch_store = self.epoch_store_for_testing();
2395        Ok(epoch_store.reference_gas_price())
2396    }
2397
2398    #[instrument(level = "trace", skip_all)]
2399    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2400        self.get_transaction_cache_reader()
2401            .try_is_tx_already_executed(digest)
2402    }
2403
2404    /// Non-fallible version of `try_is_tx_already_executed`.
2405    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2406        self.try_is_tx_already_executed(digest)
2407            .expect("storage access failed")
2408    }
2409
2410    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2411    #[instrument(level = "debug", skip_all, err)]
2412    fn index_tx(
2413        &self,
2414        indexes: &IndexStore,
2415        digest: &TransactionDigest,
2416        // TODO: index_tx really just needs the transaction data here.
2417        cert: &VerifiedExecutableTransaction,
2418        effects: &TransactionEffects,
2419        events: &TransactionEvents,
2420        timestamp_ms: u64,
2421        tx_coins: Option<TxCoins>,
2422        written: &WrittenObjects,
2423        inner_temporary_store: &InnerTemporaryStore,
2424    ) -> IotaResult<u64> {
2425        let changes = self
2426            .process_object_index(effects, written, inner_temporary_store)
2427            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2428
2429        indexes.index_tx(
2430            cert.data().intent_message().value.sender(),
2431            cert.data()
2432                .intent_message()
2433                .value
2434                .input_objects()?
2435                .iter()
2436                .map(|o| o.object_id()),
2437            effects
2438                .all_changed_objects()
2439                .into_iter()
2440                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2441            cert.data()
2442                .intent_message()
2443                .value
2444                .move_calls()
2445                .into_iter()
2446                .map(|(package, module, function)| {
2447                    (*package, module.to_owned(), function.to_owned())
2448                }),
2449            events,
2450            changes,
2451            digest,
2452            timestamp_ms,
2453            tx_coins,
2454        )
2455    }
2456
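    /// msim-only test helper: deterministically perturbs the computation cost in
    /// `effects` on a subset of validators (bounded by the validity threshold) to
    /// simulate checkpoint execution nondeterminism.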
2457    #[cfg(msim)]
2458    fn create_fail_state(
2459        &self,
2460        certificate: &VerifiedExecutableTransaction,
2461        epoch_store: &Arc<AuthorityPerEpochStore>,
2462        effects: &mut TransactionEffects,
2463    ) {
2464        use std::cell::RefCell;
2465        thread_local! {
2466            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2467        }
2468        if !certificate.data().intent_message().value.is_system_tx() {
2469            let committee = epoch_store.committee();
2470            let cur_stake = (**committee).weight(&self.name);
2471            if cur_stake > 0 {
2472                FAIL_STATE.with_borrow_mut(|fail_state| {
2474                    if fail_state.0 < committee.validity_threshold() {
2475                        fail_state.0 += cur_stake;
2476                        fail_state.1.insert(self.name);
2477                    }
2478
2479                    if fail_state.1.contains(&self.name) {
2480                        info!("cp_exec failing tx");
2481                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2482                    }
2483                });
2484            }
2485        }
2486    }
2487
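    /// Computes the `ObjectIndexChanges` (deleted/new owners and dynamic fields)
    /// implied by the given effects and written objects, resolving previous owners
    /// from the object store for deleted and mutated objects.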
2488    fn process_object_index(
2489        &self,
2490        effects: &TransactionEffects,
2491        written: &WrittenObjects,
2492        inner_temporary_store: &InnerTemporaryStore,
2493    ) -> IotaResult<ObjectIndexChanges> {
2494        let epoch_store = self.load_epoch_store_one_call_per_task();
2495        let mut layout_resolver =
2496            epoch_store
2497                .executor()
2498                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2499                    inner_temporary_store,
2500                    self.get_backing_package_store(),
2501                )));
2502
2503        let modified_at_version = effects
2504            .modified_at_versions()
2505            .into_iter()
2506            .collect::<HashMap<_, _>>();
2507
2508        let tx_digest = effects.transaction_digest();
2509        let mut deleted_owners = vec![];
2510        let mut deleted_dynamic_fields = vec![];
2511        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2512            let old_version = modified_at_version.get(&id).unwrap();
2513            // When we process the index, the latest object hasn't been written yet so
2514            // the old object must be present.
2515            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2516                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2517            ) {
2518                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2519                Owner::ObjectOwner(object_id) => {
2520                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2521                }
2522                _ => {}
2523            }
2524        }
2525
2526        let mut new_owners = vec![];
2527        let mut new_dynamic_fields = vec![];
2528
2529        for (oref, owner, kind) in effects.all_changed_objects() {
2530            let id = &oref.0;
2531            // For mutated objects, retrieve the old owner and delete the old index
2532            // entry if there is an owner change.
2533            if let WriteKind::Mutate = kind {
2534                let Some(old_version) = modified_at_version.get(id) else {
2535                    panic!(
2536                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2537                    );
2538                };
2539                // When we process the index, the latest object hasn't been written yet so
2540                // the old object must be present.
2541                let Some(old_object) = self
2542                    .get_object_store()
2543                    .try_get_object_by_key(id, *old_version)?
2544                else {
2545                    panic!(
2546                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2547                    );
2548                };
2549                if old_object.owner != owner {
2550                    match old_object.owner {
2551                        Owner::AddressOwner(addr) => {
2552                            deleted_owners.push((addr, *id));
2553                        }
2554                        Owner::ObjectOwner(object_id) => {
2555                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2556                        }
2557                        _ => {}
2558                    }
2559                }
2560            }
2561
2562            match owner {
2563                Owner::AddressOwner(addr) => {
2564                    // TODO: We can remove the object fetching once ObjectType is added
2565                    // to TransactionEffects.
2566                    let new_object = written.get(id).unwrap_or_else(
2567                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2568                    );
2569                    assert_eq!(
2570                        new_object.version(),
2571                        oref.1,
2572                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2573                        tx_digest,
2574                        id,
2575                        new_object.version(),
2576                        oref.1
2577                    );
2578
2579                    let type_ = new_object
2580                        .type_()
2581                        .map(|type_| ObjectType::Struct(type_.clone()))
2582                        .unwrap_or(ObjectType::Package);
2583
2584                    new_owners.push((
2585                        (addr, *id),
2586                        ObjectInfo {
2587                            object_id: *id,
2588                            version: oref.1,
2589                            digest: oref.2,
2590                            type_,
2591                            owner,
2592                            previous_transaction: *effects.transaction_digest(),
2593                        },
2594                    ));
2595                }
2596                Owner::ObjectOwner(owner) => {
2597                    let new_object = written.get(id).unwrap_or_else(
2598                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2599                    );
2600                    assert_eq!(
2601                        new_object.version(),
2602                        oref.1,
2603                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2604                        tx_digest,
2605                        id,
2606                        new_object.version(),
2607                        oref.1
2608                    );
2609
2610                    let Some(df_info) = self
2611                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2612                        .unwrap_or_else(|e| {
2613                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2614                            None
2615                        }
2616                    )
2617                        else {
2618                            // Skip indexing for non-dynamic-field objects.
2619                            continue;
2620                        };
2621                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2622                }
2623                _ => {}
2624            }
2625        }
2626
2627        Ok(ObjectIndexChanges {
2628            deleted_owners,
2629            deleted_dynamic_fields,
2630            new_owners,
2631            new_dynamic_fields,
2632        })
2633    }
2634
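    /// Builds a `DynamicFieldInfo` for `o` when it is a dynamic-field object,
    /// returning `Ok(None)` for packages and for Move objects that are not
    /// dynamic fields.
    ///
    /// Hedged sketch, mirroring the call sites in this file (illustrative only,
    /// not a doc test; `state`, `obj`, `written`, `resolver`, and `parent_id`
    /// are assumed bindings):
    /// ```ignore
    /// if let Some(info) =
    ///     state.try_create_dynamic_field_info(&obj, &written, resolver.as_mut(), false)?
    /// {
    ///     new_dynamic_fields.push(((parent_id, obj.id()), info));
    /// }
    /// ```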
2635    fn try_create_dynamic_field_info(
2636        &self,
2637        o: &Object,
2638        written: &WrittenObjects,
2639        resolver: &mut dyn LayoutResolver,
2640        get_latest_object_version: bool,
2641    ) -> IotaResult<Option<DynamicFieldInfo>> {
2642        // Skip if not a move object
2643        let Some(move_object) = o.data.try_as_move().cloned() else {
2644            return Ok(None);
2645        };
2646
2647        // We only index dynamic field objects
2648        if !move_object.type_().is_dynamic_field() {
2649            return Ok(None);
2650        }
2651
2652        let layout = resolver
2653            .get_annotated_layout(&move_object.type_().clone().into())?
2654            .into_layout();
2655
2656        let field =
2657            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2658                IotaError::ObjectDeserialization {
2659                    error: e.to_string(),
2660                }
2661            })?;
2662
2663        let type_ = field.kind;
2664        let name_type: TypeTag = field.name_layout.into();
2665        let bcs_name = field.name_bytes.to_owned();
2666
2667        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2668            .map_err(|e| {
2669                warn!("{e}");
2670                IotaError::ObjectDeserialization {
2671                    error: e.to_string(),
2672                }
2673            })?;
2674
2675        let name = DynamicFieldName {
2676            type_: name_type,
2677            value: IotaMoveValue::from(name_value).to_json_value(),
2678        };
2679
2680        let value_metadata = field.value_metadata().map_err(|e| {
2681            warn!("{e}");
2682            IotaError::ObjectDeserialization {
2683                error: e.to_string(),
2684            }
2685        })?;
2686
2687        Ok(Some(match value_metadata {
2688            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2689                name,
2690                bcs_name,
2691                type_,
2692                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2693                object_id: o.id(),
2694                version: o.version(),
2695                digest: o.digest(),
2696            },
2697
2698            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2699                // Find the actual object from storage using the object id obtained from the
2700                // wrapper.
2701
2702                // Try to find the object in the written objects first.
2703                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2704                    (
2705                        object.version(),
2706                        object.digest(),
2707                        object.data.type_().unwrap().clone(),
2708                    )
2709                } else {
2710                    // If not found, try to find it in the database.
2711                    let object = if get_latest_object_version {
2712                        // When loading a genesis object, its version may lag behind the one
2713                        // recorded in the snapshot. In that case the object cannot be found
2714                        // with get_object_by_key, so we need to use get_object instead. Since
2715                        // get_object is a heavier operation, we only allow it for genesis
2716                        // objects.
2717                        // reference: https://github.com/iotaledger/iota/issues/7267
2718                        self.get_object_store()
2719                            .try_get_object(&object_id)?
2720                            .ok_or_else(|| UserInputError::ObjectNotFound {
2721                                object_id,
2722                                version: Some(o.version()),
2723                            })?
2724                    } else {
2725                        // A non-genesis object should be present in the database at the given version.
2726                        self.get_object_store()
2727                            .try_get_object_by_key(&object_id, o.version())?
2728                            .ok_or_else(|| UserInputError::ObjectNotFound {
2729                                object_id,
2730                                version: Some(o.version()),
2731                            })?
2732                    };
2733
2734                    (
2735                        object.version(),
2736                        object.digest(),
2737                        object.data.type_().unwrap().clone(),
2738                    )
2739                };
2740
2741                DynamicFieldInfo {
2742                    name,
2743                    bcs_name,
2744                    type_,
2745                    object_type: object_type.to_string(),
2746                    object_id,
2747                    version,
2748                    digest,
2749                }
2750            }
2751        }))
2752    }
2753
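    /// Post-processes an executed transaction on nodes that maintain indexes:
    /// writes the transaction into the index store, converts its effects and
    /// events into their RPC representations, and forwards them to the
    /// subscription handler. This is a no-op when `self.indexes` is `None`.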
2754    #[instrument(level = "trace", skip_all, err)]
2755    fn post_process_one_tx(
2756        &self,
2757        certificate: &VerifiedExecutableTransaction,
2758        effects: &TransactionEffects,
2759        inner_temporary_store: &InnerTemporaryStore,
2760        epoch_store: &Arc<AuthorityPerEpochStore>,
2761    ) -> IotaResult {
2762        if self.indexes.is_none() {
2763            return Ok(());
2764        }
2765
2766        let tx_digest = certificate.digest();
2767        let timestamp_ms = Self::unixtime_now_ms();
2768        let events = &inner_temporary_store.events;
2769        let written = &inner_temporary_store.written;
2770        let tx_coins =
2771            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2772
2773        // Index tx
2774        if let Some(indexes) = &self.indexes {
2775            let _ = self
2776                .index_tx(
2777                    indexes.as_ref(),
2778                    tx_digest,
2779                    certificate,
2780                    effects,
2781                    events,
2782                    timestamp_ms,
2783                    tx_coins,
2784                    written,
2785                    inner_temporary_store,
2786                )
2787                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2788                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2789                .expect("Indexing tx should not fail");
2790
2791            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2792            let events = self.make_transaction_block_events(
2793                events.clone(),
2794                *tx_digest,
2795                timestamp_ms,
2796                epoch_store,
2797                inner_temporary_store,
2798            )?;
2799            // Emit events
2800            self.subscription_handler
2801                .process_tx(certificate.data().transaction_data(), &effects, &events)
2802                .tap_ok(|_| {
2803                    self.metrics
2804                        .post_processing_total_tx_had_event_processed
2805                        .inc()
2806                })
2807                .tap_err(|e| {
2808                    warn!(
2809                        ?tx_digest,
2810                        "Post processing - Couldn't process events for tx: {}", e
2811                    )
2812                })?;
2813
2814            self.metrics
2815                .post_processing_total_events_emitted
2816                .inc_by(events.data.len() as u64);
2817        };
2818        Ok(())
2819    }
2820
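    /// Converts raw `TransactionEvents` into RPC-facing
    /// `IotaTransactionBlockEvents`, resolving Move type layouts against the
    /// inner temporary store with the backing package store as fallback.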
2821    fn make_transaction_block_events(
2822        &self,
2823        transaction_events: TransactionEvents,
2824        digest: TransactionDigest,
2825        timestamp_ms: u64,
2826        epoch_store: &Arc<AuthorityPerEpochStore>,
2827        inner_temporary_store: &InnerTemporaryStore,
2828    ) -> IotaResult<IotaTransactionBlockEvents> {
2829        let mut layout_resolver =
2830            epoch_store
2831                .executor()
2832                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2833                    inner_temporary_store,
2834                    self.get_backing_package_store(),
2835                )));
2836        IotaTransactionBlockEvents::try_from(
2837            transaction_events,
2838            digest,
2839            Some(timestamp_ms),
2840            layout_resolver.as_mut(),
2841        )
2842    }
2843
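    /// Returns the current Unix timestamp in milliseconds.
    ///
    /// Hedged sketch (illustrative only, not a doc test):
    /// ```ignore
    /// let before = AuthorityState::unixtime_now_ms();
    /// let after = AuthorityState::unixtime_now_ms();
    /// assert!(after >= before);
    /// ```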
2844    pub fn unixtime_now_ms() -> u64 {
2845        let now = SystemTime::now()
2846            .duration_since(UNIX_EPOCH)
2847            .expect("Time went backwards")
2848            .as_millis();
2849        u64::try_from(now).expect("Travelling in time machine")
2850    }
2851
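    /// Looks up the signing status of a previously seen transaction.
    ///
    /// Hedged sketch (illustrative only, not a doc test; `state` and `request`
    /// are assumed bindings, with `request` carrying the transaction digest to
    /// query):
    /// ```ignore
    /// let response = state.handle_transaction_info_request(request).await?;
    /// println!("status: {:?}", response.status);
    /// ```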
2852    #[instrument(level = "trace", skip_all)]
2853    pub async fn handle_transaction_info_request(
2854        &self,
2855        request: TransactionInfoRequest,
2856    ) -> IotaResult<TransactionInfoResponse> {
2857        let epoch_store = self.load_epoch_store_one_call_per_task();
2858        let (transaction, status) = self
2859            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2860            .ok_or(IotaError::TransactionNotFound {
2861                digest: request.transaction_digest,
2862            })?;
2863        Ok(TransactionInfoResponse {
2864            transaction,
2865            status,
2866        })
2867    }
2868
2869    #[instrument(level = "trace", skip_all)]
2870    pub async fn handle_object_info_request(
2871        &self,
2872        request: ObjectInfoRequest,
2873    ) -> IotaResult<ObjectInfoResponse> {
2874        let epoch_store = self.load_epoch_store_one_call_per_task();
2875
2876        let requested_object_seq = match request.request_kind {
2877            ObjectInfoRequestKind::LatestObjectInfo => {
2878                let (_, seq, _) = self
2879                    .try_get_object_or_tombstone(request.object_id)
2880                    .await?
2881                    .ok_or_else(|| {
2882                        IotaError::from(UserInputError::ObjectNotFound {
2883                            object_id: request.object_id,
2884                            version: None,
2885                        })
2886                    })?;
2887                seq
2888            }
2889            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2890        };
2891
2892        let object = self
2893            .get_object_store()
2894            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2895            .ok_or_else(|| {
2896                IotaError::from(UserInputError::ObjectNotFound {
2897                    object_id: request.object_id,
2898                    version: Some(requested_object_seq),
2899                })
2900            })?;
2901
2902        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2903            (request.generate_layout, object.data.try_as_move())
2904        {
2905            Some(into_struct_layout(
2906                epoch_store
2907                    .executor()
2908                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2909                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2910            )?)
2911        } else {
2912            None
2913        };
2914
2915        let lock = if !object.is_address_owned() {
2916            // Only address owned objects have locks.
2917            None
2918        } else {
2919            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2920                .await?
2921                .map(|s| s.into_inner())
2922        };
2923
2924        Ok(ObjectInfoResponse {
2925            object,
2926            layout,
2927            lock_for_debugging: lock,
2928        })
2929    }
2930
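    /// Serves checkpoint summaries, either certified or locally computed,
    /// together with their contents when available.
    ///
    /// Hedged sketch (illustrative only, not a doc test; `state` and `request`
    /// are assumed bindings; a request with `certified: true` and no sequence
    /// number asks for the latest certified checkpoint):
    /// ```ignore
    /// let response = state.handle_checkpoint_request(&request)?;
    /// match response.checkpoint {
    ///     Some(CheckpointSummaryResponse::Certified(summary)) => { /* inspect `summary` */ }
    ///     Some(CheckpointSummaryResponse::Pending(summary)) => { /* locally computed only */ }
    ///     None => { /* no matching checkpoint */ }
    /// }
    /// ```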
2931    #[instrument(level = "trace", skip_all)]
2932    pub fn handle_checkpoint_request(
2933        &self,
2934        request: &CheckpointRequest,
2935    ) -> IotaResult<CheckpointResponse> {
2936        let summary = if request.certified {
2937            let summary = match request.sequence_number {
2938                Some(seq) => self
2939                    .checkpoint_store
2940                    .get_checkpoint_by_sequence_number(seq)?,
2941                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2942            }
2943            .map(|v| v.into_inner());
2944            summary.map(CheckpointSummaryResponse::Certified)
2945        } else {
2946            let summary = match request.sequence_number {
2947                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2948                None => self
2949                    .checkpoint_store
2950                    .get_latest_locally_computed_checkpoint()?,
2951            };
2952            summary.map(CheckpointSummaryResponse::Pending)
2953        };
2954        let contents = match &summary {
2955            Some(s) => self
2956                .checkpoint_store
2957                .get_checkpoint_contents(&s.content_digest())?,
2958            None => None,
2959        };
2960        Ok(CheckpointResponse {
2961            checkpoint: summary,
2962            contents,
2963        })
2964    }
2965
2966    fn check_protocol_version(
2967        supported_protocol_versions: SupportedProtocolVersions,
2968        current_version: ProtocolVersion,
2969    ) {
2970        info!("current protocol version is now {:?}", current_version);
2971        info!("supported versions are: {:?}", supported_protocol_versions);
2972        if !supported_protocol_versions.is_version_supported(current_version) {
2973            let msg = format!(
2974                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2975            );
2976
2977            error!("{}", msg);
2978            eprintln!("{msg}");
2979
2980            #[cfg(not(msim))]
2981            std::process::exit(1);
2982
2983            #[cfg(msim)]
2984            iota_simulator::task::shutdown_current_node();
2985        }
2986    }
2987
2988    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
2989    pub async fn new(
2990        name: AuthorityName,
2991        secret: StableSyncAuthoritySigner,
2992        supported_protocol_versions: SupportedProtocolVersions,
2993        store: Arc<AuthorityStore>,
2994        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2995        epoch_store: Arc<AuthorityPerEpochStore>,
2996        committee_store: Arc<CommitteeStore>,
2997        indexes: Option<Arc<IndexStore>>,
2998        rest_index: Option<Arc<RestIndexStore>>,
2999        checkpoint_store: Arc<CheckpointStore>,
3000        prometheus_registry: &Registry,
3001        genesis_objects: &[Object],
3002        db_checkpoint_config: &DBCheckpointConfig,
3003        config: NodeConfig,
3004        archive_readers: ArchiveReaderBalancer,
3005        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3006        chain_identifier: ChainIdentifier,
3007        pruner_db: Option<Arc<AuthorityPrunerTables>>,
3008    ) -> Arc<Self> {
3009        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3010
3011        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3012        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3013        let transaction_manager = Arc::new(TransactionManager::new(
3014            execution_cache_trait_pointers.object_cache_reader.clone(),
3015            execution_cache_trait_pointers
3016                .transaction_cache_reader
3017                .clone(),
3018            &epoch_store,
3019            tx_ready_certificates,
3020            metrics.clone(),
3021        ));
3022        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3023
3024        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3025            epoch_store.get_parent_path(),
3026            &config.authority_store_pruning_config,
3027        );
3028        let _pruner = AuthorityStorePruner::new(
3029            store.perpetual_tables.clone(),
3030            checkpoint_store.clone(),
3031            rest_index.clone(),
3032            indexes.clone(),
3033            config.authority_store_pruning_config.clone(),
3034            epoch_store.committee().authority_exists(&name),
3035            epoch_store.epoch_start_state().epoch_duration_ms(),
3036            prometheus_registry,
3037            archive_readers,
3038            pruner_db,
3039        );
3040        let input_loader =
3041            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3042        let epoch = epoch_store.epoch();
3043        let rgp = epoch_store.reference_gas_price();
3044        let state = Arc::new(AuthorityState {
3045            name,
3046            secret,
3047            execution_lock: RwLock::new(epoch),
3048            epoch_store: ArcSwap::new(epoch_store.clone()),
3049            input_loader,
3050            execution_cache_trait_pointers,
3051            indexes,
3052            rest_index,
3053            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3054            checkpoint_store,
3055            committee_store,
3056            transaction_manager,
3057            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3058            metrics,
3059            _pruner,
3060            _authority_per_epoch_pruner,
3061            db_checkpoint_config: db_checkpoint_config.clone(),
3062            config,
3063            overload_info: AuthorityOverloadInfo::default(),
3064            validator_tx_finalizer,
3065            chain_identifier,
3066            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3067        });
3068
3069        // Start a task to execute ready certificates.
3070        let authority_state = Arc::downgrade(&state);
3071        spawn_monitored_task!(execution_process(
3072            authority_state,
3073            rx_ready_certificates,
3074            rx_execution_shutdown,
3075        ));
3076
3077        // TODO: This doesn't belong in the constructor of AuthorityState.
3078        state
3079            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3080            .expect("Error indexing genesis objects.");
3081
3082        state
3083    }
3084
3085    // TODO: Consolidate our traits to reduce the number of methods here.
3086    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3087        &self.execution_cache_trait_pointers.object_cache_reader
3088    }
3089
3090    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3091        &self.execution_cache_trait_pointers.transaction_cache_reader
3092    }
3093
3094    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3095        &self.execution_cache_trait_pointers.cache_writer
3096    }
3097
3098    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3099        &self.execution_cache_trait_pointers.backing_store
3100    }
3101
3102    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3103        &self.execution_cache_trait_pointers.backing_package_store
3104    }
3105
3106    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3107        &self.execution_cache_trait_pointers.object_store
3108    }
3109
3110    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3111        &self.execution_cache_trait_pointers.reconfig_api
3112    }
3113
3114    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3115        &self.execution_cache_trait_pointers.accumulator_store
3116    }
3117
3118    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3119        &self.execution_cache_trait_pointers.checkpoint_cache
3120    }
3121
3122    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3123        &self.execution_cache_trait_pointers.state_sync_store
3124    }
3125
3126    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3127        &self.execution_cache_trait_pointers.cache_commit
3128    }
3129
3130    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3131        self.execution_cache_trait_pointers
3132            .testing_api
3133            .database_for_testing()
3134    }
3135
3136    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3137        &self,
3138        config: NodeConfig,
3139        metrics: Arc<AuthorityStorePruningMetrics>,
3140    ) -> anyhow::Result<()> {
3141        let archive_readers =
3142            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3143        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3144            &self.database_for_testing().perpetual_tables,
3145            &self.checkpoint_store,
3146            self.rest_index.as_deref(),
3147            None,
3148            config.authority_store_pruning_config,
3149            metrics,
3150            archive_readers,
3151            EPOCH_DURATION_MS_FOR_TESTING,
3152        )
3153        .await
3154    }
3155
3156    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3157        &self.transaction_manager
3158    }
3159
3160    /// Adds transactions / certificates to the transaction manager for ordered
3161    /// execution.
3162    pub fn enqueue_transactions_for_execution(
3163        &self,
3164        txns: Vec<VerifiedExecutableTransaction>,
3165        epoch_store: &Arc<AuthorityPerEpochStore>,
3166    ) {
3167        self.transaction_manager.enqueue(txns, epoch_store)
3168    }
3169
3170    /// Adds certificates to transaction manager for ordered execution.
3171    pub fn enqueue_certificates_for_execution(
3172        &self,
3173        certs: Vec<VerifiedCertificate>,
3174        epoch_store: &Arc<AuthorityPerEpochStore>,
3175    ) {
3176        self.transaction_manager
3177            .enqueue_certificates(certs, epoch_store)
3178    }
3179
3180    pub fn enqueue_with_expected_effects_digest(
3181        &self,
3182        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3183        epoch_store: &AuthorityPerEpochStore,
3184    ) {
3185        self.transaction_manager
3186            .enqueue_with_expected_effects_digest(certs, epoch_store)
3187    }
3188
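    /// Populates the owner and dynamic-field indexes from the genesis objects,
    /// but only when an index store is configured and is still empty; otherwise
    /// this is a no-op.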
3189    fn create_owner_index_if_empty(
3190        &self,
3191        genesis_objects: &[Object],
3192        epoch_store: &Arc<AuthorityPerEpochStore>,
3193    ) -> IotaResult {
3194        let Some(index_store) = &self.indexes else {
3195            return Ok(());
3196        };
3197        if !index_store.is_empty() {
3198            return Ok(());
3199        }
3200
3201        let mut new_owners = vec![];
3202        let mut new_dynamic_fields = vec![];
3203        let mut layout_resolver = epoch_store
3204            .executor()
3205            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3206        for o in genesis_objects.iter() {
3207            match o.owner {
3208                Owner::AddressOwner(addr) => new_owners.push((
3209                    (addr, o.id()),
3210                    ObjectInfo::new(&o.compute_object_reference(), o),
3211                )),
3212                Owner::ObjectOwner(object_id) => {
3213                    let id = o.id();
3214                    let Some(info) = self.try_create_dynamic_field_info(
3215                        o,
3216                        &BTreeMap::new(),
3217                        layout_resolver.as_mut(),
3218                        true,
3219                    )?
3220                    else {
3221                        continue;
3222                    };
3223                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3224                }
3225                _ => {}
3226            }
3227        }
3228
3229        index_store.insert_genesis_objects(ObjectIndexChanges {
3230            deleted_owners: vec![],
3231            deleted_dynamic_fields: vec![],
3232            new_owners,
3233            new_dynamic_fields,
3234        })
3235    }
3236
3237    /// Attempts to acquire the execution lock for an executable transaction.
3238    /// Returns the lock if the transaction matches the currently executing epoch,
3239    /// and an error otherwise.
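    ///
    /// Hedged sketch (illustrative only, not a doc test; `state` and `tx` are
    /// assumed bindings, with `tx` a `VerifiedExecutableTransaction`):
    /// ```ignore
    /// // Fails with `IotaError::WrongEpoch` if `tx` was signed for a different epoch.
    /// let _guard = state.execution_lock_for_executable_transaction(&tx)?;
    /// // ... execute `tx` while holding the read guard ...
    /// ```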
3240    pub fn execution_lock_for_executable_transaction(
3241        &self,
3242        transaction: &VerifiedExecutableTransaction,
3243    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3244        let lock = self
3245            .execution_lock
3246            .try_read()
3247            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3248        if *lock == transaction.auth_sig().epoch() {
3249            Ok(lock)
3250        } else {
3251            Err(IotaError::WrongEpoch {
3252                expected_epoch: *lock,
3253                actual_epoch: transaction.auth_sig().epoch(),
3254            })
3255        }
3256    }
3257
3258    /// Acquires the execution lock for the duration of a transaction signing
3259    /// request. This prevents reconfiguration from starting until we are
3260    /// finished handling the signing request. Otherwise, in-memory lock
3261    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3262    /// while we are attempting to acquire locks for the transaction.
3263    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3264        self.execution_lock
3265            .try_read()
3266            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3267    }
3268
3269    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3270        self.execution_lock.write().await
3271    }
3272
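    /// Reconfigures this authority for the next epoch. In order, this checks
    /// that the new protocol version is supported, inserts the new committee,
    /// acquires the execution write lock and waits for epoch-specific tasks to
    /// terminate, reverts uncommitted transactions and clears end-of-epoch
    /// state, runs the configured consistency checks, optionally checkpoints
    /// the databases, and finally reopens the epoch store for the new epoch.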
3273    #[instrument(level = "error", skip_all)]
3274    pub async fn reconfigure(
3275        &self,
3276        cur_epoch_store: &AuthorityPerEpochStore,
3277        supported_protocol_versions: SupportedProtocolVersions,
3278        new_committee: Committee,
3279        epoch_start_configuration: EpochStartConfiguration,
3280        accumulator: Arc<StateAccumulator>,
3281        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3282        epoch_supply_change: i64,
3283        epoch_last_checkpoint: CheckpointSequenceNumber,
3284    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3285        Self::check_protocol_version(
3286            supported_protocol_versions,
3287            epoch_start_configuration
3288                .epoch_start_state()
3289                .protocol_version(),
3290        );
3291        self.metrics.reset_on_reconfigure();
3292        self.committee_store.insert_new_committee(&new_committee)?;
3293
3294        // Wait until no transactions are being executed.
3295        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3296
3297        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3298        cur_epoch_store.epoch_terminated().await;
3299
3300        let highest_locally_built_checkpoint_seq = self
3301            .checkpoint_store
3302            .get_latest_locally_computed_checkpoint()?
3303            .map(|c| *c.sequence_number())
3304            .unwrap_or(0);
3305
3306        assert!(
3307            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3308            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3309        );
3310        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3311            // if we built the last checkpoint locally (as opposed to receiving it from a
3312            // peer), then all shared_version_assignments except the one for the
3313            // ChangeEpoch transaction should have been removed
3314            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3315            // Note that while 1 is the typical value, 0 is possible if the node restarts
3316            // after committing the last checkpoint but before reconfiguring.
3317            if num_shared_version_assignments > 1 {
3318                // If this happens in prod, we have a memory leak, but not a correctness issue.
3319                debug_fatal!(
3320                    "all shared_version_assignments should have been removed \
3321                    (num_shared_version_assignments: {num_shared_version_assignments})"
3322                );
3323            }
3324        }
3325
3326        // Safe to reconfigure now. No transactions are being executed,
3327        // and no epoch-specific tasks are running.
3328
3329        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
3330        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
3331        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3332            .await?;
3333        self.get_reconfig_api()
3334            .clear_state_end_of_epoch(&execution_lock);
3335        self.check_system_consistency(
3336            cur_epoch_store,
3337            accumulator,
3338            expensive_safety_check_config,
3339            epoch_supply_change,
3340        )?;
3341        self.get_reconfig_api()
3342            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3343        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3344            if self
3345                .db_checkpoint_config
3346                .perform_db_checkpoints_at_epoch_end
3347            {
3348                let checkpoint_indexes = self
3349                    .db_checkpoint_config
3350                    .perform_index_db_checkpoints_at_epoch_end
3351                    .unwrap_or(false);
3352                let current_epoch = cur_epoch_store.epoch();
3353                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3354                self.checkpoint_all_dbs(
3355                    &epoch_checkpoint_path,
3356                    cur_epoch_store,
3357                    checkpoint_indexes,
3358                )?;
3359            }
3360        }
3361
3362        self.get_reconfig_api()
3363            .reconfigure_cache(&epoch_start_configuration)
3364            .await;
3365
3366        let new_epoch = new_committee.epoch;
3367        let new_epoch_store = self
3368            .reopen_epoch_db(
3369                cur_epoch_store,
3370                new_committee,
3371                epoch_start_configuration,
3372                expensive_safety_check_config,
3373                epoch_last_checkpoint,
3374            )
3375            .await?;
3376        assert_eq!(new_epoch_store.epoch(), new_epoch);
3377        self.transaction_manager.reconfigure(new_epoch);
3378        *execution_lock = new_epoch;
3379        // drop execution_lock after epoch store was updated
3380        // see also assert in AuthorityState::process_certificate
3381        // on the epoch store and execution lock epoch match
3382        Ok(new_epoch_store)
3383    }
3384
3385    /// Advances the epoch store to the next epoch, for testing only.
3386    /// This only manually updates the places where we track the epoch number.
3387    /// It doesn't properly reconfigure the node, hence it should only be used in
3388    /// tests.
3389    pub async fn reconfigure_for_testing(&self) {
3390        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3391        let epoch_store = self.epoch_store_for_testing().clone();
3392        let protocol_config = epoch_store.protocol_config().clone();
3393        // The current protocol config used in the epoch store may have been overridden
3394        // and diverged from the protocol config definitions. That override may
3395        // have now been dropped when the initial guard was dropped. We reapply
3396        // the override before creating the new epoch store, to make sure that
3397        // the new epoch store has the same protocol config as the current one.
3398        // Since this is for testing only, we most likely want to keep the protocol
3399        // config the same across epochs.
3400        let _guard =
3401            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3402        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3403            self.get_backing_package_store().clone(),
3404            self.get_object_store().clone(),
3405            &self.config.expensive_safety_check_config,
3406            self.checkpoint_store
3407                .get_epoch_last_checkpoint(epoch_store.epoch())
3408                .unwrap()
3409                .map(|c| *c.sequence_number())
3410                .unwrap_or_default(),
3411        );
3412        let new_epoch = new_epoch_store.epoch();
3413        self.transaction_manager.reconfigure(new_epoch);
3414        self.epoch_store.store(new_epoch_store);
3415        epoch_store.epoch_terminated().await;
3416        *execution_lock = new_epoch;
3417    }
3418
3419    #[instrument(level = "error", skip_all)]
3420    fn check_system_consistency(
3421        &self,
3422        cur_epoch_store: &AuthorityPerEpochStore,
3423        accumulator: Arc<StateAccumulator>,
3424        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3425        epoch_supply_change: i64,
3426    ) -> IotaResult<()> {
3427        info!(
3428            "Performing iota conservation consistency check for epoch {}",
3429            cur_epoch_store.epoch()
3430        );
3431
3432        if cfg!(debug_assertions) {
3433            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3434        }
3435
3436        self.get_reconfig_api()
3437            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3438
3439        // check for root state hash consistency with live object set
3440        if expensive_safety_check_config.enable_state_consistency_check() {
3441            info!(
3442                "Performing state consistency check for epoch {}",
3443                cur_epoch_store.epoch()
3444            );
3445            self.expensive_check_is_consistent_state(
3446                accumulator,
3447                cur_epoch_store,
3448                cfg!(debug_assertions), // panic in debug mode only
3449            );
3450        }
3451
3452        if expensive_safety_check_config.enable_secondary_index_checks() {
3453            if let Some(indexes) = self.indexes.clone() {
3454                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3455                    .expect("secondary indexes are inconsistent");
3456            }
3457        }
3458
3459        Ok(())
3460    }
3461
3462    fn expensive_check_is_consistent_state(
3463        &self,
3464        accumulator: Arc<StateAccumulator>,
3465        cur_epoch_store: &AuthorityPerEpochStore,
3466        panic: bool,
3467    ) {
3468        let live_object_set_hash = accumulator.digest_live_object_set();
3469
3470        let root_state_hash: ECMHLiveObjectSetDigest = self
3471            .get_accumulator_store()
3472            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3473            .expect("Retrieving root state hash cannot fail")
3474            .expect("Root state hash for epoch must exist")
3475            .1
3476            .digest()
3477            .into();
3478
3479        let is_inconsistent = root_state_hash != live_object_set_hash;
3480        if is_inconsistent {
3481            if panic {
3482                panic!(
3483                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3484                );
3485            } else {
3486                error!(
3487                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3488                    root_state_hash, live_object_set_hash
3489                );
3490            }
3491        } else {
3492            info!("State consistency check passed");
3493        }
3494
3495        if !panic {
3496            accumulator.set_inconsistent_state(is_inconsistent);
3497        }
3498    }
3499
3500    pub fn current_epoch_for_testing(&self) -> EpochId {
3501        self.epoch_store_for_testing().epoch()
3502    }
3503
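    /// Takes RocksDB checkpoints of the checkpoint, perpetual, committee and,
    /// optionally, index databases under `checkpoint_path`. Data is first
    /// written to a `.tmp` directory and renamed on success, so a partially
    /// written checkpoint is never left behind under the final path.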
3504    #[instrument(level = "error", skip_all)]
3505    pub fn checkpoint_all_dbs(
3506        &self,
3507        checkpoint_path: &Path,
3508        cur_epoch_store: &AuthorityPerEpochStore,
3509        checkpoint_indexes: bool,
3510    ) -> IotaResult {
3511        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3512        let current_epoch = cur_epoch_store.epoch();
3513
3514        if checkpoint_path.exists() {
3515            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3516            return Ok(());
3517        }
3518
3519        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3520        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3521
3522        if checkpoint_path_tmp.exists() {
3523            fs::remove_dir_all(&checkpoint_path_tmp)
3524                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3525        }
3526
3527        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3528        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3529
3530        // NOTE: Do not change the order of these checkpoint calls.
3531        // We want to snapshot the checkpoint db first so that we don't race with state sync.
3532        self.checkpoint_store
3533            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3534
3535        self.get_reconfig_api()
3536            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3537
3538        self.committee_store
3539            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3540
3541        if checkpoint_indexes {
3542            if let Some(indexes) = self.indexes.as_ref() {
3543                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3544            }
3545        }
3546
3547        fs::rename(checkpoint_path_tmp, checkpoint_path)
3548            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3549        Ok(())
3550    }
3551
3552    /// Load the current epoch store. This can change during reconfiguration. To
3553    /// ensure that we never end up accessing different epoch stores in a
3554    /// single task, we need to make sure that this is called once per task.
3555    /// Each call needs to be carefully audited to ensure it is
3556    /// the case. This also means we should minimize the number of call-sites.
3557    /// Only call it when there is no way to obtain it from somewhere else.
3558    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3559        self.epoch_store.load()
3560    }
3561
3562    // Load the epoch store, should be used in tests only.
3563    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3564        self.load_epoch_store_one_call_per_task()
3565    }
3566
3567    pub fn clone_committee_for_testing(&self) -> Committee {
3568        Committee::clone(self.epoch_store_for_testing().committee())
3569    }
3570
3571    #[instrument(level = "trace", skip_all)]
3572    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3573        self.get_object_store()
3574            .try_get_object(object_id)
3575            .map_err(Into::into)
3576    }
3577
3578    /// Non-fallible version of `try_get_object`.
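    ///
    /// Hedged sketch (illustrative only, not a doc test; `state` and
    /// `object_id` are assumed bindings):
    /// ```ignore
    /// // Panics on storage failure; yields `None` if the object does not exist.
    /// if let Some(object) = state.get_object(&object_id).await {
    ///     println!("latest version: {:?}", object.version());
    /// }
    /// ```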
3579    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3580        self.try_get_object(object_id)
3581            .await
3582            .expect("storage access failed")
3583    }
3584
3585    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3586        Ok(self
3587            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3588            .await?
3589            .expect("framework object should always exist")
3590            .compute_object_reference())
3591    }
3592
3593    // This function is only used for testing.
3594    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3595        self.get_object_cache_reader()
3596            .try_get_iota_system_state_object_unsafe()
3597    }
3598
3599    #[instrument(level = "trace", skip_all)]
3600    pub fn get_checkpoint_by_sequence_number(
3601        &self,
3602        sequence_number: CheckpointSequenceNumber,
3603    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3604        Ok(self
3605            .checkpoint_store
3606            .get_checkpoint_by_sequence_number(sequence_number)?)
3607    }
3608
3609    #[instrument(level = "trace", skip_all)]
3610    pub fn get_transaction_checkpoint_for_tests(
3611        &self,
3612        digest: &TransactionDigest,
3613        epoch_store: &AuthorityPerEpochStore,
3614    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3615        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3616        let Some(checkpoint) = checkpoint else {
3617            return Ok(None);
3618        };
3619        let checkpoint = self
3620            .checkpoint_store
3621            .get_checkpoint_by_sequence_number(checkpoint)?;
3622        Ok(checkpoint)
3623    }
3624
3625    #[instrument(level = "trace", skip_all)]
3626    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3627        Ok(
3628            match self
3629                .get_object_cache_reader()
3630                .try_get_latest_object_or_tombstone(*object_id)?
3631            {
3632                Some((_, ObjectOrTombstone::Object(object))) => {
3633                    let layout = self.get_object_layout(&object)?;
3634                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3635                }
3636                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3637                None => ObjectRead::NotExists(*object_id),
3638            },
3639        )
3640    }
3641
3642    /// Chain Identifier is the digest of the genesis checkpoint.
3643    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3644        self.chain_identifier
3645    }
3646
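    /// Reads the latest version of `object_id` and BCS-deserializes its Move
    /// contents into `T`.
    ///
    /// Hedged sketch (illustrative only, not a doc test; `MyMoveType` is a
    /// hypothetical type whose BCS layout matches the on-chain object, and
    /// `state` / `object_id` are assumed bindings):
    /// ```ignore
    /// #[derive(serde::Deserialize)]
    /// struct MyMoveType { /* fields matching the on-chain layout */ }
    ///
    /// let value: MyMoveType = state.get_move_object(&object_id)?;
    /// ```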
3647    #[instrument(level = "trace", skip_all)]
3648    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3649    where
3650        T: DeserializeOwned,
3651    {
3652        let o = self.get_object_read(object_id)?.into_object()?;
3653        if let Some(move_object) = o.data.try_as_move() {
3654            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3655                IotaError::ObjectDeserialization {
3656                    error: format!("{e}"),
3657                }
3658            })?)
3659        } else {
3660            Err(IotaError::ObjectDeserialization {
3661                error: format!("Provided object : [{object_id}] is not a Move object."),
3662            })
3663        }
3664    }
3665
3666    /// This function serves RPC reads of past objects and is not expected to be
3667    /// called for other purposes.
3668    /// Depending on the object pruning policies that will be enforced in the
3669    /// future, there is no software-level guarantee/SLA that an object at an old
3670    /// version can be retrieved, even if it exists/existed.
3671    #[instrument(level = "trace", skip_all)]
3672    pub fn get_past_object_read(
3673        &self,
3674        object_id: &ObjectID,
3675        version: SequenceNumber,
3676    ) -> IotaResult<PastObjectRead> {
3677        // First, check whether the object ever existed by fetching its latest data
3678        let Some(obj_ref) = self
3679            .get_object_cache_reader()
3680            .try_get_latest_object_ref_or_tombstone(*object_id)?
3681        else {
3682            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3683        };
3684
3685        if version > obj_ref.1 {
3686            return Ok(PastObjectRead::VersionTooHigh {
3687                object_id: *object_id,
3688                asked_version: version,
3689                latest_version: obj_ref.1,
3690            });
3691        }
3692
3693        if version < obj_ref.1 {
3694            // Read past objects
3695            return Ok(match self.read_object_at_version(object_id, version)? {
3696                Some((object, layout)) => {
3697                    let obj_ref = object.compute_object_reference();
3698                    PastObjectRead::VersionFound(obj_ref, object, layout)
3699                }
3700
3701                None => PastObjectRead::VersionNotFound(*object_id, version),
3702            });
3703        }
3704
3705        if !obj_ref.2.is_alive() {
3706            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3707        }
3708
3709        match self.read_object_at_version(object_id, obj_ref.1)? {
3710            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3711            None => {
3712                error!(
3713                    "Object in parent_entry is missing from the object store, datastore is \
3714                     inconsistent",
3715                );
3716                Err(UserInputError::ObjectNotFound {
3717                    object_id: *object_id,
3718                    version: Some(obj_ref.1),
3719                }
3720                .into())
3721            }
3722        }
3723    }
3724
3725    #[instrument(level = "trace", skip_all)]
3726    fn read_object_at_version(
3727        &self,
3728        object_id: &ObjectID,
3729        version: SequenceNumber,
3730    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3731        let Some(object) = self
3732            .get_object_cache_reader()
3733            .try_get_object_by_key(object_id, version)?
3734        else {
3735            return Ok(None);
3736        };
3737
3738        let layout = self.get_object_layout(&object)?;
3739        Ok(Some((object, layout)))
3740    }
3741
3742    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3743        let layout = object
3744            .data
3745            .try_as_move()
3746            .map(|object| {
3747                into_struct_layout(
3748                    self.load_epoch_store_one_call_per_task()
3749                        .executor()
3750                        // TODO(cache) - must read through cache
3751                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3752                        .get_annotated_layout(&object.type_().clone().into())?,
3753                )
3754            })
3755            .transpose()?;
3756        Ok(layout)
3757    }
3758
3759    fn get_owner_at_version(
3760        &self,
3761        object_id: &ObjectID,
3762        version: SequenceNumber,
3763    ) -> IotaResult<Owner> {
3764        self.get_object_store()
3765            .try_get_object_by_key(object_id, version)?
3766            .ok_or_else(|| {
3767                IotaError::from(UserInputError::ObjectNotFound {
3768                    object_id: *object_id,
3769                    version: Some(version),
3770                })
3771            })
3772            .map(|o| o.owner)
3773    }
3774
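    /// Lists objects owned by `owner` from the index store, starting after
    /// `cursor` when one is provided.
    ///
    /// Hedged sketch (illustrative only, not a doc test; `state` and `owner`
    /// are assumed bindings):
    /// ```ignore
    /// // First page of up to 50 objects, without a type filter.
    /// let page = state.get_owner_objects(owner, None, 50, None)?;
    /// // Next page: pass the last object id of the previous page as the cursor.
    /// let cursor = page.last().map(|info| info.object_id);
    /// let next_page = state.get_owner_objects(owner, cursor, 50, None)?;
    /// ```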
3775    #[instrument(level = "trace", skip_all)]
3776    pub fn get_owner_objects(
3777        &self,
3778        owner: IotaAddress,
3779        // If `Some`, the query will start from the next item after the specified cursor
3780        cursor: Option<ObjectID>,
3781        limit: usize,
3782        filter: Option<IotaObjectDataFilter>,
3783    ) -> IotaResult<Vec<ObjectInfo>> {
3784        if let Some(indexes) = &self.indexes {
3785            indexes.get_owner_objects(owner, cursor, limit, filter)
3786        } else {
3787            Err(IotaError::IndexStoreNotAvailable)
3788        }
3789    }
3790
3791    #[instrument(level = "trace", skip_all)]
3792    pub fn get_owned_coins_iterator_with_cursor(
3793        &self,
3794        owner: IotaAddress,
3795        // If `Some`, the query will start from the next item after the specified cursor
3796        cursor: (String, ObjectID),
3797        limit: usize,
3798        one_coin_type_only: bool,
3799    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3800        if let Some(indexes) = &self.indexes {
3801            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3802        } else {
3803            Err(IotaError::IndexStoreNotAvailable)
3804        }
3805    }
3806
3807    #[instrument(level = "trace", skip_all)]
3808    pub fn get_owner_objects_iterator(
3809        &self,
3810        owner: IotaAddress,
3811        // If `Some`, the query will start from the next item after the specified cursor
3812        cursor: Option<ObjectID>,
3813        filter: Option<IotaObjectDataFilter>,
3814    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3815        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3816        if let Some(indexes) = &self.indexes {
3817            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3818        } else {
3819            Err(IotaError::IndexStoreNotAvailable)
3820        }
3821    }
3822
3823    #[instrument(level = "trace", skip_all)]
3824    pub async fn get_move_objects<T>(
3825        &self,
3826        owner: IotaAddress,
3827        type_: MoveObjectType,
3828    ) -> IotaResult<Vec<T>>
3829    where
3830        T: DeserializeOwned,
3831    {
3832        let object_ids = self
3833            .get_owner_objects_iterator(owner, None, None)?
3834            .filter(|o| match &o.type_ {
3835                ObjectType::Struct(s) => &type_ == s,
3836                ObjectType::Package => false,
3837            })
3838            .map(|info| ObjectKey(info.object_id, info.version))
3839            .collect::<Vec<_>>();
3840        let mut move_objects = vec![];
3841
3842        let objects = self
3843            .get_object_store()
3844            .try_multi_get_objects_by_key(&object_ids)?;
3845
3846        for (o, id) in objects.into_iter().zip(object_ids) {
3847            let object = o.ok_or_else(|| {
3848                IotaError::from(UserInputError::ObjectNotFound {
3849                    object_id: id.0,
3850                    version: Some(id.1),
3851                })
3852            })?;
3853            let move_object = object.data.try_as_move().ok_or_else(|| {
3854                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3855            })?;
3856            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3857                IotaError::ObjectDeserialization {
3858                    error: format!("{e}"),
3859                }
3860            })?);
3861        }
3862        Ok(move_objects)
3863    }
3864
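    /// Lists the dynamic fields of `owner` from the index store, starting after
    /// `cursor` when one is provided and returning at most `limit` entries.
    ///
    /// Hedged sketch (illustrative only, not a doc test; `state` and `parent`
    /// are assumed bindings):
    /// ```ignore
    /// let fields = state.get_dynamic_fields(parent, None, 10)?;
    /// for (field_object_id, info) in fields {
    ///     println!("{field_object_id}: {}", info.object_type);
    /// }
    /// ```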
3865    #[instrument(level = "trace", skip_all)]
3866    pub fn get_dynamic_fields(
3867        &self,
3868        owner: ObjectID,
3869        // If `Some`, the query will start from the next item after the specified cursor
3870        cursor: Option<ObjectID>,
3871        limit: usize,
3872    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3873        Ok(self
3874            .get_dynamic_fields_iterator(owner, cursor)?
3875            .take(limit)
3876            .collect::<Result<Vec<_>, _>>()?)
3877    }
3878
3879    fn get_dynamic_fields_iterator(
3880        &self,
3881        owner: ObjectID,
3882        // If `Some`, the query will start from the next item after the specified cursor
3883        cursor: Option<ObjectID>,
3884    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3885    {
3886        if let Some(indexes) = &self.indexes {
3887            indexes.get_dynamic_fields_iterator(owner, cursor)
3888        } else {
3889            Err(IotaError::IndexStoreNotAvailable)
3890        }
3891    }
3892
3893    #[instrument(level = "trace", skip_all)]
3894    pub fn get_dynamic_field_object_id(
3895        &self,
3896        owner: ObjectID,
3897        name_type: TypeTag,
3898        name_bcs_bytes: &[u8],
3899    ) -> IotaResult<Option<ObjectID>> {
3900        if let Some(indexes) = &self.indexes {
3901            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3902        } else {
3903            Err(IotaError::IndexStoreNotAvailable)
3904        }
3905    }
3906
3907    #[instrument(level = "trace", skip_all)]
3908    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3909        Ok(self.get_indexes()?.next_sequence_number())
3910    }
3911
3912    #[instrument(level = "trace", skip_all)]
3913    pub async fn get_executed_transaction_and_effects(
3914        &self,
3915        digest: TransactionDigest,
3916        kv_store: Arc<TransactionKeyValueStore>,
3917    ) -> IotaResult<(Transaction, TransactionEffects)> {
3918        let transaction = kv_store.get_tx(digest).await?;
3919        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3920        Ok((transaction, effects))
3921    }
3922
3923    #[instrument(level = "trace", skip_all)]
3924    pub fn multi_get_checkpoint_by_sequence_number(
3925        &self,
3926        sequence_numbers: &[CheckpointSequenceNumber],
3927    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3928        Ok(self
3929            .checkpoint_store
3930            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3931    }
3932
3933    #[instrument(level = "trace", skip_all)]
3934    pub fn get_transaction_events(
3935        &self,
3936        digest: &TransactionEventsDigest,
3937    ) -> IotaResult<TransactionEvents> {
3938        self.get_transaction_cache_reader()
3939            .try_get_events(digest)?
3940            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3941    }
3942
3943    pub fn get_transaction_input_objects(
3944        &self,
3945        effects: &TransactionEffects,
3946    ) -> anyhow::Result<Vec<Object>> {
3947        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3948            .map_err(Into::into)
3949    }
3950
3951    pub fn get_transaction_output_objects(
3952        &self,
3953        effects: &TransactionEffects,
3954    ) -> anyhow::Result<Vec<Object>> {
3955        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3956            .map_err(Into::into)
3957    }
3958
3959    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3960        match &self.indexes {
3961            Some(i) => Ok(i.clone()),
3962            None => Err(IotaError::UnsupportedFeature {
3963                error: "extended object indexing is not enabled on this server".into(),
3964            }),
3965        }
3966    }
3967
3968    pub async fn get_transactions_for_tests(
3969        self: &Arc<Self>,
3970        filter: Option<TransactionFilter>,
3971        cursor: Option<TransactionDigest>,
3972        limit: Option<usize>,
3973        reverse: bool,
3974    ) -> IotaResult<Vec<TransactionDigest>> {
3975        let metrics = KeyValueStoreMetrics::new_for_tests();
3976        let kv_store = Arc::new(TransactionKeyValueStore::new(
3977            "rocksdb",
3978            metrics,
3979            self.clone(),
3980        ));
3981        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3982            .await
3983    }
3984
3985    #[instrument(level = "trace", skip_all)]
3986    pub async fn get_transactions(
3987        &self,
3988        kv_store: &Arc<TransactionKeyValueStore>,
3989        filter: Option<TransactionFilter>,
3990        // If `Some`, the query will start from the next item after the specified cursor
3991        cursor: Option<TransactionDigest>,
3992        limit: Option<usize>,
3993        reverse: bool,
3994    ) -> IotaResult<Vec<TransactionDigest>> {
3995        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3996            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3997            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3998            if reverse {
3999                let iter = iter
4000                    .rev()
4001                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4002                    .skip(usize::from(cursor.is_some()));
4003                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4004            } else {
4005                let iter = iter
4006                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4007                    .skip(usize::from(cursor.is_some()));
4008                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4009            }
4010        }
4011        self.get_indexes()?
4012            .get_transactions(filter, cursor, limit, reverse)
4013    }
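
    // A short sketch of the exclusive-cursor contract above: paging through the
    // transactions of a single checkpoint, 100 digests at a time. The helper name
    // and the page size are illustrative assumptions; `kv_store` is any
    // `TransactionKeyValueStore` the caller already holds.
    #[cfg(any())] // illustrative sketch only, compiled out
    async fn example_page_checkpoint_transactions(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        sequence_number: CheckpointSequenceNumber,
    ) -> IotaResult<Vec<TransactionDigest>> {
        const PAGE_SIZE: usize = 100;
        let mut cursor: Option<TransactionDigest> = None;
        let mut all_digests = Vec::new();
        loop {
            let page = self
                .get_transactions(
                    kv_store,
                    Some(TransactionFilter::Checkpoint(sequence_number)),
                    cursor,
                    Some(PAGE_SIZE),
                    /* reverse */ false,
                )
                .await?;
            let is_last_page = page.len() < PAGE_SIZE;
            // The cursor is exclusive: the next page starts after this digest.
            cursor = page.last().copied();
            all_digests.extend(page);
            if is_last_page {
                break;
            }
        }
        Ok(all_digests)
    }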
4014
4015    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4016        &self.checkpoint_store
4017    }
4018
4019    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4020        self.get_checkpoint_store()
4021            .get_highest_executed_checkpoint_seq_number()?
4022            .ok_or(IotaError::UserInput {
4023                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4024            })
4025    }
4026
4027    #[cfg(msim)]
4028    pub fn get_highest_pruned_checkpoint_for_testing(
4029        &self,
4030    ) -> IotaResult<CheckpointSequenceNumber> {
4031        self.database_for_testing()
4032            .perpetual_tables
4033            .get_highest_pruned_checkpoint()
4034            .map(|c| c.unwrap_or(0))
4035            .map_err(Into::into)
4036    }
4037
4038    #[instrument(level = "trace", skip_all)]
4039    pub fn get_checkpoint_summary_by_sequence_number(
4040        &self,
4041        sequence_number: CheckpointSequenceNumber,
4042    ) -> IotaResult<CheckpointSummary> {
4043        let verified_checkpoint = self
4044            .get_checkpoint_store()
4045            .get_checkpoint_by_sequence_number(sequence_number)?;
4046        match verified_checkpoint {
4047            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4048            None => Err(IotaError::UserInput {
4049                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4050            }),
4051        }
4052    }
4053
4054    #[instrument(level = "trace", skip_all)]
4055    pub fn get_checkpoint_summary_by_digest(
4056        &self,
4057        digest: CheckpointDigest,
4058    ) -> IotaResult<CheckpointSummary> {
4059        let verified_checkpoint = self
4060            .get_checkpoint_store()
4061            .get_checkpoint_by_digest(&digest)?;
4062        match verified_checkpoint {
4063            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4064            None => Err(IotaError::UserInput {
4065                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4066            }),
4067        }
4068    }
4069
4070    #[instrument(level = "trace", skip_all)]
4071    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4072        if is_system_package(package_id) {
4073            return self.find_genesis_txn_digest();
4074        }
4075        Ok(self
4076            .get_object_read(&package_id)?
4077            .into_object()?
4078            .previous_transaction)
4079    }
4080
4081    #[instrument(level = "trace", skip_all)]
4082    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4083        let summary = self
4084            .get_verified_checkpoint_by_sequence_number(0)?
4085            .into_message();
4086        let content = self.get_checkpoint_contents(summary.content_digest)?;
4087        let genesis_transaction = content.enumerate_transactions(&summary).next();
4088        Ok(genesis_transaction
4089            .ok_or(IotaError::UserInput {
4090                error: UserInputError::GenesisTransactionNotFound,
4091            })?
4092            .1
4093            .transaction)
4094    }
4095
4096    #[instrument(level = "trace", skip_all)]
4097    pub fn get_verified_checkpoint_by_sequence_number(
4098        &self,
4099        sequence_number: CheckpointSequenceNumber,
4100    ) -> IotaResult<VerifiedCheckpoint> {
4101        let verified_checkpoint = self
4102            .get_checkpoint_store()
4103            .get_checkpoint_by_sequence_number(sequence_number)?;
4104        match verified_checkpoint {
4105            Some(verified_checkpoint) => Ok(verified_checkpoint),
4106            None => Err(IotaError::UserInput {
4107                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4108            }),
4109        }
4110    }
4111
4112    #[instrument(level = "trace", skip_all)]
4113    pub fn get_verified_checkpoint_summary_by_digest(
4114        &self,
4115        digest: CheckpointDigest,
4116    ) -> IotaResult<VerifiedCheckpoint> {
4117        let verified_checkpoint = self
4118            .get_checkpoint_store()
4119            .get_checkpoint_by_digest(&digest)?;
4120        match verified_checkpoint {
4121            Some(verified_checkpoint) => Ok(verified_checkpoint),
4122            None => Err(IotaError::UserInput {
4123                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4124            }),
4125        }
4126    }
4127
4128    #[instrument(level = "trace", skip_all)]
4129    pub fn get_checkpoint_contents(
4130        &self,
4131        digest: CheckpointContentsDigest,
4132    ) -> IotaResult<CheckpointContents> {
4133        self.get_checkpoint_store()
4134            .get_checkpoint_contents(&digest)?
4135            .ok_or(IotaError::UserInput {
4136                error: UserInputError::CheckpointContentsNotFound(digest),
4137            })
4138    }
4139
4140    #[instrument(level = "trace", skip_all)]
4141    pub fn get_checkpoint_contents_by_sequence_number(
4142        &self,
4143        sequence_number: CheckpointSequenceNumber,
4144    ) -> IotaResult<CheckpointContents> {
4145        let verified_checkpoint = self
4146            .get_checkpoint_store()
4147            .get_checkpoint_by_sequence_number(sequence_number)?;
4148        match verified_checkpoint {
4149            Some(verified_checkpoint) => {
4150                let content_digest = verified_checkpoint.into_inner().content_digest;
4151                self.get_checkpoint_contents(content_digest)
4152            }
4153            None => Err(IotaError::UserInput {
4154                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4155            }),
4156        }
4157    }
4158
4159    #[instrument(level = "trace", skip_all)]
4160    pub async fn query_events(
4161        &self,
4162        kv_store: &Arc<TransactionKeyValueStore>,
4163        query: EventFilter,
4164        // If `Some`, the query will start from the next item after the specified cursor
4165        cursor: Option<EventID>,
4166        limit: usize,
4167        descending: bool,
4168    ) -> IotaResult<Vec<IotaEvent>> {
4169        let index_store = self.get_indexes()?;
4170
4171        // Get the tx_num from tx_digest
4172        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4173            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4174                IotaError::TransactionNotFound {
4175                    digest: cursor.tx_digest,
4176                },
4177            )?;
4178            (tx_seq, cursor.event_seq as usize)
4179        } else if descending {
4180            (u64::MAX, usize::MAX)
4181        } else {
4182            (0, 0)
4183        };
4184
4185        let limit = limit + 1;
4186        let mut event_keys = match query {
4187            EventFilter::All(filters) => {
4188                if filters.is_empty() {
4189                    index_store.all_events(tx_num, event_num, limit, descending)?
4190                } else {
4191                    return Err(IotaError::UserInput {
4192                        error: UserInputError::Unsupported(
4193                            "This query type does not currently support filter combinations"
4194                                .to_string(),
4195                        ),
4196                    });
4197                }
4198            }
4199            EventFilter::Transaction(digest) => {
4200                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4201            }
4202            EventFilter::MoveModule { package, module } => {
4203                let module_id = ModuleId::new(package.into(), module);
4204                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4205            }
4206            EventFilter::MoveEventType(struct_name) => index_store
4207                .events_by_move_event_struct_name(
4208                    &struct_name,
4209                    tx_num,
4210                    event_num,
4211                    limit,
4212                    descending,
4213                )?,
4214            EventFilter::Sender(sender) => {
4215                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4216            }
4217            EventFilter::TimeRange {
4218                start_time,
4219                end_time,
4220            } => index_store
4221                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4222            EventFilter::MoveEventModule { package, module } => index_store
4223                .events_by_move_event_module(
4224                    &ModuleId::new(package.into(), module),
4225                    tx_num,
4226                    event_num,
4227                    limit,
4228                    descending,
4229                )?,
4230            // not using "_ =>" because we want to make sure we remember to add new variants here
4231            EventFilter::Package(_)
4232            | EventFilter::MoveEventField { .. }
4233            | EventFilter::Any(_)
4234            | EventFilter::And(_, _)
4235            | EventFilter::Or(_, _) => {
4236                return Err(IotaError::UserInput {
4237                    error: UserInputError::Unsupported(
4238                        "This query type is not supported by the full node.".to_string(),
4239                    ),
4240                });
4241            }
4242        };
4243
4244        // skip one event if exclusive cursor is provided,
4245        // otherwise truncate to the original limit.
4246        if cursor.is_some() {
4247            if !event_keys.is_empty() {
4248                event_keys.remove(0);
4249            }
4250        } else {
4251            event_keys.truncate(limit - 1);
4252        }
4253
4254        // get the unique set of digests from the event_keys
4255        let transaction_digests = event_keys
4256            .iter()
4257            .map(|(_, digest, _, _)| *digest)
4258            .collect::<HashSet<_>>()
4259            .into_iter()
4260            .collect::<Vec<_>>();
4261
4262        let events = kv_store
4263            .multi_get_events_by_tx_digests(&transaction_digests)
4264            .await?;
4265
4266        let events_map: HashMap<_, _> =
4267            transaction_digests.iter().zip(events.into_iter()).collect();
4268
4269        let stored_events = event_keys
4270            .into_iter()
4271            .map(|k| {
4272                (
4273                    k,
4274                    events_map
4275                        .get(&k.1)
4276                        .expect("fetched digest is missing")
4277                        .clone()
4278                        .and_then(|e| e.data.get(k.2).cloned()),
4279                )
4280            })
4281            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4282                event
4283                    .map(|e| (e, tx_digest, event_seq, timestamp))
4284                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4285            })
4286            .collect::<Result<Vec<_>, _>>()?;
4287
4288        let epoch_store = self.load_epoch_store_one_call_per_task();
4289        let backing_store = self.get_backing_package_store().as_ref();
4290        let mut layout_resolver = epoch_store
4291            .executor()
4292            .type_layout_resolver(Box::new(backing_store));
4293        let mut events = vec![];
4294        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4295            events.push(IotaEvent::try_from(
4296                e.clone(),
4297                tx_digest,
4298                event_seq as u64,
4299                Some(timestamp),
4300                layout_resolver.get_annotated_layout(&e.type_)?,
4301            )?)
4302        }
4303        Ok(events)
4304    }
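
    // A compact sketch of driving `query_events` with its exclusive cursor: fetch
    // the newest events for a sender, then resume after the last event seen. The
    // helper name and page size are illustrative, and it assumes the returned
    // `IotaEvent` exposes its `EventID` via an `id` field (as in the RPC types); if
    // that assumption does not hold, the cursor has to be tracked separately.
    #[cfg(any())] // illustrative sketch only, compiled out
    async fn example_page_events_by_sender(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        sender: IotaAddress,
    ) -> IotaResult<Vec<IotaEvent>> {
        const PAGE_SIZE: usize = 50;
        let mut cursor: Option<EventID> = None;
        let mut all_events = Vec::new();
        loop {
            let page = self
                .query_events(
                    kv_store,
                    EventFilter::Sender(sender),
                    cursor.clone(),
                    PAGE_SIZE,
                    /* descending */ true,
                )
                .await?;
            let is_last_page = page.len() < PAGE_SIZE;
            // Resume after the last event returned in this page.
            cursor = page.last().map(|event| event.id.clone());
            all_events.extend(page);
            if is_last_page {
                break;
            }
        }
        Ok(all_events)
    }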
4305
4306    pub async fn insert_genesis_object(&self, object: Object) {
4307        self.get_reconfig_api()
4308            .try_insert_genesis_object(object)
4309            .expect("Cannot insert genesis object")
4310    }
4311
4312    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4313        futures::future::join_all(
4314            objects
4315                .iter()
4316                .map(|o| self.insert_genesis_object(o.clone())),
4317        )
4318        .await;
4319    }
4320
4321    /// Make a status response for a transaction
4322    #[instrument(level = "trace", skip_all)]
4323    pub fn get_transaction_status(
4324        &self,
4325        transaction_digest: &TransactionDigest,
4326        epoch_store: &Arc<AuthorityPerEpochStore>,
4327    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4328        // TODO: In the case of read path, we should not have to re-sign the effects.
4329        if let Some(effects) =
4330            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4331        {
4332            if let Some(transaction) = self
4333                .get_transaction_cache_reader()
4334                .try_get_transaction_block(transaction_digest)?
4335            {
4336                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4337                let events = if let Some(digest) = effects.events_digest() {
4338                    self.get_transaction_events(digest)?
4339                } else {
4340                    TransactionEvents::default()
4341                };
4342                return Ok(Some((
4343                    (*transaction).clone().into_message(),
4344                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4345                )));
4346            } else {
4347                // The read of effects and read of transaction are not atomic. It's possible
4348                // that we reverted the transaction (during epoch change) in
4349                // between the above two reads, and we end up having the effects but
4350                // not the transaction. In this case, we just fall through.
4351                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4352            }
4353        }
4354        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4355            self.metrics.tx_already_processed.inc();
4356            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4357            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4358        } else {
4359            Ok(None)
4360        }
4361    }
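
    // A brief sketch of consuming the status above: callers often only need to know
    // whether the transaction has been executed or is merely signed. The helper
    // name is an illustrative assumption.
    #[cfg(any())] // illustrative sketch only, compiled out
    fn example_is_transaction_executed(
        &self,
        digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<bool> {
        Ok(match self.get_transaction_status(digest, epoch_store)? {
            // Effects (and events) exist, so the transaction has been executed.
            Some((_, TransactionStatus::Executed(_, _, _))) => true,
            // Only a signature exists, or the transaction is unknown.
            Some((_, TransactionStatus::Signed(_))) | None => false,
        })
    }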
4362
4363    /// Get the signed effects of the given transaction. If the effects were
4364    /// signed in a previous epoch, re-sign them so that the caller is able to
4365    /// form a cert of the effects in the current epoch.
4366    #[instrument(level = "trace", skip_all)]
4367    pub fn get_signed_effects_and_maybe_resign(
4368        &self,
4369        transaction_digest: &TransactionDigest,
4370        epoch_store: &Arc<AuthorityPerEpochStore>,
4371    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4372        let effects = self
4373            .get_transaction_cache_reader()
4374            .try_get_executed_effects(transaction_digest)?;
4375        match effects {
4376            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4377            None => Ok(None),
4378        }
4379    }
4380
4381    #[instrument(level = "trace", skip_all)]
4382    pub(crate) fn sign_effects(
4383        &self,
4384        effects: TransactionEffects,
4385        epoch_store: &Arc<AuthorityPerEpochStore>,
4386    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4387        let tx_digest = *effects.transaction_digest();
4388        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4389            Some(sig) if sig.epoch == epoch_store.epoch() => {
4390                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4391            }
4392            _ => {
4393                // If the transaction was executed in previous epochs, the validator will
4394                // re-sign the effects with the current epoch so that a client is always able to
4395                // obtain an effects certificate at the current epoch.
4396                //
4397                // Why is this necessary? Consider the following case:
4398                // - assume there are 4 validators
4399                // - Quorum driver gets 2 signed effects before reconfig halt
4400                // - The tx makes it into final checkpoint.
4401                // - 2 validators go away and are replaced in the new epoch.
4402                // - The new epoch begins.
4403                // - The quorum driver cannot complete the partial effects cert from the
4404                //   previous epoch, because it may not be able to reach either of the 2 former
4405                //   validators.
4406                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4407                //   the new epoch, the QD can make a new effects cert and return it to the
4408                //   client.
4409                //
4410                // This is considered a short-term workaround. Eventually, Quorum Driver
4411                // should be able to return either an effects certificate, -or-
4412                // a proof of inclusion in a checkpoint. In the case above, the
4413                // Quorum Driver would return a proof of inclusion in the final
4414                // checkpoint, and this code would no longer be necessary.
4415                debug!(
4416                    ?tx_digest,
4417                    epoch=?epoch_store.epoch(),
4418                    "Re-signing the effects with the current epoch"
4419                );
4420
4421                let sig = AuthoritySignInfo::new(
4422                    epoch_store.epoch(),
4423                    &effects,
4424                    Intent::iota_app(IntentScope::TransactionEffects),
4425                    self.name,
4426                    &*self.secret,
4427                );
4428
4429                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4430
4431                epoch_store.insert_effects_digest_and_signature(
4432                    &tx_digest,
4433                    effects.digest(),
4434                    &sig,
4435                )?;
4436
4437                effects
4438            }
4439        };
4440
4441        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4442            signed_effects,
4443        ))
4444    }
4445
4446    // Returns coin objects for fullnode indexing, if indexing is enabled.
4447    #[instrument(level = "trace", skip_all)]
4448    fn fullnode_only_get_tx_coins_for_indexing(
4449        &self,
4450        inner_temporary_store: &InnerTemporaryStore,
4451        epoch_store: &Arc<AuthorityPerEpochStore>,
4452    ) -> Option<TxCoins> {
4453        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4454            return None;
4455        }
4456        let written_coin_objects = inner_temporary_store
4457            .written
4458            .iter()
4459            .filter_map(|(k, v)| {
4460                if v.is_coin() {
4461                    Some((*k, v.clone()))
4462                } else {
4463                    None
4464                }
4465            })
4466            .collect();
4467        let input_coin_objects = inner_temporary_store
4468            .input_objects
4469            .iter()
4470            .filter_map(|(k, v)| {
4471                if v.is_coin() {
4472                    Some((*k, v.clone()))
4473                } else {
4474                    None
4475                }
4476            })
4477            .collect::<ObjectMap>();
4478        Some((input_coin_objects, written_coin_objects))
4479    }
4480
4481    /// Get the TransactionEnvelope that currently locks the given object, if
4482    /// any. Since object locks are only valid for one epoch, we also need
4483    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4484    /// no lock records for the given object can be found.
4485    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4486    /// object record is at a different version.
4487    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4488    /// certain transaction. Returns None if a lock record is
4489    /// initialized for the given ObjectRef but not yet locked by any
4490    /// transaction, or if the locking transaction cannot be found in the
4491    /// transaction table, e.g. because of a data race.
4492    #[instrument(level = "trace", skip_all)]
4493    pub async fn get_transaction_lock(
4494        &self,
4495        object_ref: &ObjectRef,
4496        epoch_store: &AuthorityPerEpochStore,
4497    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4498        let lock_info = self
4499            .get_object_cache_reader()
4500            .try_get_lock(*object_ref, epoch_store)?;
4501        let lock_info = match lock_info {
4502            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4503                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4504                    provided_obj_ref: *object_ref,
4505                    current_version: locked_ref.1,
4506                }
4507                .into());
4508            }
4509            ObjectLockStatus::Initialized => {
4510                return Ok(None);
4511            }
4512            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4513        };
4514
4515        epoch_store.get_signed_transaction(&lock_info)
4516    }
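
    // A small sketch of interpreting `get_transaction_lock`: an owned object is
    // "in flight" when some transaction already holds its lock for this epoch. The
    // helper name is an illustrative assumption.
    #[cfg(any())] // illustrative sketch only, compiled out
    async fn example_object_locking_tx_digest(
        &self,
        object_ref: &ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult<Option<TransactionDigest>> {
        // `Ok(None)` covers both "lock initialized but unused" and "locking
        // transaction not found"; errors cover unknown objects and version
        // mismatches, as documented above.
        Ok(self
            .get_transaction_lock(object_ref, epoch_store)
            .await?
            .map(|signed_tx| *signed_tx.digest()))
    }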
4517
4518    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4519        self.get_object_cache_reader().try_get_objects(objects)
4520    }
4521
4522    /// Non-fallible version of `try_get_objects`.
4523    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4524        self.try_get_objects(objects)
4525            .await
4526            .expect("storage access failed")
4527    }
4528
4529    pub async fn try_get_object_or_tombstone(
4530        &self,
4531        object_id: ObjectID,
4532    ) -> IotaResult<Option<ObjectRef>> {
4533        self.get_object_cache_reader()
4534            .try_get_latest_object_ref_or_tombstone(object_id)
4535    }
4536
4537    /// Non-fallible version of `try_get_object_or_tombstone`.
4538    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4539        self.try_get_object_or_tombstone(object_id)
4540            .await
4541            .expect("storage access failed")
4542    }
4543
4544    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4545    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4546    /// upgrade.
4547    ///
4548    /// This method can be used to dynamically adjust the amount of buffer. If set
4549    /// to 0, the upgrade will go through with only 2f+1 votes.
4550    ///
4551    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4552    /// should have the same value), or you risk halting the chain.
4553    pub fn set_override_protocol_upgrade_buffer_stake(
4554        &self,
4555        expected_epoch: EpochId,
4556        buffer_stake_bps: u64,
4557    ) -> IotaResult {
4558        let epoch_store = self.load_epoch_store_one_call_per_task();
4559        let actual_epoch = epoch_store.epoch();
4560        if actual_epoch != expected_epoch {
4561            return Err(IotaError::WrongEpoch {
4562                expected_epoch,
4563                actual_epoch,
4564            });
4565        }
4566
4567        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4568    }
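
    // Rough arithmetic behind the buffer: with total stake normalized to 10_000 and
    // f of about 3_333, the base quorum 2f + 1 is about 6_667. A `buffer_stake_bps`
    // of 5_000 (50%) then adds roughly f / 2, i.e. about 1_666 stake, so about 8_333
    // stake must vote for the upgrade; setting it to 0 falls back to the plain
    // 2f + 1 quorum. The sketch below mirrors that reading of the formula; the
    // authoritative computation is `Committee::effective_threshold`, whose exact
    // rounding may differ.
    #[cfg(any())] // illustrative sketch only, compiled out
    fn example_effective_upgrade_threshold(
        total_stake: u64,
        validity_threshold_f: u64,
        buffer_stake_bps: u64,
    ) -> u64 {
        let quorum = 2 * validity_threshold_f + 1;
        let buffer = validity_threshold_f * buffer_stake_bps / 10_000;
        // Never require more stake than actually exists.
        (quorum + buffer).min(total_stake)
    }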
4569
4570    pub fn clear_override_protocol_upgrade_buffer_stake(
4571        &self,
4572        expected_epoch: EpochId,
4573    ) -> IotaResult {
4574        let epoch_store = self.load_epoch_store_one_call_per_task();
4575        let actual_epoch = epoch_store.epoch();
4576        if actual_epoch != expected_epoch {
4577            return Err(IotaError::WrongEpoch {
4578                expected_epoch,
4579                actual_epoch,
4580            });
4581        }
4582
4583        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4584    }
4585
4586    /// Get the set of system packages that are compiled into this build, if
4587    /// those packages are compatible with the current versions of those
4588    /// packages on-chain.
4589    pub async fn get_available_system_packages(
4590        &self,
4591        binary_config: &BinaryConfig,
4592    ) -> Vec<ObjectRef> {
4593        let mut results = vec![];
4594
4595        let system_packages = BuiltInFramework::iter_system_packages();
4596
4597        // Add extra framework packages during simtest
4598        #[cfg(msim)]
4599        let extra_packages = framework_injection::get_extra_packages(self.name);
4600        #[cfg(msim)]
4601        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4602
4603        for system_package in system_packages {
4604            let modules = system_package.modules().to_vec();
4605            // In simtests, we could override the current built-in framework packages.
4606            #[cfg(msim)]
4607            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4608                .unwrap_or(modules);
4609
4610            let Some(obj_ref) = iota_framework::compare_system_package(
4611                &self.get_object_store(),
4612                &system_package.id,
4613                &modules,
4614                system_package.dependencies.to_vec(),
4615                binary_config,
4616            )
4617            .await
4618            else {
4619                return vec![];
4620            };
4621            results.push(obj_ref);
4622        }
4623
4624        results
4625    }
4626
4627    /// Return the new versions, module bytes, and dependencies for the packages
4628    /// that have been committed to for a framework upgrade, in
4629    /// `system_packages`.  Loads the module contents from the binary, and
4630    /// performs the following checks:
4631    ///
4632    /// - Whether its contents matches what is on-chain already, in which case
4633    ///   no upgrade is required, and its contents are omitted from the output.
4634    /// - Whether the contents in the binary can form a package whose digest
4635    ///   matches the input, meaning the framework will be upgraded, and this
4636    ///   authority can satisfy that upgrade, in which case the contents are
4637    ///   included in the output.
4638    ///
4639    /// If the current version of the framework can't be loaded, if the binary
4640    /// does not contain the bytes for that framework ID, or if the resulting
4641    /// package fails the digest check, `None` is returned, indicating that
4642    /// this authority cannot run the upgrade that the network voted on.
4643    async fn get_system_package_bytes(
4644        &self,
4645        system_packages: Vec<ObjectRef>,
4646        binary_config: &BinaryConfig,
4647    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4648        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4649        let objects = self.get_objects(&ids).await;
4650
4651        let mut res = Vec::with_capacity(system_packages.len());
4652        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4653            let prev_transaction = match object {
4654                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4655                    // Skip this one because it doesn't need to be upgraded.
4656                    info!("Framework {} does not need updating", system_package_ref.0);
4657                    continue;
4658                }
4659
4660                Some(cur_object) => cur_object.previous_transaction,
4661                None => TransactionDigest::genesis_marker(),
4662            };
4663
4664            #[cfg(msim)]
4665            let SystemPackage {
4666                id: _,
4667                bytes,
4668                dependencies,
4669            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4670                .unwrap_or_else(|| {
4671                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4672                });
4673
4674            #[cfg(not(msim))]
4675            let SystemPackage {
4676                id: _,
4677                bytes,
4678                dependencies,
4679            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4680
4681            let modules: Vec<_> = bytes
4682                .iter()
4683                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4684                .collect();
4685
4686            let new_object = Object::new_system_package(
4687                &modules,
4688                system_package_ref.1,
4689                dependencies.clone(),
4690                prev_transaction,
4691            );
4692
4693            let new_ref = new_object.compute_object_reference();
4694            if new_ref != system_package_ref {
4695                error!(
4696                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4697                );
4698                return None;
4699            }
4700
4701            res.push((system_package_ref.1, bytes, dependencies));
4702        }
4703
4704        Some(res)
4705    }
4706
4707    /// Returns the new protocol version and system packages that the network
4708    /// has voted to upgrade to. If the proposed protocol version is not
4709    /// supported, None is returned.
4710    fn is_protocol_version_supported_v1(
4711        proposed_protocol_version: ProtocolVersion,
4712        committee: &Committee,
4713        capabilities: Vec<AuthorityCapabilitiesV1>,
4714        mut buffer_stake_bps: u64,
4715    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4716        if buffer_stake_bps > 10000 {
4717            warn!("clamping buffer_stake_bps to 10000");
4718            buffer_stake_bps = 10000;
4719        }
4720
4721        // For each validator, gather the protocol version and system packages that it
4722        // would like to upgrade to in the next epoch.
4723        let mut desired_upgrades: Vec<_> = capabilities
4724            .into_iter()
4725            .filter_map(|mut cap| {
4726                // A validator that lists no packages is voting against any change at all.
4727                if cap.available_system_packages.is_empty() {
4728                    return None;
4729                }
4730
4731                cap.available_system_packages.sort();
4732
4733                info!(
4734                    "validator {:?} supports {:?} with system packages: {:?}",
4735                    cap.authority.concise(),
4736                    cap.supported_protocol_versions,
4737                    cap.available_system_packages,
4738                );
4739
4740                // A validator that only supports the current protocol version is also voting
4741                // against any change, because framework upgrades always require a protocol
4742                // version bump.
4743                cap.supported_protocol_versions
4744                    .get_version_digest(proposed_protocol_version)
4745                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4746            })
4747            .collect();
4748
4749        // There can only be one set of votes that has a majority; find it if it
4750        // exists.
4751        desired_upgrades.sort();
4752        desired_upgrades
4753            .into_iter()
4754            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4755            .into_iter()
4756            .find_map(|((digest, packages), group)| {
4757                // should have been filtered out earlier.
4758                assert!(!packages.is_empty());
4759
4760                let mut stake_aggregator: StakeAggregator<(), true> =
4761                    StakeAggregator::new(Arc::new(committee.clone()));
4762
4763                for (_, _, authority) in group {
4764                    stake_aggregator.insert_generic(authority, ());
4765                }
4766
4767                let total_votes = stake_aggregator.total_votes();
4768                let quorum_threshold = committee.quorum_threshold();
4769                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4770
4771                info!(
4772                    protocol_config_digest = ?digest,
4773                    ?total_votes,
4774                    ?quorum_threshold,
4775                    ?buffer_stake_bps,
4776                    ?effective_threshold,
4777                    ?proposed_protocol_version,
4778                    ?packages,
4779                    "support for upgrade"
4780                );
4781
4782                let has_support = total_votes >= effective_threshold;
4783                has_support.then_some((proposed_protocol_version, digest, packages))
4784            })
4785    }
4786
4787    /// Selects the highest supported protocol version and system packages that
4788    /// the network has voted to upgrade to. If no upgrade is supported,
4789    /// returns the current protocol version and system packages.
4790    fn choose_protocol_version_and_system_packages_v1(
4791        current_protocol_version: ProtocolVersion,
4792        current_protocol_digest: Digest,
4793        committee: &Committee,
4794        capabilities: Vec<AuthorityCapabilitiesV1>,
4795        buffer_stake_bps: u64,
4796    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4797        let mut next_protocol_version = current_protocol_version;
4798        let mut system_packages = vec![];
4799        let mut protocol_version_digest = current_protocol_digest;
4800
4801        // Finds the highest supported protocol version and system packages by
4802        // incrementing the proposed protocol version by one until no further
4803        // upgrades are supported.
4804        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4805            next_protocol_version + 1,
4806            committee,
4807            capabilities.clone(),
4808            buffer_stake_bps,
4809        ) {
4810            next_protocol_version = version;
4811            protocol_version_digest = digest;
4812            system_packages = packages;
4813        }
4814
4815        (
4816            next_protocol_version,
4817            protocol_version_digest,
4818            system_packages,
4819        )
4820    }
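
    // Worked example of the loop above (numbers are illustrative): if the current
    // protocol version is 5 and enough stake supports versions 6 and 7 but not 8,
    // the first iteration finds 6, the second finds 7, and the probe of version 8
    // returns `None`, so the function settles on version 7 together with the digest
    // and system packages voted for it. If even version 6 lacks support, the loop
    // body never runs and the current version and digest are returned with an empty
    // system-package list.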
4821
4822    /// Returns the indices of validators that support the given protocol
4823    /// version and digest. This includes both committee and non-committee
4824    /// validators based on their capabilities. The returned indices refer to
4825    /// the active validator set, not to committee positions.
4826    fn get_validators_supporting_protocol_version(
4827        target_protocol_version: ProtocolVersion,
4828        target_digest: Digest,
4829        active_validators: &[AuthorityPublicKey],
4830        capabilities: &[AuthorityCapabilitiesV1],
4831    ) -> Vec<u64> {
4832        let mut eligible_validators = Vec::new();
4833
4834        for capability in capabilities {
4835            // Check if this validator supports the target protocol version and digest
4836            if let Some(digest) = capability
4837                .supported_protocol_versions
4838                .get_version_digest(target_protocol_version)
4839            {
4840                if digest == target_digest {
4841                    // Find the validator's index in the active validators list
4842                    if let Some(index) = active_validators
4843                        .iter()
4844                        .position(|name| AuthorityName::from(name) == capability.authority)
4845                    {
4846                        eligible_validators.push(index as u64);
4847                    }
4848                }
4849            }
4850        }
4851
4852        // Sort indices for deterministic behavior
4853        eligible_validators.sort();
4854        eligible_validators
4855    }
4856
4857    /// Calculates the sum of weights for eligible validators that are part of
4858    /// the committee. Takes the indices from
4859    /// get_validators_supporting_protocol_version and maps them back
4860    /// to committee members to get their weights.
4861    fn calculate_eligible_validators_weight(
4862        eligible_validator_indices: &[u64],
4863        active_validators: &[AuthorityPublicKey],
4864        committee: &Committee,
4865    ) -> u64 {
4866        let mut total_weight = 0u64;
4867
4868        for &index in eligible_validator_indices {
4869            let authority_pubkey = &active_validators[index as usize];
4870            // Check if this validator is in the committee and get their weight
4871            if let Some((_, weight)) = committee
4872                .members()
4873                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4874            {
4875                total_weight += weight;
4876            }
4877        }
4878
4879        total_weight
4880    }
4881
4882    #[instrument(level = "debug", skip_all)]
4883    fn create_authenticator_state_tx(
4884        &self,
4885        epoch_store: &Arc<AuthorityPerEpochStore>,
4886    ) -> Option<EndOfEpochTransactionKind> {
4887        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4888            info!("authenticator state transactions not enabled");
4889            return None;
4890        }
4891
4892        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4893        let tx = if authenticator_state_exists {
4894            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4895            let min_epoch =
4896                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4897            let authenticator_obj_initial_shared_version = epoch_store
4898                .epoch_start_config()
4899                .authenticator_obj_initial_shared_version()
4900                .expect("initial version must exist");
4901
4902            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4903                min_epoch,
4904                authenticator_obj_initial_shared_version,
4905            );
4906
4907            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4908
4909            tx
4910        } else {
4911            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4912            info!("Creating AuthenticatorStateCreate tx");
4913            tx
4914        };
4915        Some(tx)
4916    }
4917
4918    /// Creates and executes the advance epoch transaction to effects without
4919    /// committing it to the database. The effects of the change epoch tx
4920    /// are only written to the database after a certified checkpoint has been
4921    /// formed and executed by CheckpointExecutor.
4922    ///
4923    /// When a framework upgrade has been decided on, but the validator does
4924    /// not have the new versions of the packages locally, the validator
4925    /// cannot form the ChangeEpochTx. In this case it returns Err,
4926    /// indicating that the checkpoint builder should give up trying to make the
4927    /// final checkpoint. As long as the network is able to create a certified
4928    /// checkpoint (which should be ensured by the capabilities vote), it
4929    /// will arrive via state sync and be executed by CheckpointExecutor.
4930    #[instrument(level = "error", skip_all)]
4931    pub async fn create_and_execute_advance_epoch_tx(
4932        &self,
4933        epoch_store: &Arc<AuthorityPerEpochStore>,
4934        gas_cost_summary: &GasCostSummary,
4935        checkpoint: CheckpointSequenceNumber,
4936        epoch_start_timestamp_ms: CheckpointTimestamp,
4937        scores: Vec<u64>,
4938    ) -> anyhow::Result<(
4939        IotaSystemState,
4940        Option<SystemEpochInfoEvent>,
4941        TransactionEffects,
4942    )> {
4943        let mut txns = Vec::new();
4944
4945        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4946            txns.push(tx);
4947        }
4948
4949        let next_epoch = epoch_store.epoch() + 1;
4950
4951        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4952        let authority_capabilities = epoch_store
4953            .get_capabilities_v1()
4954            .expect("read capabilities from db cannot fail");
4955        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4956            Self::choose_protocol_version_and_system_packages_v1(
4957                epoch_store.protocol_version(),
4958                SupportedProtocolVersionsWithHashes::protocol_config_digest(
4959                    epoch_store.protocol_config(),
4960                ),
4961                epoch_store.committee(),
4962                authority_capabilities.clone(),
4963                buffer_stake_bps,
4964            );
4965
4966        // since system packages are created during the current epoch, they should abide
4967        // by the rules of the current epoch, including the current epoch's max
4968        // Move binary format version
4969        let config = epoch_store.protocol_config();
4970        let binary_config = to_binary_config(config);
4971        let Some(next_epoch_system_package_bytes) = self
4972            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4973            .await
4974        else {
4975            error!(
4976                "upgraded system packages {:?} are not locally available, cannot create \
4977                ChangeEpochTx. validator binary must be upgraded to the correct version!",
4978                next_epoch_system_packages
4979            );
4980            // the checkpoint builder will keep retrying forever when it hits this error.
4981            // Eventually, one of two things will happen:
4982            // - The operator will upgrade this binary to one that has the new packages
4983            //   locally, and this function will succeed.
4984            // - The final checkpoint will be certified by other validators, we will receive
4985            //   it via state sync, and execute it. This will upgrade the framework
4986            //   packages, reconfigure, and most likely shut down in the new epoch (this
4987            //   validator likely doesn't support the new protocol version, or else it
4988            //   should have had the packages.)
4989            bail!("missing system packages: cannot form ChangeEpochTx");
4990        };
4991
4992        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
4993        // ChangeEpochV2 requirements are met
4994        if config.select_committee_from_eligible_validators() {
4995            // Get the list of eligible validators that support the target protocol version
4996            let active_validators = epoch_store.epoch_start_state().get_active_validators();
4997
4998            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
4999
5000            // Use validators supporting the target protocol version as the eligible
5001            // validators for the next epoch if the select_committee_supporting_next_epoch_version
5002            // feature flag is set to true.
5003            if config.select_committee_supporting_next_epoch_version() {
5004                eligible_active_validators = Self::get_validators_supporting_protocol_version(
5005                    next_epoch_protocol_version,
5006                    next_epoch_protocol_digest,
5007                    &active_validators,
5008                    &authority_capabilities,
5009                );
5010
5011                // Calculate the total weight of eligible validators in the committee
5012                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5013                    &eligible_active_validators,
5014                    &active_validators,
5015                    epoch_store.committee(),
5016                );
5017
5018                // Safety check: ensure eligible validators have enough stake
5019                // Use the same effective threshold calculation that was used to decide the
5020                // protocol version
5021                let committee = epoch_store.committee();
5022                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5023
5024                if eligible_validators_weight < effective_threshold {
5025                    error!(
5026                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5027                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5028                    );
5029                    // Pass all active validator indices as eligible validators
5030                    // to perform selection among all of them.
5031                    eligible_active_validators = (0..active_validators.len() as u64).collect();
5032                }
5033            }
5034
5035            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
5036            // flag is enabled.
5037            if config.pass_validator_scores_to_advance_epoch() {
5038                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5039                    next_epoch,
5040                    next_epoch_protocol_version,
5041                    gas_cost_summary.storage_cost,
5042                    gas_cost_summary.computation_cost,
5043                    gas_cost_summary.computation_cost_burned,
5044                    gas_cost_summary.storage_rebate,
5045                    gas_cost_summary.non_refundable_storage_fee,
5046                    epoch_start_timestamp_ms,
5047                    next_epoch_system_package_bytes,
5048                    eligible_active_validators,
5049                    scores,
5050                    config.adjust_rewards_by_score(),
5051                ));
5052            } else {
5053                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5054                    next_epoch,
5055                    next_epoch_protocol_version,
5056                    gas_cost_summary.storage_cost,
5057                    gas_cost_summary.computation_cost,
5058                    gas_cost_summary.computation_cost_burned,
5059                    gas_cost_summary.storage_rebate,
5060                    gas_cost_summary.non_refundable_storage_fee,
5061                    epoch_start_timestamp_ms,
5062                    next_epoch_system_package_bytes,
5063                    eligible_active_validators,
5064                ));
5065            }
5066        } else if config.protocol_defined_base_fee()
5067            && config.max_committee_members_count_as_option().is_some()
5068        {
5069            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5070                next_epoch,
5071                next_epoch_protocol_version,
5072                gas_cost_summary.storage_cost,
5073                gas_cost_summary.computation_cost,
5074                gas_cost_summary.computation_cost_burned,
5075                gas_cost_summary.storage_rebate,
5076                gas_cost_summary.non_refundable_storage_fee,
5077                epoch_start_timestamp_ms,
5078                next_epoch_system_package_bytes,
5079            ));
5080        } else {
5081            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5082                next_epoch,
5083                next_epoch_protocol_version,
5084                gas_cost_summary.storage_cost,
5085                gas_cost_summary.computation_cost,
5086                gas_cost_summary.storage_rebate,
5087                gas_cost_summary.non_refundable_storage_fee,
5088                epoch_start_timestamp_ms,
5089                next_epoch_system_package_bytes,
5090            ));
5091        }
5092
5093        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5094
5095        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5096            tx.clone(),
5097            epoch_store.epoch(),
5098            checkpoint,
5099        );
5100
5101        let tx_digest = executable_tx.digest();
5102
5103        info!(
5104            ?next_epoch,
5105            ?next_epoch_protocol_version,
5106            ?next_epoch_system_packages,
5107            computation_cost=?gas_cost_summary.computation_cost,
5108            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5109            storage_cost=?gas_cost_summary.storage_cost,
5110            storage_rebate=?gas_cost_summary.storage_rebate,
5111            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5112            ?tx_digest,
5113            "Creating advance epoch transaction"
5114        );
5115
5116        fail_point_async!("change_epoch_tx_delay");
5117        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5118
5119        // The tx could have been executed by state sync already - if so simply return
5120        // an error. The checkpoint builder will shortly be terminated by
5121        // reconfiguration anyway.
5122        if self
5123            .get_transaction_cache_reader()
5124            .try_is_tx_already_executed(tx_digest)?
5125        {
5126            warn!("change epoch tx has already been executed via state sync");
5127            bail!("change epoch tx has already been executed via state sync",);
5128        }
5129
5130        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5131
5132        // We must manually assign the shared object versions to the transaction before
5133        // executing it. This is because we do not sequence end-of-epoch
5134        // transactions through consensus.
5135        epoch_store.assign_shared_object_versions_idempotent(
5136            self.get_object_cache_reader().as_ref(),
5137            std::slice::from_ref(&executable_tx),
5138        )?;
5139
5140        let (input_objects, _, _) =
5141            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5142
5143        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5144            &execution_guard,
5145            &executable_tx,
5146            input_objects,
5147            None,
5148            None,
5149            epoch_store,
5150        )?;
5151        let system_obj = get_iota_system_state(&temporary_store.written)
5152            .expect("change epoch tx must write to system object");
5153        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
5154        let system_epoch_info_event = temporary_store
5155            .events
5156            .data
5157            .into_iter()
5158            .find(|event| event.is_system_epoch_info_event())
5159            .map(SystemEpochInfoEvent::from);
5160        // The system epoch info event can be `None` if the `advance_epoch`
5161        // Move function call failed and was executed in safe mode.
5162        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5163
5164        // We must write tx and effects to the state sync tables so that state sync is
5165        // able to deliver the transaction to CheckpointExecutor after it is
5166        // included in a certified checkpoint.
5167        self.get_state_sync_store()
5168            .try_insert_transaction_and_effects(&tx, &effects)
5169            .map_err(|err| {
5170                let err: anyhow::Error = err.into();
5171                err
5172            })?;
5173
5174        info!(
5175            "Effects summary of the change epoch transaction: {:?}",
5176            effects.summary_for_debug()
5177        );
5178        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5179        // The change epoch transaction cannot fail to execute.
5180        assert!(effects.status().is_ok());
5181        Ok((system_obj, system_epoch_info_event, effects))
5182    }
5183
5184    /// This function is called at the very end of the epoch.
5185    /// This step is required before updating the new epoch in the db and
5186    /// calling reopen_epoch_db.
5187    #[instrument(level = "error", skip_all)]
5188    async fn revert_uncommitted_epoch_transactions(
5189        &self,
5190        epoch_store: &AuthorityPerEpochStore,
5191    ) -> IotaResult {
5192        {
5193            let state = epoch_store.get_reconfig_state_write_lock_guard();
5194            if state.should_accept_user_certs() {
5195                // Need to change this so that the consensus adapter does not accept
5196                // certificates from users. This can happen if our local validator did
5197                // not initiate the epoch change locally, but 2f+1 nodes already
5198                // concluded the epoch.
5199                //
5200                // This lock is essentially a barrier for the
5201                // `epoch_store.pending_consensus_certificates` table that we read right
5202                // after this block.
5203                epoch_store.close_user_certs(state);
5204            }
5205            // lock is dropped here
5206        }
5207        let pending_certificates = epoch_store.pending_consensus_certificates();
5208        info!(
5209            "Reverting {} locally executed transactions that were not included in the epoch: {:?}",
5210            pending_certificates.len(),
5211            pending_certificates,
5212        );
5213        for digest in pending_certificates {
5214            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5215                info!(
5216                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5217                    digest
5218                );
5219                continue;
5220            }
5221            info!("Reverting {:?} at the end of epoch", digest);
5222            epoch_store.revert_executed_transaction(&digest)?;
5223            self.get_reconfig_api().try_revert_state_update(&digest)?;
5224        }
5225        info!("All uncommitted local transactions reverted");
5226        Ok(())
5227    }
5228
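    /// Opens the `AuthorityPerEpochStore` for the new epoch, derived from the
    /// current epoch store, the new committee and the epoch start
    /// configuration, and swaps it into `self.epoch_store`.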
5229    #[instrument(level = "error", skip_all)]
5230    async fn reopen_epoch_db(
5231        &self,
5232        cur_epoch_store: &AuthorityPerEpochStore,
5233        new_committee: Committee,
5234        epoch_start_configuration: EpochStartConfiguration,
5235        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5236        epoch_last_checkpoint: CheckpointSequenceNumber,
5237    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5238        let new_epoch = new_committee.epoch;
5239        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5240        assert_eq!(
5241            epoch_start_configuration.epoch_start_state().epoch(),
5242            new_committee.epoch
5243        );
5244        fail_point!("before-open-new-epoch-store");
5245        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5246            self.name,
5247            new_committee,
5248            epoch_start_configuration,
5249            self.get_backing_package_store().clone(),
5250            self.get_object_store().clone(),
5251            expensive_safety_check_config,
5252            epoch_last_checkpoint,
5253        )?;
5254        self.epoch_store.store(new_epoch_store.clone());
5255        Ok(new_epoch_store)
5256    }
5257
5258    /// Checks that the given account object corresponds to a valid Move account
5259    /// for `signer` and returns the account's `AuthenticatorFunctionRefForExecution`.
5260    fn check_move_account(
5261        &self,
5262        auth_account_object_id: ObjectID,
5263        auth_account_object_seq_number: Option<SequenceNumber>,
5264        auth_account_object_digest: Option<ObjectDigest>,
5265        account_object: ObjectReadResult,
5266        signer: &IotaAddress,
5267    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5268        let account_object = match account_object.object {
5269            ObjectReadResultKind::Object(object) => Ok(object),
5270            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5271                Err(UserInputError::AccountObjectDeleted {
5272                    account_id: account_object.id(),
5273                    account_version: version,
5274                    transaction_digest: digest,
5275                })
5276            }
5277            // It is impossible to check the account object because it is used in a canceled
5278            // transaction and is not loaded.
5279            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5280                Err(UserInputError::AccountObjectInCanceledTransaction {
5281                    account_id: account_object.id(),
5282                    account_version: version,
5283                })
5284            }
5285        }?;
5286
5287        let account_object_addr = IotaAddress::from(auth_account_object_id);
5288
5289        fp_ensure!(
5290            signer == &account_object_addr,
5291            UserInputError::IncorrectUserSignature {
5292                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but the given signer address is {signer:?}")
5293            }
5294            .into()
5295        );
5296
5297        fp_ensure!(
5298            account_object.is_shared() || account_object.is_immutable(),
5299            UserInputError::AccountObjectNotSupported {
5300                object_id: auth_account_object_id
5301            }
5302            .into()
5303        );
5304
5305        let auth_account_object_seq_number =
5306            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5307                let account_object_version = account_object.version();
5308
5309                fp_ensure!(
5310                    account_object_version == auth_account_object_seq_number,
5311                    UserInputError::AccountObjectVersionMismatch {
5312                        object_id: auth_account_object_id,
5313                        expected_version: auth_account_object_seq_number,
5314                        actual_version: account_object_version,
5315                    }
5316                    .into()
5317                );
5318
5319                auth_account_object_seq_number
5320            } else {
5321                account_object.version()
5322            };
5323
5324        if let Some(auth_account_object_digest) = auth_account_object_digest {
5325            let expected_digest = account_object.digest();
5326            fp_ensure!(
5327                expected_digest == auth_account_object_digest,
5328                UserInputError::InvalidAccountObjectDigest {
5329                    object_id: auth_account_object_id,
5330                    expected_digest,
5331                    actual_digest: auth_account_object_digest,
5332                }
5333                .into()
5334            );
5335        }
5336
5337        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5338            auth_account_object_id,
5339            &AuthenticatorFunctionRefV1Key::tag().into(),
5340            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5341        )
5342        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5343            account_object_id: auth_account_object_id,
5344        })?;
5345
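        // Look up the authenticator function reference dynamic field at a version no
        // newer than the (already validated) account object version.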
5346        let authenticator_function_ref_field = self
5347            .get_object_cache_reader()
5348            .try_find_object_lt_or_eq_version(
5349                authenticator_function_ref_field_id,
5350                auth_account_object_seq_number,
5351            )?;
5352
5353        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5354            let field_move_object = authenticator_function_ref_field_obj
5355                .data
5356                .try_as_move()
5357                .expect("dynamic field should never be a package object");
5358
5359            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5360                field_move_object.to_rust().ok_or(
5361                    UserInputError::InvalidAuthenticatorFunctionRefField {
5362                        account_object_id: auth_account_object_id,
5363                    },
5364                )?;
5365
5366            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5367                field.value,
5368                authenticator_function_ref_field_obj.compute_object_reference(),
5369                authenticator_function_ref_field_obj.owner,
5370                authenticator_function_ref_field_obj.storage_rebate,
5371                authenticator_function_ref_field_obj.previous_transaction,
5372            ))
5373        } else {
5374            Err(UserInputError::MoveAuthenticatorNotFound {
5375                authenticator_function_ref_id: authenticator_function_ref_field_id,
5376                account_object_id: auth_account_object_id,
5377                account_object_version: auth_account_object_seq_number,
5378            }
5379            .into())
5380        }
5381    }
5382
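    /// Reads the input and receiving objects needed to sign `transaction`, then
    /// splits the input objects into the transaction inputs, the optional Move
    /// authenticator inputs, and the optional account object.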
5383    fn read_objects_for_signing(
5384        &self,
5385        transaction: &VerifiedTransaction,
5386        epoch: u64,
5387    ) -> IotaResult<(
5388        InputObjects,
5389        ReceivingObjects,
5390        Option<InputObjects>,
5391        Option<ObjectReadResult>,
5392    )> {
5393        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5394            Some(transaction.digest()),
5395            &transaction.collect_all_input_object_kind_for_reading()?,
5396            &transaction.data().transaction_data().receiving_objects(),
5397            epoch,
5398        )?;
5399
5400        transaction
5401            .split_input_objects_into_groups_for_reading(input_objects)
5402            .map(|(tx_input_objects, auth_input_objects, account_object)| {
5403                (
5404                    tx_input_objects,
5405                    tx_receiving_objects,
5406                    auth_input_objects,
5407                    account_object,
5408                )
5409            })
5410    }
5411
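    /// Checks the transaction inputs for signing. When a `MoveAuthenticator` is
    /// present, the Move account and the authenticator's own input objects are
    /// validated first and a gas budget is reserved for the authenticator.
    /// Returns the gas status, the checked transaction inputs and, if
    /// applicable, the checked authenticator inputs and function reference.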
5412    fn check_transaction_inputs_for_signing(
5413        &self,
5414        protocol_config: &ProtocolConfig,
5415        reference_gas_price: u64,
5416        tx_data: &TransactionData,
5417        tx_input_objects: InputObjects,
5418        tx_receiving_objects: &ReceivingObjects,
5419        move_authenticator: Option<&MoveAuthenticator>,
5420        auth_input_objects: Option<InputObjects>,
5421        account_object: Option<ObjectReadResult>,
5422    ) -> IotaResult<(
5423        IotaGasStatus,
5424        CheckedInputObjects,
5425        Option<CheckedInputObjects>,
5426        Option<AuthenticatorFunctionRef>,
5427    )> {
5428        let (
5429            auth_checked_input_objects_union,
5430            authenticator_function_ref,
5431            authenticator_gas_budget,
5432        ) = if let Some(move_authenticator) = move_authenticator {
5433            let auth_input_objects =
5434                auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5435            let account_object = account_object.expect("Move account object must be provided");
5436
5437            // Check basic `object_to_authenticate` preconditions and get its components.
5438            let (
5439                auth_account_object_id,
5440                auth_account_object_seq_number,
5441                auth_account_object_digest,
5442            ) = move_authenticator.object_to_authenticate_components()?;
5443
5444            // Make sure the sender is a Move account.
5445            let AuthenticatorFunctionRefForExecution {
5446                authenticator_function_ref,
5447                ..
5448            } = self.check_move_account(
5449                auth_account_object_id,
5450                auth_account_object_seq_number,
5451                auth_account_object_digest,
5452                account_object,
5453                &tx_data.sender(),
5454            )?;
5455
5456            // Check the MoveAuthenticator input objects.
5457            let auth_checked_input_objects =
5458                iota_transaction_checks::check_move_authenticator_input_for_signing(
5459                    auth_input_objects,
5460                )?;
5461
5462            // `max_auth_gas` is used here as the Move authenticator gas budget until
5463            // the budget becomes part of the transaction data itself.
5464            let authenticator_gas_budget = protocol_config.max_auth_gas();
5465
5466            (
5467                Some(auth_checked_input_objects),
5468                Some(authenticator_function_ref),
5469                authenticator_gas_budget,
5470            )
5471        } else {
5472            (None, None, 0)
5473        };
5474
5475        // Check the transaction inputs.
5476        let (gas_status, tx_checked_input_objects) =
5477            iota_transaction_checks::check_transaction_input(
5478                protocol_config,
5479                reference_gas_price,
5480                tx_data,
5481                tx_input_objects,
5482                tx_receiving_objects,
5483                &self.metrics.bytecode_verifier_metrics,
5484                &self.config.verifier_signing_config,
5485                authenticator_gas_budget,
5486            )?;
5487
5488        Ok((
5489            gas_status,
5490            tx_checked_input_objects,
5491            auth_checked_input_objects_union,
5492            authenticator_function_ref,
5493        ))
5494    }
5495
5496    #[cfg(test)]
5497    pub(crate) fn iter_live_object_set_for_testing(
5498        &self,
5499    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5500        self.get_accumulator_store()
5501            .iter_cached_live_object_set_for_testing()
5502    }
5503
5504    #[cfg(test)]
5505    pub(crate) fn shutdown_execution_for_test(&self) {
5506        self.tx_execution_shutdown
5507            .lock()
5508            .take()
5509            .unwrap()
5510            .send(())
5511            .unwrap();
5512    }
5513
5514    /// NOTE: this function is only to be used for fuzzing and testing. Never
5515    /// use it in production.
5516    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5517        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5518        self.get_object_cache_reader()
5519            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5520        self.get_reconfig_api()
5521            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5522    }
5523}
5524
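/// Receives generated randomness rounds over a channel and turns each one into
/// a `RandomnessStateUpdate` system transaction that is persisted and enqueued
/// for execution.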
5525pub struct RandomnessRoundReceiver {
5526    authority_state: Arc<AuthorityState>,
5527    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5528}
5529
5530impl RandomnessRoundReceiver {
5531    pub fn spawn(
5532        authority_state: Arc<AuthorityState>,
5533        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5534    ) -> JoinHandle<()> {
5535        let rrr = RandomnessRoundReceiver {
5536            authority_state,
5537            randomness_rx,
5538        };
5539        spawn_monitored_task!(rrr.run())
5540    }
5541
5542    async fn run(mut self) {
5543        info!("RandomnessRoundReceiver event loop started");
5544
5545        loop {
5546            tokio::select! {
5547                maybe_recv = self.randomness_rx.recv() => {
5548                    if let Some((epoch, round, bytes)) = maybe_recv {
5549                        self.handle_new_randomness(epoch, round, bytes);
5550                    } else {
5551                        break;
5552                    }
5553                },
5554            }
5555        }
5556
5557        info!("RandomnessRoundReceiver event loop ended");
5558    }
5559
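    /// Builds, persists, and enqueues the randomness state update transaction
    /// for the given round, dropping the randomness if the epoch has already
    /// changed. A background task waits for the transaction's effects and
    /// crashes the node if execution fails.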
5560    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5561    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5562        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5563        if epoch_store.epoch() != epoch {
5564            warn!(
5565                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5566                epoch_store.epoch()
5567            );
5568            return;
5569        }
5570        let transaction = VerifiedTransaction::new_randomness_state_update(
5571            epoch,
5572            round,
5573            bytes,
5574            epoch_store
5575                .epoch_start_config()
5576                .randomness_obj_initial_shared_version(),
5577        );
5578        debug!(
5579            "created randomness state update transaction with digest: {:?}",
5580            transaction.digest()
5581        );
5582        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5583        let digest = *transaction.digest();
5584
5585        // Randomness state updates contain the full BLS signature for the randomness
5586        // round, which cannot necessarily be reconstructed again later. Therefore we
5587        // must persist this transaction immediately. If we crash before its outputs
5588        // are committed, this ensures we will be able to re-execute it.
5589        self.authority_state
5590            .get_cache_commit()
5591            .persist_transaction(&transaction);
5592
5593        // Send transaction to TransactionManager for execution.
5594        self.authority_state
5595            .transaction_manager()
5596            .enqueue(vec![transaction], &epoch_store);
5597
5598        let authority_state = self.authority_state.clone();
5599        spawn_monitored_task!(async move {
5600            // Wait for transaction execution in a separate task, to avoid deadlock in case
5601            // of out-of-order randomness generation. (Each
5602            // RandomnessStateUpdate depends on the output of the
5603            // RandomnessStateUpdate from the previous round.)
5604            //
5605            // We set a very long timeout so that in case this gets stuck for some reason,
5606            // the validator will eventually crash rather than continuing in a
5607            // zombie mode.
5608            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5609            let result = tokio::time::timeout(
5610                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5611                authority_state
5612                    .get_transaction_cache_reader()
5613                    .try_notify_read_executed_effects(&[digest]),
5614            )
5615            .await;
5616            let result = match result {
5617                Ok(result) => result,
5618                Err(_) => {
5619                    if cfg!(debug_assertions) {
5620                        // Crash on randomness update execution timeout in debug builds.
5621                        panic!(
5622                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5623                        );
5624                    }
5625                    warn!(
5626                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5627                    );
5628                    // Continue waiting as long as necessary in non-debug builds.
5629                    authority_state
5630                        .get_transaction_cache_reader()
5631                        .try_notify_read_executed_effects(&[digest])
5632                        .await
5633                }
5634            };
5635
5636            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5637            let effects = effects.pop().expect("should return effects");
5638            if *effects.status() != ExecutionStatus::Success {
5639                fatal!(
5640                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5641                );
5642            }
5643            debug!(
5644                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5645            );
5646        });
5647    }
5648}
5649
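// Serves transaction, effects, checkpoint, object, and event lookups for the
// transaction key-value store directly from this validator's local caches and
// stores.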
5650#[async_trait]
5651impl TransactionKeyValueStoreTrait for AuthorityState {
5652    async fn multi_get(
5653        &self,
5654        transaction_keys: &[TransactionDigest],
5655        effects_keys: &[TransactionDigest],
5656    ) -> IotaResult<KVStoreTransactionData> {
5657        let txns = if !transaction_keys.is_empty() {
5658            self.get_transaction_cache_reader()
5659                .try_multi_get_transaction_blocks(transaction_keys)?
5660                .into_iter()
5661                .map(|t| t.map(|t| (*t).clone().into_inner()))
5662                .collect()
5663        } else {
5664            vec![]
5665        };
5666
5667        let fx = if !effects_keys.is_empty() {
5668            self.get_transaction_cache_reader()
5669                .try_multi_get_executed_effects(effects_keys)?
5670        } else {
5671            vec![]
5672        };
5673
5674        Ok((txns, fx))
5675    }
5676
5677    async fn multi_get_checkpoints(
5678        &self,
5679        checkpoint_summaries: &[CheckpointSequenceNumber],
5680        checkpoint_contents: &[CheckpointSequenceNumber],
5681        checkpoint_summaries_by_digest: &[CheckpointDigest],
5682    ) -> IotaResult<(
5683        Vec<Option<CertifiedCheckpointSummary>>,
5684        Vec<Option<CheckpointContents>>,
5685        Vec<Option<CertifiedCheckpointSummary>>,
5686    )> {
5687        // TODO: use multi-get methods if it ever becomes important (unlikely)
5688        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5689        let store = self.get_checkpoint_store();
5690        for seq in checkpoint_summaries {
5691            let checkpoint = store
5692                .get_checkpoint_by_sequence_number(*seq)?
5693                .map(|c| c.into_inner());
5694
5695            summaries.push(checkpoint);
5696        }
5697
5698        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5699        for seq in checkpoint_contents {
5700            let checkpoint = store
5701                .get_checkpoint_by_sequence_number(*seq)?
5702                .and_then(|summary| {
5703                    store
5704                        .get_checkpoint_contents(&summary.content_digest)
5705                        .expect("db read cannot fail")
5706                });
5707            contents.push(checkpoint);
5708        }
5709
5710        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5711        for digest in checkpoint_summaries_by_digest {
5712            let checkpoint = store
5713                .get_checkpoint_by_digest(digest)?
5714                .map(|c| c.into_inner());
5715            summaries_by_digest.push(checkpoint);
5716        }
5717
5718        Ok((summaries, contents, summaries_by_digest))
5719    }
5720
5721    async fn get_transaction_perpetual_checkpoint(
5722        &self,
5723        digest: TransactionDigest,
5724    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5725        self.get_checkpoint_cache()
5726            .try_get_transaction_perpetual_checkpoint(&digest)
5727            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5728    }
5729
5730    async fn get_object(
5731        &self,
5732        object_id: ObjectID,
5733        version: VersionNumber,
5734    ) -> IotaResult<Option<Object>> {
5735        self.get_object_cache_reader()
5736            .try_get_object_by_key(&object_id, version)
5737    }
5738
5739    async fn multi_get_transactions_perpetual_checkpoints(
5740        &self,
5741        digests: &[TransactionDigest],
5742    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5743        let res = self
5744            .get_checkpoint_cache()
5745            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5746
5747        Ok(res
5748            .into_iter()
5749            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5750            .collect())
5751    }
5752
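    // For each transaction digest, resolve the events digest from the executed
    // effects and then fetch the corresponding events, keeping a `None` entry
    // for transactions that have no events.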
5753    #[instrument(skip(self))]
5754    async fn multi_get_events_by_tx_digests(
5755        &self,
5756        digests: &[TransactionDigest],
5757    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5758        if digests.is_empty() {
5759            return Ok(vec![]);
5760        }
5761        let events_digests: Vec<_> = self
5762            .get_transaction_cache_reader()
5763            .try_multi_get_executed_effects(digests)?
5764            .into_iter()
5765            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5766            .collect();
5767        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5768        let mut events = self
5769            .get_transaction_cache_reader()
5770            .try_multi_get_events(&non_empty_events)?
5771            .into_iter();
5772        Ok(events_digests
5773            .into_iter()
5774            .map(|ev| ev.and_then(|_| events.next()?))
5775            .collect())
5776    }
5777}
5778
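/// Simulation-test-only support for overriding the built-in framework packages,
/// either globally or per validator, via a thread-local configuration.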
5779#[cfg(msim)]
5780pub mod framework_injection {
5781    use std::{
5782        cell::RefCell,
5783        collections::{BTreeMap, BTreeSet},
5784    };
5785
5786    use iota_framework::{BuiltInFramework, SystemPackage};
5787    use iota_types::{
5788        base_types::{AuthorityName, ObjectID},
5789        is_system_package,
5790    };
5791    use move_binary_format::CompiledModule;
5792
5793    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5794
5795    // Thread local cache because all simtests run in a single unique thread.
5796    thread_local! {
5797        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5798    }
5799
5800    type Framework = Vec<CompiledModule>;
5801
5802    pub type PackageUpgradeCallback =
5803        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5804
5805    enum PackageOverrideConfig {
5806        Global(Framework),
5807        PerValidator(PackageUpgradeCallback),
5808    }
5809
5810    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5811        modules
5812            .iter()
5813            .map(|m| {
5814                let mut buf = Vec::new();
5815                m.serialize_with_version(m.version, &mut buf).unwrap();
5816                buf
5817            })
5818            .collect()
5819    }
5820
5821    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5822        OVERRIDE.with(|bs| {
5823            bs.borrow_mut()
5824                .insert(package_id, PackageOverrideConfig::Global(modules))
5825        });
5826    }
5827
5828    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5829        OVERRIDE.with(|bs| {
5830            bs.borrow_mut()
5831                .insert(package_id, PackageOverrideConfig::PerValidator(func))
5832        });
5833    }
5834
5835    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5836        OVERRIDE.with(|cfg| {
5837            cfg.borrow().get(package_id).and_then(|entry| match entry {
5838                PackageOverrideConfig::Global(framework) => {
5839                    Some(compiled_modules_to_bytes(framework))
5840                }
5841                PackageOverrideConfig::PerValidator(func) => {
5842                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
5843                }
5844            })
5845        })
5846    }
5847
5848    pub fn get_override_modules(
5849        package_id: &ObjectID,
5850        name: AuthorityName,
5851    ) -> Option<Vec<CompiledModule>> {
5852        OVERRIDE.with(|cfg| {
5853            cfg.borrow().get(package_id).and_then(|entry| match entry {
5854                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5855                PackageOverrideConfig::PerValidator(func) => func(name),
5856            })
5857        })
5858    }
5859
5860    pub fn get_override_system_package(
5861        package_id: &ObjectID,
5862        name: AuthorityName,
5863    ) -> Option<SystemPackage> {
5864        let bytes = get_override_bytes(package_id, name)?;
5865        let dependencies = if is_system_package(*package_id) {
5866            BuiltInFramework::get_package_by_id(package_id)
5867                .dependencies
5868                .to_vec()
5869        } else {
5870            // Assume that entirely new injected packages depend on all existing system
5871            // packages.
5872            BuiltInFramework::all_package_ids()
5873        };
5874        Some(SystemPackage {
5875            id: *package_id,
5876            bytes,
5877            dependencies,
5878        })
5879    }
5880
5881    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5882        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5883        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5884            cfg.borrow()
5885                .keys()
5886                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5887                .collect()
5888        });
5889
5890        extra
5891            .into_iter()
5892            .map(|package| SystemPackage {
5893                id: package,
5894                bytes: get_override_bytes(&package, name).unwrap(),
5895                dependencies: BuiltInFramework::all_package_ids(),
5896            })
5897            .collect()
5898    }
5899}
5900
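/// An object together with its computed reference (id, version, digest), in a
/// form convenient for serializing into debug state dumps.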
5901#[derive(Debug, Serialize, Deserialize, Clone)]
5902pub struct ObjDumpFormat {
5903    pub id: ObjectID,
5904    pub version: VersionNumber,
5905    pub digest: ObjectDigest,
5906    pub object: Object,
5907}
5908
5909impl ObjDumpFormat {
5910    fn new(object: Object) -> Self {
5911        let oref = object.compute_object_reference();
5912        Self {
5913            id: oref.0,
5914            version: oref.1,
5915            digest: oref.2,
5916            object,
5917        }
5918    }
5919}
5920
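/// A dump of the state a node used to execute a transaction (system packages,
/// shared, child, and modified objects, and the computed vs. expected effects),
/// intended for offline debugging and replay.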
5921#[derive(Debug, Serialize, Deserialize, Clone)]
5922pub struct NodeStateDump {
5923    pub tx_digest: TransactionDigest,
5924    pub sender_signed_data: SenderSignedData,
5925    pub executed_epoch: u64,
5926    pub reference_gas_price: u64,
5927    pub protocol_version: u64,
5928    pub epoch_start_timestamp_ms: u64,
5929    pub computed_effects: TransactionEffects,
5930    pub expected_effects_digest: TransactionEffectsDigest,
5931    pub relevant_system_packages: Vec<ObjDumpFormat>,
5932    pub shared_objects: Vec<ObjDumpFormat>,
5933    pub loaded_child_objects: Vec<ObjDumpFormat>,
5934    pub modified_at_versions: Vec<ObjDumpFormat>,
5935    pub runtime_reads: Vec<ObjDumpFormat>,
5936    pub input_objects: Vec<ObjDumpFormat>,
5937}
5938
5939impl NodeStateDump {
5940    pub fn new(
5941        tx_digest: &TransactionDigest,
5942        effects: &TransactionEffects,
5943        expected_effects_digest: TransactionEffectsDigest,
5944        object_store: &dyn ObjectStore,
5945        epoch_store: &Arc<AuthorityPerEpochStore>,
5946        inner_temporary_store: &InnerTemporaryStore,
5947        certificate: &VerifiedExecutableTransaction,
5948    ) -> IotaResult<Self> {
5949        // Epoch info
5950        let executed_epoch = epoch_store.epoch();
5951        let reference_gas_price = epoch_store.reference_gas_price();
5952        let epoch_start_config = epoch_store.epoch_start_config();
5953        let protocol_version = epoch_store.protocol_version().as_u64();
5954        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5955
5956        // Record all system packages at this version
5957        let mut relevant_system_packages = Vec::new();
5958        for sys_package_id in BuiltInFramework::all_package_ids() {
5959            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5960                relevant_system_packages.push(ObjDumpFormat::new(w))
5961            }
5962        }
5963
5964        // Record all the shared objects
5965        let mut shared_objects = Vec::new();
5966        for kind in effects.input_shared_objects() {
5967            match kind {
5968                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5969                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5970                        shared_objects.push(ObjDumpFormat::new(w))
5971                    }
5972                }
5973                InputSharedObject::ReadDeleted(..)
5974                | InputSharedObject::MutateDeleted(..)
5975                | InputSharedObject::Cancelled(..) => (), /* TODO: consider recording
5976                                                           * congested objects. */
5977            }
5978        }
5979
5980        // Record all loaded child objects
5981        // Child objects which are read but not mutated are not tracked anywhere else
5982        let mut loaded_child_objects = Vec::new();
5983        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5984            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5985                loaded_child_objects.push(ObjDumpFormat::new(w))
5986            }
5987        }
5988
5989        // Record all modified objects
5990        let mut modified_at_versions = Vec::new();
5991        for (id, ver) in effects.modified_at_versions() {
5992            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5993                modified_at_versions.push(ObjDumpFormat::new(w))
5994            }
5995        }
5996
5997        // Packages read at runtime, which were not previously loaded into the
5998        // temporary store. Some packages may be fetched at runtime and won't show
5999        // up in the input objects.
6000        let mut runtime_reads = Vec::new();
6001        for obj in inner_temporary_store
6002            .runtime_packages_loaded_from_db
6003            .values()
6004        {
6005            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6006        }
6007
6008        // All other input objects should already be in `inner_temporary_store.input_objects`.
6009
6010        Ok(Self {
6011            tx_digest: *tx_digest,
6012            executed_epoch,
6013            reference_gas_price,
6014            epoch_start_timestamp_ms,
6015            protocol_version,
6016            relevant_system_packages,
6017            shared_objects,
6018            loaded_child_objects,
6019            modified_at_versions,
6020            runtime_reads,
6021            sender_signed_data: certificate.clone().into_message(),
6022            input_objects: inner_temporary_store
6023                .input_objects
6024                .values()
6025                .map(|o| ObjDumpFormat::new(o.clone()))
6026                .collect(),
6027            computed_effects: effects.clone(),
6028            expected_effects_digest,
6029        })
6030    }
6031
6032    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6033        let mut objects = Vec::new();
6034        objects.extend(self.relevant_system_packages.clone());
6035        objects.extend(self.shared_objects.clone());
6036        objects.extend(self.loaded_child_objects.clone());
6037        objects.extend(self.modified_at_versions.clone());
6038        objects.extend(self.runtime_reads.clone());
6039        objects.extend(self.input_objects.clone());
6040        objects
6041    }
6042
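    /// Writes the dump as pretty-printed JSON to a file named
    /// `<tx_digest>_<unix_ms>_NODE_DUMP.json` under `path` and returns the full
    /// path of the created file.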
6043    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6044        let file_name = format!(
6045            "{}_{}_NODE_DUMP.json",
6046            self.tx_digest,
6047            AuthorityState::unixtime_now_ms()
6048        );
6049        let mut path = path.to_path_buf();
6050        path.push(&file_name);
6051        let mut file = File::create(path.clone())?;
6052        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6053        Ok(path)
6054    }
6055
6056    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6057        let file = File::open(path)?;
6058        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6059    }
6060}